Compare Millions of messages using Spark (PySpark)

Using the Power of Apache Spark (PySpark) to Verify Loads of Messages

Imagine a system that costs you about £30K to run and maintain. It is also a necessary component of your company’s audit trail, so it cannot be deprecated just like that; doing so would be an expensive business decision.

Overview of Legacy System

All events in the system emit messages for audit purposes, and the messages are immediately available, initially for 7 days and then for 30 days. Once the 30-day period has elapsed, the messages are archived. The archive storage location is simply AWS S3.

Stored messages can be retrieved from S3 as and when needed.

Various teams in the organisation need access to these audit messages. The messages are all in JSON format, and their keys are not sorted, e.g.:

Message 1:

{ "username": "bonzo", "location": "London", "subject": "mathematics", "activityTimestamp": "2017-02-01:21:32:00Z"}

Message 2:

{ "username": "iyabo", "subject": "english literature", "activityTimestamp": "2017-05-09:06:03:09Z", "location": "Abeokuta"}
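Because the keys arrive in no fixed order, the same message can be serialised differently by the legacy and new systems, so a plain string comparison would report false mismatches. A minimal sketch of one way around this, assuming every message is a single JSON object, is to re-serialise each message with sorted keys before comparing. The helper name `canonical` is hypothetical, not part of either system.

```python
import json

def canonical(message: str) -> str:
    """Re-serialise a JSON message with sorted keys and fixed separators,
    so key order and whitespace no longer affect equality."""
    return json.dumps(json.loads(message), sort_keys=True, separators=(",", ":"))

legacy = '{"username": "iyabo", "subject": "english literature", "location": "Abeokuta"}'
new = '{"location": "Abeokuta", "username": "iyabo", "subject": "english literature"}'

print(legacy == new)                        # False: same content, different key order
print(canonical(legacy) == canonical(new))  # True
```

`sort_keys` also sorts the keys of nested objects, but it does not reorder lists or reconcile differently formatted values (for example two timestamp styles), so those would need their own normalisation.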

A project to replace this legacy system has been initiated; it will use Apache Kafka as the platform to produce and consume these audit events. Kafka is open source, and if you stick with the free version rather than subscribing to the services provided by Confluent (the company founded by Kafka’s original creators), you will have to do a lot of tinkering with this beauty called Kafka. If you want to know more about Kafka, simply visit (don’t click away now!)

The legacy system produces audit messages in JSON format, so the new system will use the same message format. The real challenge is to ensure that the new system produces exactly the same messages as the legacy system, so that when the legacy system is switched off, no audit message is missing. In short, a like-for-like system.

The key question that must be answered is this:

“Do we have all messages?”

If yes, let’s pick some messages at random from the legacy system and confirm that the new system has exactly the same messages.

To verify massive volumes of data (BigData), you use either Stare and Compare (sampling) or Minus queries. In simple terms, Stare and Compare takes a sample from the source data and checks whether that chunk of data is present in the target data. Simple? This strategy has limitations when we are talking about big data: the data won’t fit in an Excel spreadsheet, which caps out at 1,048,576 rows, and a sample that matches does not prove that nothing is missing.
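The sampling strategy can be sketched in a few lines of Python. This is an illustration under assumptions: messages are JSON strings held in memory, the target fits in a set, and the helper names `canonical` and `sample_and_compare` are hypothetical. It also makes the weakness visible: an empty result only means the sampled messages were found, not that every message was.

```python
import json
import random

def canonical(message: str) -> str:
    # Sorted keys and fixed separators: key order no longer affects equality
    return json.dumps(json.loads(message), sort_keys=True, separators=(",", ":"))

def sample_and_compare(source, target, sample_size, seed=None):
    """Draw a random sample of source messages and return those with no
    identical (canonical) counterpart in the target."""
    source = list(source)
    target_set = {canonical(m) for m in target}
    rng = random.Random(seed)
    sample = rng.sample(source, min(sample_size, len(source)))
    return [m for m in sample if canonical(m) not in target_set]

legacy = ['{"username": "bonzo", "location": "London"}',
          '{"username": "iyabo", "location": "Abeokuta"}']
new = ['{"location": "London", "username": "bonzo"}']

print(sample_and_compare(legacy, new, sample_size=2, seed=1))
# ['{"username": "iyabo", "location": "Abeokuta"}']
```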

The Minus Query strategy simply uses machines to do the job. Run a query on the source data and on the target data, then subtract one result set from the other; the outcome is the set of records in one source that are missing from the other (run the subtraction in both directions to catch extras as well as gaps). Note that the queries may be in different query languages (SQL/HQL; data people, you understand).
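A Minus query can be sketched the same way. This version, with hypothetical helper names, uses `collections.Counter` subtraction rather than a SQL `MINUS`/`EXCEPT`, so duplicate messages are counted instead of collapsed. It runs the subtraction in both directions: primary minus secondary shows messages the new system is missing, and secondary minus primary shows messages it produced that the legacy system never did.

```python
import json
from collections import Counter

def canonical(message: str) -> str:
    # Sorted keys and fixed separators: key order no longer affects equality
    return json.dumps(json.loads(message), sort_keys=True, separators=(",", ":"))

def minus(primary, secondary):
    """Return (primary MINUS secondary, secondary MINUS primary) as Counters
    of canonical messages; counts show how many copies are unmatched."""
    p = Counter(canonical(m) for m in primary)
    s = Counter(canonical(m) for m in secondary)
    return p - s, s - p  # Counter subtraction keeps only positive counts

primary = ['{"username": "bonzo", "location": "London"}',
           '{"username": "iyabo", "location": "Abeokuta"}']
secondary = ['{"location": "London", "username": "bonzo"}',
             '{"location": "London", "username": "bonzo"}']  # duplicate copy

only_in_primary, only_in_secondary = minus(primary, secondary)
print(list(only_in_primary.elements()))    # ['{"location":"Abeokuta","username":"iyabo"}']
print(list(only_in_secondary.elements()))  # ['{"location":"London","username":"bonzo"}']
```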

Back to the task.

You could go about setting up complex tools, running scripts and doing anything else that makes you feel you’re using technology to solve the problem.

Hold on, there’s a simple solution: Apache Spark. You can check out a quick overview and quick start on Spark here, and the PySpark flavour can be found here.

Assuming you’re up and running with PySpark, the next step is to load the data you need to compare. Let’s call the two sets of data primary and secondary: primary is the source data (legacy) and secondary is the new source of data.

In Spark, we use RDDs (Resilient Distributed Datasets) to hold the data; in this case I will use rdd1 and rdd2.
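Loading the data can be sketched with the standard library so that it runs without a cluster. It assumes, hypothetically, that each file holds one JSON message per line and may be gzipped, as archived exports often are; `load_messages` and the file name are illustrative, not from the original setup.

```python
import gzip
import json
import os
import tempfile

def canonical(message: str) -> str:
    # Sorted keys and fixed separators: key order no longer affects equality
    return json.dumps(json.loads(message), sort_keys=True, separators=(",", ":"))

def load_messages(path: str) -> list:
    """Read one JSON message per line from a plain or gzipped file and return
    the canonical form of every non-blank line."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt", encoding="utf-8") as handle:
        return [canonical(line) for line in handle if line.strip()]

# Usage: write a tiny gzipped sample file, then load it back
with tempfile.TemporaryDirectory() as tmp:
    primary_path = os.path.join(tmp, "primary.jsonl.gz")  # hypothetical file name
    with gzip.open(primary_path, "wt", encoding="utf-8") as out:
        out.write('{"username": "bonzo", "location": "London"}\n\n')
    rdd1_like = load_messages(primary_path)

print(rdd1_like)  # ['{"location":"London","username":"bonzo"}']
```

On a Spark cluster, the same idea would look roughly like `rdd1 = sc.textFile(primary_path).map(canonical)`, likewise for `rdd2`, followed by `rdd1.subtract(rdd2)` and `rdd2.subtract(rdd1)`. `SparkContext.textFile`, `RDD.map` and `RDD.subtract` are standard PySpark calls, and `textFile` decompresses `.gz` input itself; `primary_path` here is only a placeholder.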

Other Posts

Elasticsearch: Bulk ingest data

Often you think of a solution to a simple problem, and once you come up with that solution you realise you need to apply it to a large dataset. In this post, I will explain how I deployed a simple solution to a larger dataset while preparing the system for future growth. Here’s the state of play before the changes to my client’s eCommerce system:

Existing System:

  1. Log in to the aggregator’s portal to retrieve the datafeed URI
  2. Log in to the customer admin interface to create or update merchant details
  3. Create a cron job to pull data from the partner URI after initial setup
  4. The cron job dumps the data into MySQL
  5. The client’s shopping UI presents a search field and filters for customers to use
  6. Search is extremely slow (homepage: 2.03 s, search results page: 35.73 s, product page: 28.68 s). The search results and product page times are completely unacceptable

Proposed System:

Phase 1:

  1. Follow steps 1-4 of the existing system
  2. Export the MySQL data as CSV
  3. Create an instance of Elasticsearch with an index to store product data
  4. Create a script that bulk inserts the exported data into Elasticsearch
  5. On the command line, search the Elasticsearch instance using various product attributes (product name, type, category, size, etc.) and check how quickly the search results come back
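The bulk-insert step could be sketched as below. The Elasticsearch `_bulk` endpoint takes newline-delimited JSON: an action line followed by the document itself, with the whole body ending in a newline. This sketch only builds that body from CSV text; the index name `products`, the column names and the helper `csv_to_bulk_ndjson` are hypothetical, and the follow-up post uses Elasticsearch-CSV rather than a hand-written script.

```python
import csv
import io
import json

def csv_to_bulk_ndjson(csv_text: str, index: str) -> str:
    """Turn CSV text (first row = header) into an Elasticsearch _bulk body:
    one action line plus one document line per row, each newline-terminated."""
    lines = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(row))                           # document line
    return "".join(line + "\n" for line in lines)

body = csv_to_bulk_ndjson("name,category\nRed Shoe,footwear\n", "products")
print(body, end="")
# {"index": {"_index": "products"}}
# {"name": "Red Shoe", "category": "footwear"}
```

The body would then be sent with a `POST` to `/_bulk` using the `Content-Type: application/x-ndjson` header. Every value arrives as a string because `csv.DictReader` does not infer types, so numeric fields such as prices would need converting or an explicit index mapping.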

Phase 2:

  1. Build a search interface that uses Elasticsearch
  2. Display search results with pagination
  3. Add filters to search results
  4. A/B test the existing search interface against the Elasticsearch-based one and compare conversion (actual sales)
  5. Switch on the better solution: Elasticsearch

A few libraries that can solve some of these challenges already exist, e.g.:

  • Elasticsearch-CSV
  • SearchKit
  • Pingdom

In the next post, I will dive deep into how I used Elasticsearch-CSV to quickly ingest merchant data, and the response I got.

BigData at the Commandline

BigData and Agile did not seem to be friends in the past, but that is no longer the case. One of the most important concerns in processing data is data integrity. Suppose you are pulling data from an API (Application Programming Interface) and processing the result before dumping it as UTF-8, gzipped CSV files on Amazon S3. The task is to confirm that the files are properly encoded (UTF-8), that each file has the appropriate headers and that no row in any file has missing data, and finally to produce a report with filenames, column count, record count and encoding type. There are many languages we could use today, BUT speed is of great importance. Also, we want this to run as a Jenkins (Continuous Integration server) job.

I have decided to use Bash to perform these checks, and I will do it twice! First, I will use basic Bash commands, and then I will use csvkit (  The other tool in the mix is the AWS command-line tool (aws-cli).
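The approach in this post is Bash with csvkit and aws-cli. Purely to make the checks concrete, here is a Python sketch of the same rules for a single file. It assumes the file has already been downloaded from S3 (for example with `aws s3 cp`), that the caller supplies the expected header, and that "missing data" means an empty field or a row with the wrong number of fields. `check_csv_gz` is a hypothetical helper, and it reads the whole file into memory, which suits a sketch rather than very large files.

```python
import csv
import gzip
import io
import os
import tempfile

def check_csv_gz(path: str, expected_header: list) -> dict:
    """Report encoding, header match, column/record counts and rows with
    missing values for one gzipped CSV file."""
    with open(path, "rb") as handle:
        raw = gzip.decompress(handle.read())
    try:
        text = raw.decode("utf-8")  # strict decoding: fails on invalid UTF-8
    except UnicodeDecodeError:
        return {"file": os.path.basename(path), "encoding": "not utf-8",
                "columns": None, "records": None,
                "header_ok": False, "rows_with_missing": None}
    rows = list(csv.reader(io.StringIO(text, newline="")))
    header, records = (rows[0], rows[1:]) if rows else ([], [])
    # A row is incomplete if its field count differs from the header
    # or any of its fields is empty or whitespace-only
    missing = sum(1 for r in records
                  if len(r) != len(header) or any(not v.strip() for v in r))
    return {"file": os.path.basename(path), "encoding": "utf-8",
            "columns": len(header), "records": len(records),
            "header_ok": header == expected_header,
            "rows_with_missing": missing}

# Usage: build a small gzipped CSV with one incomplete row
with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "sample.csv.gz")  # hypothetical file name
    with gzip.open(sample, "wt", encoding="utf-8", newline="") as out:
        out.write("id,name\n1,Zoë\n2,\n")
    report = check_csv_gz(sample, ["id", "name"])

print(report)
```

Running the function over every downloaded file and collecting the dictionaries would give the per-file report (filename, column count, record count, encoding) that the task asks for.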