When you hear about big data, you only hear about the hottest compute engines, the fastest algorithms and the sexiest visualisations. But almost nobody talks about the plumbing needed to link everything together and move your data from one algorithm to the next. Strange, because to me that's just as important as the algorithms you use.
Due to this lack of information we had some setbacks and made some wrong choices in the past. We started out with LinkedIn's Azkaban, but we never got it reliable enough to run in production. Updating workflows was cumbersome, and extending it seemed completely out of the question in the limited time we have. So we switched to a pragmatic approach and linked everything together with batch and Python scripts.
But as time goes on and more and more algorithms get implemented, it becomes hard to keep writing scripts that are both easy to maintain and fault tolerant. Luckily I discovered Luigi, an open-source, no-nonsense workflow manager created by Spotify. Its beauty lies in its simplicity. Tasks in a workflow graph are described in Python, so you can edit your workflows in your favorite IDE.
An important feature to look for in a workflow manager is extensibility. You will never find a product that supports all of the exotic components you use out of the box. At Vente-Exclusive.com we're heavy users of Google services (like BigQuery, Cloud Storage and the GCS-Connector for Hadoop), so they needed to be integrated into our Luigi workflows.
This proved to be amazingly simple. Once you understand the internals, writing a Luigi Task and Target for a service (for example Google BigQuery) is easy. It took a little over a day and less than 200 lines of code. And once that's done, a node in your workflow is only a few lines of code. Look at the following task node:
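```python
import luigi

# gcloud and dateutils are our own in-house modules (not shown here):
# gcloud holds the BigQuery Task/Target integration, dateutils a date helper.
# SEG_Export4BQ_Day is an upstream task defined elsewhere in the workflow.
import gcloud
import dateutils


class SEG_BQ_Day(gcloud.BqTableLoadTask):
    # The day to load; defaults to yesterday.
    day = luigi.DateParameter(default=dateutils.yester_day())

    def requires(self):
        # Upstream task that exports this day's data for BigQuery.
        return SEG_Export4BQ_Day(self.day)

    def source(self):
        # Files to load into BigQuery.
        return self.day.strftime('/datasets/output/bigquery/segment/%Y/%m/%d/part*')

    def table(self):
        # Destination BigQuery table.
        return "XXXXXXXX:XXXXXXX.Segment"

    def output(self):
        # Complete once the table already holds rows for this day.
        return gcloud.BqQueryTarget(
            table="XXXXXXXX:XXXXXXX.Segment",
            query=self.day.strftime("SELECT 0 < count(bucket_date) "
                                    "FROM [XXXXXXX.Segment] "
                                    "WHERE bucket_date = '%Y-%m-%d 00:00:00 UTC'"),
        )
```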
It describes everything you need to load data into BigQuery:
requires tells Luigi which dependencies produce the data we need to load. Those dependencies can in turn have dependencies of their own.
output tells Luigi how to check whether the task has already run, by describing the output it produces. Here it returns one of our custom targets, which queries BigQuery.
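To make the requires/output contract concrete, here is a toy, stdlib-only model of how a Luigi-style scheduler walks a task graph. It is not Luigi's code or API: `Task`, `MemoryTarget`, `build`, `Export` and `Load` are hypothetical names for illustration, and real Luigi adds parameters, a central scheduler, workers and much more. The one idea it borrows is that a task counts as complete when its output exists, and incomplete dependencies are built first.

```python
from typing import List, Optional


class MemoryTarget:
    """Hypothetical in-memory target: it 'exists' once data is written to it."""

    def __init__(self) -> None:
        self.data: Optional[list] = None

    def exists(self) -> bool:
        return self.data is not None


class Task:
    """Toy task: subclasses override requires(), output() and run()."""

    def requires(self) -> List["Task"]:
        return []

    def output(self) -> MemoryTarget:
        raise NotImplementedError

    def run(self) -> None:
        raise NotImplementedError

    def complete(self) -> bool:
        # A task is done when its output exists.
        return self.output().exists()


def build(task: Task, log: List[str]) -> None:
    """Depth-first: skip complete tasks, build dependencies, then run the task."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(type(task).__name__)


EXPORTED = MemoryTarget()
LOADED = MemoryTarget()


class Export(Task):
    def output(self) -> MemoryTarget:
        return EXPORTED

    def run(self) -> None:
        EXPORTED.data = ["row1", "row2"]  # sample data


class Load(Task):
    def requires(self) -> List[Task]:
        return [Export()]

    def output(self) -> MemoryTarget:
        return LOADED

    def run(self) -> None:
        # Consume the upstream task's output.
        LOADED.data = list(EXPORTED.data)


first: List[str] = []
build(Load(), first)   # runs Export, then Load
second: List[str] = []
build(Load(), second)  # everything is complete, nothing runs
print(first)   # ['Export', 'Load']
print(second)  # []
```

Running the same graph twice does no work the second time, because each task's output already exists.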
If we have the required dependencies and output tells us that the data is not loaded yet, the task will load the source into the table. If we try to run the task again with the same parameters, output will report that the data is already in our BigQuery table, and the task will not run again. Small and simple, but a lot of small blocks can quickly build up into a big graph.
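The BigQuery task and target above come from our own gcloud module. The following is a self-contained analogue of the same "query decides whether to load" pattern, with Python's built-in sqlite3 standing in for BigQuery. `QueryTarget` and `LoadDayTask` are hypothetical names, and the `segment` table schema and sample rows are assumptions made for the sketch. The `maybe_run` method plays the role that Luigi's scheduler plays in practice: it checks the output before running.

```python
import sqlite3
from typing import List, Tuple


class QueryTarget:
    """Hypothetical target that 'exists' when a boolean SQL query returns true."""

    def __init__(self, conn: sqlite3.Connection, query: str, params: Tuple = ()) -> None:
        self.conn = conn
        self.query = query
        self.params = params

    def exists(self) -> bool:
        (result,) = self.conn.execute(self.query, self.params).fetchone()
        return bool(result)


class LoadDayTask:
    """Hypothetical load task: inserts one day's rows unless already loaded."""

    def __init__(self, conn: sqlite3.Connection, day: str, rows: List[str]) -> None:
        self.conn = conn
        self.day = day
        self.rows = rows

    def output(self) -> QueryTarget:
        # Same check as the BigQuery example: any rows for this bucket_date?
        return QueryTarget(
            self.conn,
            "SELECT 0 < count(bucket_date) FROM segment WHERE bucket_date = ?",
            (self.day,),
        )

    def run(self) -> None:
        with self.conn:  # commit the inserts as one transaction
            self.conn.executemany(
                "INSERT INTO segment (bucket_date, value) VALUES (?, ?)",
                [(self.day, value) for value in self.rows],
            )

    def maybe_run(self) -> bool:
        """Run only if the output check says the data is missing."""
        if self.output().exists():
            return False
        self.run()
        return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE segment (bucket_date TEXT, value TEXT)")

task = LoadDayTask(conn, "2015-06-01", ["a", "b"])
print(task.maybe_run())  # True: rows loaded
print(task.maybe_run())  # False: already loaded, skipped
print(conn.execute("SELECT count(*) FROM segment").fetchone()[0])  # 2
```

Because completeness is decided by querying the destination itself, rerunning a task for a day that is already loaded is a cheap no-op rather than a duplicate load.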
So, if you’re still searching for a way to plumb all your algorithms together, be it Hadoop MR, Spark, BigQuery, DataFlow, DIY stuff… be sure to check out Spotify’s Luigi.
Oh, and I got approval to open-source the code we've written for integrating with the Google services, so you can expect it on GitHub in the near future. I'll keep you posted.