Introduction
I recently had to set up a PySpark pipeline that performed a number of data transformations, dimensionally reducing the data to rollups of the factors we wanted to target and streamlining it for downstream querying and analysis. Writing tests for this was not immediately intuitive. In the end, though, the pattern follows that of most pytest tests you would see in a standard Python library. This post documents a simplified, generic example of a pipeline job and shows how to break apart the step transformations and test them independently of the parent operation’s complete step flow.
Main function overview
The generic method will have a few key steps. First it will format a query, then it will submit the query (with spark.sql) and return a Spark DataFrame with the result. At that point, it will run a series of transformations on the DataFrame and, finally, it will write the results to some parameterized destination (which in production might be an S3 bucket, for example).
We can describe the steps involved with the following pseudocode:
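In code-shaped pseudocode, that might look something like this (the parameter names are illustrative placeholders rather than the real job's signature):

```python
def main(spark, table, start_date, end_date, output_path):
    # 1. format the SQL query from the supplied parameters
    # 2. submit the query with spark.sql and get back a DataFrame
    # 3. run a series of transformations on the DataFrame
    # 4. write the result to the parameterized destination
    ...
```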
Main function step details
Fleshing this out, we can describe the format_query step with the following method:
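A minimal sketch, assuming a simple query template (the table and column names here are placeholders):

```python
# Illustrative query template; the real table and column names will differ.
QUERY_TEMPLATE = """
    SELECT factor, value, event_date
    FROM {table}
    WHERE event_date BETWEEN '{start_date}' AND '{end_date}'
"""


def format_query(table: str, start_date: str, end_date: str) -> str:
    """Fill in the query template with the parameters passed through main()."""
    return QUERY_TEMPLATE.format(
        table=table, start_date=start_date, end_date=end_date
    )
```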
Here, we have a SQL query that can be formatted with parameters supplied to the main() method.
Next we have to run the query by submitting it with spark:
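A sketch of this step, which is essentially a thin wrapper around spark.sql:

```python
from pyspark.sql import DataFrame, SparkSession


def run_query(spark: SparkSession, query: str) -> DataFrame:
    """Submit the formatted query to Spark and return the result as a DataFrame."""
    return spark.sql(query)
```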
This step will be patched in unit tests to avoid actually querying the database; in its place, a fixture representing a subset of data that matches the database schema will be supplied.
Now we will provide two example transformations on the DataFrame. What these do is not super important - these steps are purely for demonstration.
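For example, something like the following pair of toy transformations (the column names and rollup logic are stand-ins, not the real pipeline's):

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def aggregate_by_factor(df: DataFrame) -> DataFrame:
    """Roll the raw rows up to one row per (factor, event_date)."""
    return df.groupBy("factor", "event_date").agg(
        F.sum("value").alias("total_value"),
        F.count("*").alias("row_count"),
    )


def add_average_column(df: DataFrame) -> DataFrame:
    """Derive an average column from the rolled-up totals."""
    return df.withColumn("avg_value", F.col("total_value") / F.col("row_count"))
```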
Fleshing out the main method
We can now revisit the main() method and show how all the example steps roll together into its workflow:
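Pulling the sketched pieces above together, main() could look roughly like this:

```python
def main(spark, table, start_date, end_date, output_path):
    query = format_query(table, start_date, end_date)
    df = run_query(spark, query)
    df = aggregate_by_factor(df)
    df = add_average_column(df)
    # Write the final result to the parameterized destination.
    df.write.mode("overwrite").parquet(output_path)
```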
The save/write operation could probably be broken out into a separate step as well, but I'll leave it at this for the sake of the example.
Testing overview
I will do two main kinds of tests. First, I will have tests that check the individual steps and make sure they behave as expected. Then I will have tests that check that each step integrates with its subsequent steps by running the whole main method and checking its results.
Testing steps
First, to check each step we can start by creating pytest fixtures that mock inputs and outputs from each step of our multi-step main method.
In this case, I create the following two:
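Something along these lines, assuming a spark fixture that provides a local SparkSession (defined in conftest.py, for example); the rows themselves are made up purely for illustration:

```python
import pytest


@pytest.fixture
def raw_df(spark):
    """A small DataFrame matching the schema returned by run_query."""
    return spark.createDataFrame(
        [
            ("a", 1.0, "2024-01-01"),
            ("a", 3.0, "2024-01-01"),
            ("b", 2.0, "2024-01-02"),
        ],
        ["factor", "value", "event_date"],
    )


@pytest.fixture
def aggregated_df(spark):
    """The expected output of aggregate_by_factor applied to raw_df."""
    return spark.createDataFrame(
        [
            ("a", "2024-01-01", 4.0, 2),
            ("b", "2024-01-02", 2.0, 1),
        ],
        ["factor", "event_date", "total_value", "row_count"],
    )
```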
We can see how these are used by examining all the tests for the steps:
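Continuing with the sketched step functions above (imported here from a hypothetical pipeline module), the step tests might look like this:

```python
import pyspark.sql.functions as F

# "pipeline" is a stand-in module name for wherever the step functions live.
from pipeline import add_average_column, aggregate_by_factor, format_query


def test_format_query():
    query = format_query("events", "2024-01-01", "2024-01-31")
    assert "FROM events" in query
    assert "BETWEEN '2024-01-01' AND '2024-01-31'" in query


def test_aggregate_by_factor(raw_df, aggregated_df):
    # The raw fixture is the mocked input; the aggregated fixture is the
    # expected output.
    result = aggregate_by_factor(raw_df)
    assert sorted(result.collect()) == sorted(aggregated_df.collect())


def test_add_average_column(aggregated_df):
    # The previous step's expected output doubles as this step's input.
    result = add_average_column(aggregated_df)
    row = result.filter(F.col("factor") == "a").first()
    assert row["avg_value"] == 2.0
```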
What has been done is that each step has been isolated, with a fixture standing in for its input and another for its expected output. We can then compare the result each step produces against the corresponding output fixture.
Because the steps chain together, the expected output of one step can also be recycled as the reference DataFrame fed into the test for the subsequent step.
Testing the main method
Now we can move on to test the whole process combined in the main function. In this case, we can also test the write step, since it's essentially an “output” of the main method.
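A sketch of that end-to-end test, again assuming the job lives in a hypothetical pipeline module and reusing the raw_df fixture in place of the patched query step:

```python
# "pipeline" is a stand-in module name for wherever main() and its steps live.
import pipeline


def test_main(spark, raw_df, tmp_path, monkeypatch):
    # Patch the query step so no real database is hit; the raw_df fixture
    # stands in for the query result.
    monkeypatch.setattr(pipeline, "run_query", lambda spark, query: raw_df)

    # Write to a local temporary directory instead of an external service.
    output_path = str(tmp_path / "output")
    pipeline.main(spark, "events", "2024-01-01", "2024-01-31", output_path)

    result = spark.read.parquet(output_path)
    assert result.count() == 2
    # TODO: compare the DataFrame read back here against an expected
    # reference dataset.
```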
Now, by parameterizing the write location, we can avoid writing to an external service like S3 and instead write to a local temporary directory. See the “TODO” for where that file could then be read back in and compared to an example dataset to ensure that the output data produced by the main() method matches what is expected.
Conclusion
I’ve noticed that unit testing may not be as “popular” with Spark applications because the setup is onerous, but with some careful method structuring, unit tests can be developed to ensure that transformation steps behave as expected. In addition, such tests help enforce that each method can safely expect certain columns and data to be present, and they make future modifications to the data transformations safer.