I recently had to set up a PySpark pipeline that performed a number of data transformations (reducing the dimensionality of the data to rollups of the factors we wanted to target, streamlined for downstream querying and analysis). Writing tests for this was not immediately intuitive. In the end, though, the pattern follows that of most pytest tests you would see in a standard Python library. This post documents a simplified, generic example of a pipeline job and shows how to break apart the individual transformation steps and test them independently of the full pipeline.
Main function overview
The generic method will have a few key steps. First, it formats a query; then it submits the query (with spark.sql) and returns a Spark DataFrame with the result. Next, it runs a series of transformations on the DataFrame and, finally, it writes the results to a parameterized destination (which in production might be an S3 bucket, for example).
We can describe the steps involved with the following pseudocode:
Main function step details
Fleshing this out, we can describe the format_query step with the following method:
Here, we have a SQL query that can be formatted with parameters supplied to the
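The post's own query isn't reproduced here, so this is only a minimal sketch of such a method. It assumes a hypothetical sales table with region, amount and event_date columns and a date-range filter; the table, columns and parameter names are all illustrative.

```python
# Hypothetical query template: table and column names are illustrative only.
QUERY_TEMPLATE = """
SELECT region, amount
FROM {table}
WHERE event_date >= '{start_date}'
  AND event_date < '{end_date}'
"""


def format_query(table: str, start_date: str, end_date: str) -> str:
    """Fill the SQL template with the job's parameters.

    Plain str.format is used for brevity. Nothing is escaped, so the values
    should come from trusted job configuration, never from user input.
    """
    return QUERY_TEMPLATE.format(table=table, start_date=start_date, end_date=end_date)
```

Because this step is a pure function of its parameters, it can be unit tested with plain string assertions, without a SparkSession at all.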
Next, we have to make the query by submitting it with spark.sql:
In unit tests, this step will be patched to avoid actually querying the database; in its place, a fixture holding a small subset of data that matches the database schema is supplied.
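A sketch of the query step, together with one way a test can swap in fixture data. The post patches the step itself; the variant shown here instead hands the step a unittest.mock.MagicMock standing in for the SparkSession, whose sql() returns the fixture. Either way, no database is touched. The fixture object below is a placeholder, not a real DataFrame.

```python
from unittest.mock import MagicMock


def make_query(spark, query: str):
    """Submit the formatted SQL with spark.sql() and return the DataFrame.

    For a SELECT, Spark resolves the query immediately but computes no rows
    until an action (collect, write, ...) runs on the result.
    """
    return spark.sql(query)


def test_make_query_returns_stubbed_data():
    # Stand-in SparkSession: sql() hands back fixture data instead of
    # querying real tables. object() is a placeholder for a fixture DataFrame.
    fixture_df = object()
    fake_spark = MagicMock()
    fake_spark.sql.return_value = fixture_df

    assert make_query(fake_spark, "SELECT 1") is fixture_df
    fake_spark.sql.assert_called_once_with("SELECT 1")
```

Patching the step itself, as the post describes, would look like mock.patch("pipeline.make_query", return_value=input_df) in a test, where pipeline stands for whatever module defines the job (a hypothetical name) and input_df is an input fixture.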
Now we will provide two example transformations on the DataFrame. What they do is not important; they are purely for demonstration.
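Two hypothetical transformations, as a sketch: one drops rows without a positive amount, the other rolls amounts up per region. The column names are assumptions. Both use only DataFrame methods that accept SQL-expression strings or a dict, so this module doesn't need to import pyspark itself.

```python
def filter_valid_rows(df):
    """Keep only rows with a positive amount.

    DataFrame.filter accepts a SQL expression string as well as a Column.
    A NULL amount makes "amount > 0" evaluate to NULL, so those rows are
    dropped too.
    """
    return df.filter("amount > 0")


def rollup_by_region(df):
    """Sum amount per region and give the aggregate a stable column name.

    GroupedData.agg with a dict names the result column "sum(amount)",
    so it is renamed to something friendlier for downstream queries.
    """
    return (
        df.groupBy("region")
        .agg({"amount": "sum"})
        .withColumnRenamed("sum(amount)", "total_amount")
    )
```

Keeping each transformation as a small function that takes and returns a DataFrame is what makes it possible to test them one at a time.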
Fleshing out the main method
We can now revisit the main() method and show how all the example steps can be rolled together in a single main() workflow:
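A compact sketch of how the steps might be chained. So that it stands alone, it repeats condensed illustrative versions of each step; the table, columns and parameters are hypothetical, and Parquet with overwrite mode is an assumed output format.

```python
# Illustrative, self-contained version of the whole job.
QUERY_TEMPLATE = (
    "SELECT region, amount FROM {table} "
    "WHERE event_date >= '{start_date}' AND event_date < '{end_date}'"
)


def format_query(table, start_date, end_date):
    return QUERY_TEMPLATE.format(table=table, start_date=start_date, end_date=end_date)


def make_query(spark, query):
    return spark.sql(query)  # replaced with fixture data in unit tests


def filter_valid_rows(df):
    return df.filter("amount > 0")  # NULL amounts are dropped as well


def rollup_by_region(df):
    return (
        df.groupBy("region")
        .agg({"amount": "sum"})
        .withColumnRenamed("sum(amount)", "total_amount")
    )


def main(spark, table, start_date, end_date, output_path):
    """Run the job end to end: query, transform, write.

    output_path is a parameter so production can target e.g. an S3 bucket
    while tests target a local temporary directory.
    """
    query = format_query(table, start_date, end_date)
    df = make_query(spark, query)
    df = filter_valid_rows(df)
    df = rollup_by_region(df)
    # The write stays inline; "overwrite" lets re-runs replace earlier output.
    df.write.mode("overwrite").parquet(output_path)
    return df  # returned only to make the sketch easy to inspect in tests
```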
The save/write operation could also be broken out into its own step, but I'll leave it inline for the sake of the example.
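If the write were broken out, a minimal sketch of that step might look like the following. The function name, Parquet format and default overwrite mode are assumptions, not something the post specifies.

```python
def write_results(df, output_path: str, mode: str = "overwrite") -> None:
    """Persist the final DataFrame to a parameterized location.

    output_path can be an S3 URI in production or a local temporary
    directory in tests; DataFrameWriter.mode controls what happens when
    data already exists at that location.
    """
    df.write.mode(mode).parquet(output_path)
```

As a separate function, the write can be checked on its own with a mocked DataFrame, and main() reduces to a plain chain of step calls.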
I will write two main kinds of tests. First, tests that check the individual steps and make sure each behaves as expected. Then, tests that check that each step integrates with the steps after it, by running the whole main method and checking its results.
First, to check each step, we can start by creating pytest fixtures that mock the inputs and outputs of each step of our multi-step pipeline.
In this case, I create the following two:
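The post's own fixtures aren't reproduced here. The following sketch shows two fixtures of that kind: a mock input DataFrame shaped like the (hypothetical) query result, and the expected output after the illustrative filter-and-rollup transformations. It also includes the session-scoped SparkSession fixture they depend on. It assumes pytest and pyspark are installed and that the fixtures live in a conftest.py, so any test module can request them by name.

```python
import pytest

# Rows shaped like the (hypothetical) query result: region, amount.
INPUT_ROWS = [
    ("north", 10.0),
    ("north", 5.0),
    ("north", -1.0),  # non-positive amount: should be filtered out
    ("south", 7.5),
    ("south", None),  # NULL amount: should be filtered out
]
# Expected result of filtering INPUT_ROWS and summing amount per region.
EXPECTED_ROWS = [("north", 15.0), ("south", 7.5)]


@pytest.fixture(scope="session")
def spark():
    # Imported inside the fixture so this module can be imported even where
    # pyspark is absent; only tests that request Spark need it.
    from pyspark.sql import SparkSession

    session = (
        SparkSession.builder.master("local[1]")
        .appName("pipeline-unit-tests")
        .getOrCreate()
    )
    yield session
    session.stop()


@pytest.fixture
def input_df(spark):
    """Mock input: a tiny DataFrame matching the query's output schema."""
    return spark.createDataFrame(INPUT_ROWS, schema="region string, amount double")


@pytest.fixture
def expected_df(spark):
    """Mock output: what the transformations should produce from input_df."""
    return spark.createDataFrame(EXPECTED_ROWS, schema="region string, total_amount double")
```

The session scope means Spark starts once per test run rather than once per test, which keeps the suite reasonably fast.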
We can see how these are used by examining all the tests for the steps:
Each step is isolated by shimming in a mock for its input and a mock for its expected output. We then compare the DataFrame the step generates against the expected-output fixture.
When steps are chained, the output of one step can also be reused as the input DataFrame for the subsequent step.
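Comparing a generated DataFrame against an expected-output fixture can be wrapped in a small helper. The sketch below (the helper name is hypothetical) collects both DataFrames, which is fine for fixture-sized data, and compares column names and rows while ignoring row order, since Spark doesn't guarantee ordering after shuffles such as groupBy. On PySpark 3.5 or later, pyspark.testing.assertDataFrameEqual covers similar ground.

```python
def _sort_key(row):
    # Pair each value with an "is null" flag so rows containing None still
    # sort (Python 3 cannot order None against numbers or strings).
    return tuple((value is None, value) for value in row)


def assert_df_equal(actual, expected):
    """Assert two small DataFrames have the same columns and the same rows.

    Row order is ignored. Values are compared exactly, so results of inexact
    floating-point arithmetic should be rounded before comparing.
    """
    assert actual.columns == expected.columns, (actual.columns, expected.columns)
    actual_rows = sorted((tuple(r) for r in actual.collect()), key=_sort_key)
    expected_rows = sorted((tuple(r) for r in expected.collect()), key=_sort_key)
    assert actual_rows == expected_rows, (actual_rows, expected_rows)
```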
Testing the main method
Now we can move on to testing the whole process combined in the main function. In this case, we can also test the write step, since it is essentially an “output” of the main method.
By parameterizing the write location, we can avoid writing to an external service like S3 and instead write to a local temporary directory. See the “TODO” section for where that file could then be read in and compared to an example dataset, to ensure that the output produced by the main() method matches what is expected.
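A sketch of that read-back step. It assumes the job wrote Parquet (use the matching spark.read method for another format) and returns the columns plus rows sorted into a comparable order, so a test can check them against an expected dataset. The helper name is hypothetical.

```python
def read_output_rows(spark, output_path):
    """Load what main() wrote and return (columns, rows sorted for comparison)."""
    df = spark.read.parquet(output_path)
    rows = sorted(
        (tuple(row) for row in df.collect()),
        # Null-safe key: pair each value with an "is null" flag.
        key=lambda row: tuple((v is None, v) for v in row),
    )
    return df.columns, rows
```

In a pytest test, str(tmp_path / "output") (tmp_path is pytest's built-in temporary-directory fixture) can be passed to main() as the write location, and the result of read_output_rows on that same path compared with the expected columns and rows.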
I’ve noticed that unit testing seems less “popular” in Spark applications because the setup is onerous, but with some careful method structuring, unit tests can be written to ensure that transformation steps behave as expected. Such tests also enforce that each method can safely expect certain columns and data to be present, which makes future modifications to the transformations safer.