How to Do Big Data Sanity Checks with PySpark and Evaluate Data Quality

Written by David Eigenstuhler, Data Engineer

In the Data Engineering department of Runtastic, we rely heavily on Hadoop and its services to ingest and pre-process large amounts of data. This data forms the basis for further analysis and use by our data scientists.

1.) What’s the Problem?

Everybody who has some experience in collecting big data knows that data, like people, comes in all shapes and sizes. More importantly, it comes from all corners of the company and even beyond. This makes it prone to data quality issues which can easily go unnoticed.

Here is a non-exhaustive list of potential pitfalls to give you an idea:

  • An attribute (of hundreds) is not parsed correctly because the name or format in the source data changed
  • An attribute (of hundreds) is not included in the source data anymore because of a bug/change in the source application
  • An edge case or unforeseen situation results in producing too many/too few events
  • A mandatory attribute is missing due to a problem in either producing the events or processing the information
  • Unique values are duplicated

As Runtastic, like any other tech company, is not immune to failures or errors, we had to come up with a solution to address these challenges. We wanted to run simple checks on the so-called sanity of our incoming data and also verify the first step of pre-processing: correctly parsing and structuring the input information.

2.) Status Quo

Every 24 hours, a batch job decodes the data that was ingested raw as binary the previous day into text. In the process, we also parse the information and try to put the semi- or unstructured data into flat, relational tables so our analytical services down the line can access them via standard JDBC connections. This is done mainly with Hive queries. We decided to execute our data quality checks in batch fashion as well, after pre-processing, so as not to disrupt that process.
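To make this step more concrete, here is a minimal sketch of what such a decode-and-flatten job could look like in PySpark. The HDFS path, table names and attribute structure are assumptions for illustration only, not our actual schema.

import json

from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

sc         = SparkContext(appName='DailyPreProcessing')
sqlContext = HiveContext(sc)

# yesterday's raw binary events, ingested as HDFS sequence files (path is made up)
raw = sc.sequenceFile("/user/ingest/specific_entity/20170101/*")

# decode the binary payload to UTF-8 JSON and keep a flat selection of attributes
rows = raw.map(lambda kv: json.loads(kv[1].decode('utf-8'))) \
          .map(lambda d: Row(id=d["data"]["id"],
                             modified_at=d["data"]["attributes"]["updated_at"]))

# write the flattened rows into a partitioned Hive staging table via a Hive query
sqlContext.createDataFrame(rows).registerTempTable("decoded_entities")
sqlContext.sql("insert overwrite table analytics.specific_entity_raw partition (partition=20170101) "
               "select id, modified_at from decoded_entities")

sc.stop()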

3.) Our Solution

To keep it simple and maintainable, we use Python’s unittest library and write a job for each entity type (e.g. sports activities). PySpark runs a fully compatible Python instance on the Spark driver (where the job is started) while still having access to the Spark cluster running in Scala. This allows us to combine the performant parallel computation of Spark with standard Python unit testing.

As we receive multiple updates per entity each day, we have to select the most recent one per day to get the up-to-date version. We use a Hive script which groups the activities by id, orders them by the modified timestamp and keeps the one with the most recent timestamp (see the sketch below). The first test is a simple assertion that the count of all entities per day is greater than zero; see the function “test_raw_entity_count” in the base class “EntityTestCase” in the appended code example for details. To verify the deduplication, we compare the count of filtered activities from the Hive script with the count of unique ids calculated with PySpark from the raw data received in the first step; see the function “test_processed_specific_entitys_count” in the class “SpecificTestCase” for details.
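For illustration, a rough sketch of such a deduplication query, executed via the HiveContext, might look like the following; the table and column names (e.g. modified_at) are assumptions, not our actual schema.

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc         = SparkContext(appName='DeduplicateEntities')
sqlContext = HiveContext(sc)

# rank all updates of an entity by their modified timestamp and keep only the latest one
sqlContext.sql("""
    insert overwrite table analytics.specific_entity partition (partition=20170101)
    select id, app_id, notes, weather, modified_at
    from (
        select id, app_id, notes, weather, modified_at,
               row_number() over (partition by id order by modified_at desc) as rn
        from analytics.specific_entity_raw
        where partition = 20170101
    ) ranked
    where rn = 1
""")

sc.stop()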

Next, we check whether yesterday’s count is within +/- 30% of last week’s average. In addition, we check whether mandatory columns contain any NULLs or default values. Lastly, we assert that optional fields are filled for at least a certain tolerance percentage of all rows. See the functions “test_compare_raw_count_to_last_days” in the base class “EntityTestCase” and “test_specific_entity_fields” in the class “SpecificTestCase” for details.

Besides asserting the correct processing in a separate workflow, we also have the tests written by a different data engineer than the one who implemented the actual processing workflow.

4.) Solution Details

To avoid duplicate code, we use a base class that stores entity-specific attributes and handles the generic assertions that are the same for all entity types.

import unittest
from datetime import timedelta
from pyspark.sql.functions import avg, col


class EntityTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls, entity_name, tolerance, user, partition, database, sc, sqlContext):
        cls.entity_name = entity_name
        cls.tolerance   = tolerance  # allowed deviation of the daily count as float (e.g. 0.3 = +/- 30%)
        cls.user        = user
        cls.partition   = int(partition)
        cls.database    = database
        cls.sc          = sc
        cls.sqlContext  = sqlContext
        cls.entities    = sc.sequenceFile("/user/%s/%s/%s/*" % (cls.user, cls.entity_name, cls.partition))  # raw data

    def test_raw_entity_count(self):
        raw_count = self.entities.count()
        self.assertGreater(raw_count, 0)  # our first assertion
        upsert_entity_count_to_hive(db=self.database, entity=self.entity_name, partition=self.partition, value_to_update=raw_count)  # helper method for up-/inserting counts into a Hive table

    def test_compare_raw_count_to_last_days(self):
        raw_count           = self.entities.count()
        base_date           = partition_to_date(self.partition)  # helper method to get the partition as a date
        date_list           = [base_date - timedelta(days=x) for x in range(1, 8)]  # the last 7 days
        date_list           = [int(i.strftime("%Y%m%d")) for i in date_list]  # days in partition format
        counts_from_hive_df = fetch_counts_from_hive(db=self.database, entity=self.entity_name).where(col('partition').isin(date_list))  # counts of the last 7 days
        avg_count           = counts_from_hive_df.agg(avg(col('raw_entity_count'))).collect()[0][0]  # average of last week's counts
        in_range            = (raw_count > avg_count * (1 - self.tolerance)) and (raw_count < avg_count * (1 + self.tolerance))  # is yesterday within the +/- 30% tolerance?
        self.assertTrue(in_range, msg='Lower={0}; Upper={1}; Actual={2}'.format(avg_count * (1 - self.tolerance), avg_count * (1 + self.tolerance), raw_count))  # assertion with a custom message showing the tolerance limits

The entity-specific class inherits from our base class and defines additional test methods.

import json
import time
import unittest

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import col

# EntityTestCase is the base class from the listing above;
# USER, PARTITION and DATABASE are provided elsewhere (e.g. as job arguments)

app_name   = 'TestingContext_%s' % str(int(time.time()))
sc         = SparkContext(appName=app_name)
sqlContext = HiveContext(sc)


class SpecificTestCase(EntityTestCase):
    @classmethod
    def setUpClass(cls):
        entity_name = 'specific_entity'
        tolerance   = 0.3
        user        = USER
        partition   = PARTITION
        database    = DATABASE
        super(SpecificTestCase, cls).setUpClass(entity_name, tolerance, user, partition, database, sc, sqlContext)

    def test_specific_entity_fields(self):
        specific_entitys_table = self.sqlContext.sql("select * from %s.%s where partition=%s" % (self.database, self.entity_name, self.partition))
        count                  = specific_entitys_table.count()
        null_tolerance         = 0.75
        # check that all mandatory fields are set
        self.assertEqual(0, specific_entitys_table.where(col('id').isNull()).count())
        self.assertEqual(0, specific_entitys_table.where(col('app_id').isNull()).count())
        ...
        # check whether optional fields contain too many null values (depending on the tolerance level)
        self.assertTrue(specific_entitys_table.where(col('notes').isNull()).count() < count * null_tolerance)
        self.assertTrue(specific_entitys_table.where(col('weather').isNull()).count() < count * null_tolerance)
        ...

    def test_processed_specific_entitys_count(self):
        specific_entitys_dict  = map(lambda kv: json.loads(kv[1].decode('utf-8')), self.entities.collect())  # decode binary values to utf-8 JSON and parse into dicts
        specific_entity_ids    = map(lambda d: d["data"]["relationships"]["specific_entity"]["data"]["id"], specific_entitys_dict)  # get the ids of the entities
        raw_unique_count       = len(set(specific_entity_ids))  # number of distinct ids (deduplication)
        specific_entitys_table = self.sqlContext.sql("select * from %s.%s where partition=%s" % (self.database, self.entity_name, self.partition))
        processed_unique_count = specific_entitys_table.count()
        self.assertEqual(raw_unique_count, processed_unique_count)


if __name__ == '__main__':
    loader = unittest.TestLoader()
    suite  = unittest.TestSuite()
    suite.addTests(loader.loadTestsFromTestCase(SpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    sc.stop()

The main block creates a unittest test suite, loads all test methods (all functions whose names begin with “test”) and finally runs them. The result can then be stored, printed, emailed, etc.

To automate the sanity checks, we set up an Oozie scheduler that runs each day after the pre-processing workflow and logs any errors to a text file on HDFS. A decision action in Oozie checks whether the text file exists and sends an email with a link to our Data Engineering team to inform us about any data quality issues in yesterday’s load.
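For completeness, one possible way to produce such an error file from within the test job itself is sketched below, reusing the SparkContext and test case from the listing above. The HDFS error path and the use of saveAsTextFile are assumptions, not necessarily how our Oozie workflow captures the errors.

if __name__ == '__main__':
    loader = unittest.TestLoader()
    suite  = unittest.TestSuite()
    suite.addTests(loader.loadTestsFromTestCase(SpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    if not result.wasSuccessful():
        # collect the tracebacks of all failed and errored tests ...
        messages = [trace for _, trace in result.failures + result.errors]
        # ... and persist them to HDFS so the Oozie decision action only has to check for this path
        sc.parallelize(messages, 1).saveAsTextFile("/user/%s/sanity_check_errors/%s" % (USER, PARTITION))
    sc.stop()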

5.) Conclusion

Simplicity is essential, and not only according to Agile principles. Using standard Python test frameworks and basic assertions lets you get started quickly and gives you more time to work on the actual data quality issues.

Have you come up with similar solutions or something totally different? Please let us know in the comment section below!

Interested in more? Visit our Tech Blog for more interesting articles!
