
Luigi and Google Cloud in production – retrospective

We’ve been running Luigi in production for 3 weeks now without any issues, so I thought it was time to share the code I wrote to link Luigi with Google Cloud (see the previous article). I have to warn you though: it’s a first iteration and far from perfect, but it will give you an idea of how to start linking the two products. I thought I would have more time to clean it up. Anyway, fetch it here: github/luigiext-gcloud

The tasks concentrate on BigQuery because for storage we can get by with the Google Cloud Storage connector for HDFS. But we’re planning to add some dedicated GCS tasks this sprint to speed up some of the processes.
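For illustration, this is roughly why dedicated storage tasks weren’t urgent: once the GCS connector is installed, hadoop fs understands gs:// URIs, so an ordinary HDFS target can already point at a bucket. A minimal sketch, assuming the connector is configured on the cluster (the bucket path and task name are made up):

import luigi
from luigi.contrib.hdfs import HdfsTarget


class SegmentExport(luigi.ExternalTask):
    """Hypothetical task whose data already lives on Google Cloud Storage."""
    day = luigi.DateParameter()

    def output(self):
        # With the GCS connector, the hadoop client resolves gs:// paths,
        # so existence checks work exactly like they do for plain HDFS.
        return HdfsTarget(self.day.strftime('gs://my-bucket/segments/%Y/%m/%d/'))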

[Figure: CPU graph of the nightly tasks, taken from the Cloud dashboard]

I also wanted to show you the CPU graph of our nightly tasks as shown in the Cloud dashboard. Until now most of the Hadoop MR tasks were written in Pig and started sequentially in a batch process. You can imagine that this is all but optimal, and writing disaster recovery in batch is almost impossible. You see it clearly in the left part of the graph: only for a small slice of about 45 minutes is the cluster used optimally (by one big Pig script); the rest of the time most of the CPUs are idle because the tasks were not big enough.

But if you look at the right side of the graph, you see that the cluster is being used much more optimally, reaching almost 100% CPU usage on the 32 nodes. This was reached by using Luigi and setting multiple workers; for our relatively small cluster, 6 workers seems to be the sweet spot. Granted, you see almost no CPU usage in the beginning, and that’s because building the complete dependency graph of all our new Luigi tasks is quite slow. But that is mainly because checking each output file is done by starting the hadoop fs command-line utility, and it will be solved once I’ve written native support for Google Cloud Storage. The last dip is all the BigQuery import tasks that run at the end.
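For completeness, the number of workers is just a parameter on the run. A minimal sketch of how a nightly run with 6 workers could be kicked off (NightlyTasks is a hypothetical wrapper around the real jobs):

import luigi


class NightlyTasks(luigi.WrapperTask):
    """Hypothetical wrapper task that requires() all of the nightly jobs."""

    def requires(self):
        # yield each nightly task here, e.g. the Pig/MR and BigQuery load tasks
        return []


if __name__ == '__main__':
    # Six parallel workers let independent branches of the dependency
    # graph run at the same time, which is what fills up the cluster.
    luigi.build([NightlyTasks()], workers=6)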

I’m a lot happier since we adopted Luigi for running our data pipeline (the new part anyway), and if you’re looking for a tool to manage your hundreds of Hadoop jobs, make sure to look at Luigi.


Using Luigi on the Google Cloud Platform

When you hear about big data, you only hear about the hottest compute engines, the fastest algorithms and the sexiest visualisations. But almost nobody talks about the plumbing that is needed to link everything together and move your data from one algorithm to the next. Strange, because for me that’s just as important as the algorithms you are using.

Due to this lack of information we had some setbacks and made some wrong choices in the past. We started out with LinkedIn’s Azkaban but never got it reliable enough to run in production. It was cumbersome to update workflows, and extending it seemed completely out of the question with the limited time we have. So we switched to the pragmatic approach and linked everything together with batch scripts and Python.

But as time goes on and more and more algorithms get implemented, it’s hard to keep up with writing scripts that are easy to maintain and still fault tolerant. Luckily I discovered Luigi, an open-source, no-nonsense workflow manager created by Spotify. The beauty lies in its simplicity: describing a Task in a workflow graph is done in Python, giving you the benefit that you can edit your workflow in your favorite IDE.
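To make that concrete, here is a toy example (not part of our pipeline) showing the standard requires/output/run shape of a Luigi task:

import luigi


class ProduceLogs(luigi.Task):
    """Toy upstream task that writes a few lines to a local file."""
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(self.date.strftime('logs-%Y-%m-%d.txt'))

    def run(self):
        with self.output().open('w') as out:
            out.write('line one\nline two\n')


class CountLines(luigi.Task):
    """Counts the lines produced by ProduceLogs for the same day."""
    date = luigi.DateParameter()

    def requires(self):
        return ProduceLogs(self.date)

    def output(self):
        return luigi.LocalTarget(self.date.strftime('counts-%Y-%m-%d.txt'))

    def run(self):
        with self.input().open('r') as src, self.output().open('w') as dst:
            dst.write('%d\n' % sum(1 for _ in src))

Running CountLines for a given date will first run ProduceLogs if its output file doesn’t exist yet, and skip both if the outputs are already there.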

An important feature to look for in a workflow manager is extensibility. You will never find a product that supports all of the exotic components you use out of the box. As we’re big consumers of Google services at Vente-Exclusive.com (like BigQuery, Cloud Storage and the GCS connector for Hadoop), they needed to be integrated into the Luigi workflows.

The task proved to be amazingly simple. Once you understand the internals, it is straightforward to write a Luigi Task and Target for a service (for example Google BigQuery). It took a bit over a day to write and less than 200 lines of code. But once that’s done, a node in your workflow is only a few lines of code. Look at the following task node:

import luigi
# gcloud and dateutils come from the luigiext-gcloud code mentioned above;
# the exact import path may differ in your checkout.
import gcloud
import dateutils


class SEG_BQ_Day(gcloud.BqTableLoadTask):
    day = luigi.DateParameter(default=dateutils.yester_day())

    def requires(self):
        # The export task (defined elsewhere in our pipeline) that produces
        # the files we load into BigQuery.
        return SEG_Export4BQ_Day(self.day)

    def source(self):
        # The files to load into the BigQuery table.
        return self.day.strftime('/datasets/output/bigquery/segment/%Y/%m/%d/part*')

    def table(self):
        # Destination BigQuery table, as project:dataset.table.
        return "XXXXXXXX:XXXXXXX.Segment"

    def output(self):
        # Target that checks whether this day's data is already in BigQuery.
        return gcloud.BqQueryTarget(
            table="XXXXXXXX:XXXXXXX.Segment",
            query=self.day.strftime("SELECT 0 < count(bucket_date) "
                                    "FROM [XXXXXXX.Segment] "
                                    "WHERE bucket_date = '%Y-%m-%d 00:00:00 UTC'"),
        )

It describes everything you need to load some data into BigQuery: requires tells Luigi which dependencies produce the data we need to load, and those dependencies can in turn have dependencies of their own. output tells Luigi how to check whether the task already ran, or what output it produces; here it returns one of our custom targets that queries BigQuery.

If the required dependencies are there and output reports that the data is not already loaded, the task will load the source into the table. If we try to run the task again with the same parameters, output will say the data is already loaded into our BigQuery table and the task will not run again. Small and simple, but a lot of small blocks can quickly build up to a big graph.
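Under the hood that check is just a Luigi Target whose exists() method runs the query. A rough sketch of the idea (the query runner here is a hypothetical callable; the real BqQueryTarget in luigiext-gcloud talks to the BigQuery API itself):

import luigi


class BooleanQueryTarget(luigi.Target):
    """Sketch of a target that marks a task as done once a query returns true."""

    def __init__(self, table, query, run_query):
        self.table = table
        self.query = query
        self.run_query = run_query  # callable that executes the query and returns a scalar

    def exists(self):
        # Luigi calls exists() to decide whether the task still needs to run;
        # here the query returns true once the day's rows are in the table.
        return bool(self.run_query(self.query))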

[Figure: the Luigi Task Visualiser]

So, if you’re still searching for a way to plumb all your algorithms together, be it Hadoop MR, Spark, BigQuery, DataFlow, DIY stuff… be sure to check out Spotify’s Luigi.

Oh, and I got approval to open-source the code we’ve written for integrating with the Google services, so you can expect the code on GitHub in the near future. I’ll keep you posted.
