Data pipelines with Apache Crunch and Java 8

With Java 8 now in the mainstream, Scala and Clojure are no longer the only choices for developing readable, functional code for big data technology on the JVM. In this post we see how SoundCloud uses Apache Crunch and the new Crunch Lambda module to handle the high-volume data processing tasks that are essential at the early stages of our batch data pipeline, and to do so efficiently, robustly and simply in Java 8.

Hadoop MapReduce is not dead

It may not be cool or trendy any more, but Hadoop MapReduce (based on the Google MapReduce paper from 2004) still has an important role to play in today’s world of big data technology. Whilst Spark captures people’s imagination as a replacement for it, with its interactive response times and suitability for machine learning, it’s not a silver bullet for every batch processing requirement. It can be unstable at times, and uses far more cluster resources (especially memory) than its simpler predecessor. We use Spark in the application domain for products like search, recommendations and classification tasks, but for preparing our datasets for downstream usage, MapReduce still has a huge part to play. It provides a robust and relatively reliable platform which is ideally suited to transforming, cleaning and partitioning our huge data volumes ready for use by the insights and product engineering teams.

Type safety is crucial

Working with data at this end of the pipeline has different developer requirements too. Whilst at the application end of the pipeline the workflows are often based on picking out individual fields from the data and aggregating them in various ways (such functionality is well provided on MapReduce by Hive and Scalding), at early stages of our pipelines we are working with complete objects with strict schemas to adhere to, so trying to work with a fields-based API would be very dangerous.

Event data at SoundCloud is stored in the structured binary Protobuf format, so being able to work directly with Java Protobuf objects rather than an API-specific view saves us lots of mapping code, and avoids the errors that getting such mapping wrong would introduce.

Crunch provides a useful abstraction for this kind of workload, because it has type safety and structured records at its heart. It’s based on Google’s FlumeJava paper, and provides a general-purpose, boilerplate-free, developer-friendly, typesafe Java API for developing data jobs to run on Hadoop MapReduce.

Crunch code is relatively simple to understand, as this classic “word count” example demonstrates:

Pipeline crunch = new MRPipeline(WordCountJob.class);
crunch.read(From.textFile("/path/on/hdfs"))
      .parallelDo(new DoFn<String, String>() {
          public void process(String s, Emitter<String> emitter) {
              for (String word: s.split(" ")) {
                  emitter.emit(word);
              }
          }
      }, strings())
      .count()
      .parallelDo(new MapFn<Pair<String, Long>, String>() {
          public String map(Pair<String, Long> wordCount) {
              return wordCount.first() + ":" + wordCount.second();
          }
      }, strings())
      .write(To.textFile("/path/to/output"));
crunch.done();

Reading from top to bottom, we see that we’re reading a collection of Strings from a text file, then applying a DoFn which tokenizes each line into words and outputs each word. We then use the built-in count() method to count occurrences, then we apply a MapFn operation which formats those pairs back into String lines and writes them to a text file. The execution is lazy, so crunch.done() tells Crunch to run everything.

Unfortunately this code snippet suffers from the same problem that has plagued functional-style Java code for years. Those anonymous inner classes are ugly and add a huge amount of redundant information, making the code difficult to read and irritating to write.

Java 8 to the rescue

Luckily, Java 8 gave us the gift of lambda expressions, method references and Streams, all of which are intended specifically to make functional programming in Java more user friendly. With the rest of our code starting to look more functional and concise, the Crunch parts of the code started to look even more archaic than they did before, so I set about building a thin Java-8-friendly wrapper around the Crunch API to make the situation better. That wrapper became Crunch Lambda, which as of version 0.14.0 is now part of the main Crunch project (in the crunch-lambda submodule, artifact org.apache.crunch:crunch-lambda:0.14.0).

Mini side-note about Crunch versions: Some people are worried about using Crunch because it’s not version “1” yet, but Crunch version numbers are incredibly conservative. We and many other large organisations have been using Crunch in production for some time and we consider it very stable.

Crunch Lambda

Let’s revisit the above example with Crunch Lambda instead:

Pipeline crunch = new MRPipeline(WordCountJobLambda.class);
Lambda.wrap(
  crunch.read   (From.textFile("/path/on/hdfs")))
        .flatMap(line -> Arrays.stream(line.split(" ")), strings())
        .count  ()
        .map    (wc -> wc.first() + ":" + wc.second(), strings())
        .write  (To.textFile("/path/to/output"));
crunch.done();

See how the type information is only required once now for each transformation and there’s no distracting superfluous information.

What about the real world?

Let’s take a look at how we’re using Crunch at SoundCloud in one of our data jobs.

The EventStatsJob counts the number of events that have successfully made it through to the end of the core event pipeline and splits it out by application and event type. We use this data to compare against data collected in real-time as events are first collected to check for any anomalies in our computation.

Once it has collected the data, the job emits it as data points to a Prometheus PushGateway so they can be graphed over time.

Pipeline crunch = new MRPipeline(EventStatsJob.class);

// Prometheus setup
PushGateway pushGateway = new PushGateway(PUSH_GATEWAY_ADDRESS);
Gauge gauge = Gauge.build()
    .name("event_count")
    .labelNames("event_type", "application")
    .create();

// Read events from Protobuf SequenceFile
LTable<Long, Event> events = 
  Lambda.wrap(
    crunch.read(
      From.sequenceFile(
        dataPath,
        Writables.longs(),
        PTypes.protos(Event.class, WritableTypeFamily.getInstance()))));

// Count events by their stats key and set Gauge values
events.values()
      .map(StatsKey::fromEvent, StatsKey.pType())
      .count()
      .materialize()
      .forEach(statsRecord ->
          gauge.labels(
                  statsRecord.first().eventType,
                  statsRecord.first().application)
               .set(statsRecord.second()));

// Send the gauge contents to the PushGateway
pushGateway.push(gauge, "event_stats_job");

Here we see how simply Crunch code can interoperate with normal Java code. It calls out to a nearby normal method to perform a distributed map operation (via the StatsKey::fromEvent method reference) and then brings the results back locally via the materialize() method to operate on as a normal Java Stream. Note that we didn’t even specify an output file, because the only output from this job is to feed metrics to Prometheus; Crunch handles all the temporary files and IO for you.

The StatsKey custom type is a simple immutable wrapper for the eventType and application (it has a few more fields in real life, but these are omitted here for brevity). We can easily work with this in Crunch by giving it a conversion mechanism in and out of a built-in Crunch type, as seen in the pType() method here:

public static class StatsKey {
  public final String eventType;
  public final String application;

  public StatsKey(String eventType, String application) {
      this.eventType = eventType;
      this.application = application;
  }

  public static PType<StatsKey> pType() {
      return derivedImmutable(
              StatsKey.class,
              mapFn(t -> new StatsKey(t.first(), t.second())),
              mapFn(k -> Pair.of(k.eventType, k.application)),
              pairs(strings(), strings()));
  }

  public static StatsKey fromEvent(Event event) {
      /* ... */
  }
}

A PType describes how a type is serialized and deserialized. Crunch provides built-in PTypes for many serialization formats, including Protobuf (as you can see with the PTypes.protos call above) and Avro.
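
The two mapFn arguments passed to derivedImmutable in pType() need to be inverses of each other: one rebuilds a StatsKey from the stored pair, the other flattens it back into a pair. A minimal plain-JDK sketch of that round trip follows. It does not use Crunch at all; the StatsKeyRoundTrip class, the FROM_PAIR and TO_PAIR names, the sample values and the use of Map.Entry in place of Crunch's Pair are all assumptions made for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;

public class StatsKeyRoundTrip {

    /** Simplified copy of the StatsKey fields shown in the post. */
    static final class StatsKey {
        final String eventType;
        final String application;

        StatsKey(String eventType, String application) {
            this.eventType = eventType;
            this.application = application;
        }
    }

    // Hypothetical stand-ins for the two mapFn(...) lambdas; Map.Entry replaces Crunch's Pair.
    static final Function<Map.Entry<String, String>, StatsKey> FROM_PAIR =
            p -> new StatsKey(p.getKey(), p.getValue());
    static final Function<StatsKey, Map.Entry<String, String>> TO_PAIR =
            k -> new SimpleImmutableEntry<>(k.eventType, k.application);

    /** Flattens a key to its pair form and rebuilds it, as a write followed by a read would. */
    static StatsKey roundTrip(StatsKey key) {
        return FROM_PAIR.compose(TO_PAIR).apply(key);
    }

    public static void main(String[] args) {
        // "sound_played" and "web" are made-up sample values.
        StatsKey copy = roundTrip(new StatsKey("sound_played", "web"));
        System.out.println(copy.eventType + " / " + copy.application);
    }
}
```

If the round trip ever changes a field, the two conversion functions have drifted apart, which is a cheap property to check before a custom type reaches the cluster.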

A more complex example - Creating toplists

As part of the SoundCloud Stats product, we present toplists to sound creators in different dimensions - for example “my most-favourited tracks” or “top countries where my sound is played”. This is the main part of the code to compute these:

LTable<Bucket, Long> summedBuckets =
  events
    .flatMap(this::createFactsFromEvent,
             tableOf(proto(Bucket.class), longs()))
    .groupByKey()
    .reduceValues((a, b) -> a + b)
    .filterByValue(count -> count > 0L); // Keep only positive totals

return summedBuckets
    .map(this::remapDimensionKeys,
         tableOf(proto(FactKey.class), proto(TopListPair.class)))
    .groupByKey()
    .mapValues(rowSet -> findTopK(k, rowSet),       
               collections(proto(TopListPair.class)));

Here you can see that we are doing some much more complex operations than in our previous example.

Firstly we transform each event into a set of “facts”. A “sound played” event, for example, might generate several facts: one based on the country in which it occurred, one based on the creator who published the sound, and so on. The createFactsFromEvent method takes an Event as its parameter and returns a Java Stream of facts, one for each of these so-called “dimensions”. We group by the combination of key (e.g. “myawesomesound”), dimension (e.g. “country”) and dimension value (e.g. “Sweden”), together called a “bucket”, and sum the Long values as a reduce operation. As some events (such as “sound un-favourited”) contribute negative values to the counts, we filter out any totals which are non-positive, as these tend to look strange to users.
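
The group, sum and filter steps can be tried out locally with plain JDK streams. The sketch below is not Crunch code and runs in a single JVM. The BucketSumSketch and Fact classes are hypothetical, and a List of three strings stands in for the Protobuf Bucket message, whose real fields are not shown in this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BucketSumSketch {

    /** Hypothetical fact: a bucket (key, dimension, dimension value) plus a signed delta. */
    static final class Fact {
        final List<String> bucket;
        final long delta;

        Fact(long delta, String... bucket) {
            this.bucket = Arrays.asList(bucket);
            this.delta = delta;
        }
    }

    /** Sums the deltas per bucket and keeps only strictly positive totals. */
    static Map<List<String>, Long> sumBuckets(List<Fact> facts) {
        // Equivalent of groupByKey() followed by reduceValues((a, b) -> a + b).
        Map<List<String>, Long> totals = facts.stream()
                .collect(Collectors.groupingBy(f -> f.bucket,
                                               Collectors.summingLong(f -> f.delta)));
        // Equivalent of filterByValue(count -> count > 0L).
        return totals.entrySet().stream()
                .filter(e -> e.getValue() > 0L)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        List<Fact> facts = Arrays.asList(
                new Fact(1L, "myawesomesound", "country", "Sweden"),
                new Fact(1L, "myawesomesound", "country", "Sweden"),
                new Fact(1L, "myawesomesound", "favourites", "total"),
                new Fact(-1L, "myawesomesound", "favourites", "total"));
        // The favourites bucket sums to zero and is dropped.
        System.out.println(sumBuckets(facts)); // prints {[myawesomesound, country, Sweden]=2}
    }
}
```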

Following this, the remapDimensionKeys operation effectively moves the dimension value (e.g. “Sweden”) from the key part of the record to the value part, so we’re left with a FactKey(“myawesomesound”, “country”) and a TopListPair(“Sweden”, 581). We then groupByKey() with this new key to collect all the TopListPairs for each FactKey, then run them through the findTopK method to obtain the K TopListPairs with the greatest count.

public Collection<TopListPair> findTopK(int k, Stream<TopListPair> input) {
   SortedSet<TopListPair> set = new TreeSet<>(
           Comparator.comparingLong(TopListPair::getCount)
                   .reversed()
                   .thenComparing(TopListPair::getDimensionValue));
   input.forEach(pair -> {
       set.add(pair);
       if (set.size() > k) {
           set.remove(set.last());
       }
   });
   return set;
}

As you can see, there’s absolutely nothing unusual about this part of the computation. We use a standard streaming top-k algorithm to create the result set for the key.
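
To see the ordering and tie-breaking in action, the method can be run on its own. The sketch below repeats findTopK as a static method next to a hypothetical TopListPair class; the real one is a Protobuf message, so the constructor here is an assumption, as are the TopKSketch class, the topKValues helper and the sample counts other than Sweden's.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TopKSketch {

    /** Hypothetical stand-in for the Protobuf TopListPair message. */
    static final class TopListPair {
        private final String dimensionValue;
        private final long count;

        TopListPair(String dimensionValue, long count) {
            this.dimensionValue = dimensionValue;
            this.count = count;
        }

        String getDimensionValue() { return dimensionValue; }
        long getCount() { return count; }
    }

    /** Same algorithm as findTopK in the post, made static so it runs standalone. */
    static Collection<TopListPair> findTopK(int k, Stream<TopListPair> input) {
        // Highest count first; ties are broken alphabetically by dimension value.
        SortedSet<TopListPair> set = new TreeSet<>(
                Comparator.comparingLong(TopListPair::getCount)
                        .reversed()
                        .thenComparing(TopListPair::getDimensionValue));
        input.forEach(pair -> {
            set.add(pair);
            if (set.size() > k) {
                set.remove(set.last()); // evict the current lowest-ranked pair
            }
        });
        return set;
    }

    /** Convenience helper (not in the post): dimension values of the top k, best first. */
    static List<String> topKValues(int k, Stream<TopListPair> input) {
        return findTopK(k, input).stream()
                .map(TopListPair::getDimensionValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<TopListPair> pairs = Stream.of(
                new TopListPair("Sweden", 581L),
                new TopListPair("Germany", 300L),
                new TopListPair("France", 300L),
                new TopListPair("Norway", 10L));
        System.out.println(topKValues(3, pairs)); // prints [Sweden, France, Germany]
    }
}
```

The set never holds more than k + 1 elements at a time, so memory use stays bounded however many pairs a key receives.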

Testing

Finally, one of the greatest advantages of using Crunch is how it facilitates easy local testing of your pipeline logic. A combination of an in-memory implementation of the Pipeline object MemPipeline and the fact that the API encourages using normal Java methods to compose your pipes (for simple low-level unit testing) means that it’s really easy to have confidence in your pipeline before it even makes it on to the cluster.
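
As a small illustration of the second point, a pipeline step written as an ordinary static method can be checked without any Crunch or Hadoop classes on the classpath. The sketch below is plain JDK code and does not show MemPipeline itself. The WordCountSteps class and its tokenize and format methods are hypothetical names that mirror the two lambdas from the word count example; tokenize is shaped so that it could be passed to flatMap as a method reference.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountSteps {

    /** Splits a line on single spaces, like the flatMap lambda in the word count job. */
    static Stream<String> tokenize(String line) {
        return Arrays.stream(line.split(" "));
    }

    /** Formats a word and its count as "word:count", like the map lambda in the job. */
    static String format(String word, long count) {
        return word + ":" + count;
    }

    public static void main(String[] args) {
        // Low-level checks on the step logic; no pipeline or cluster is involved.
        List<String> words = tokenize("to be or not to be").collect(Collectors.toList());
        if (!words.equals(Arrays.asList("to", "be", "or", "not", "to", "be"))) {
            throw new AssertionError("unexpected tokens: " + words);
        }
        if (!format("be", 2L).equals("be:2")) {
            throw new AssertionError("unexpected format");
        }
        System.out.println("all checks passed");
    }
}
```

In a real project these checks would normally live in a unit test framework rather than a main method; the point is only that the step logic is reachable as plain Java.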

Future

We’ve got our eyes firmly on the Apache Beam project as it progresses. Inspired by work from FlumeJava, Crunch, Google Dataflow, Spark and Flink (and also running on at least the latter three), it aims to generalise the expression of data pipelines like this away from Hadoop altogether and break the cycle of having to rewrite everything to keep up with the latest technology. This sounds like a big step in the right direction, but we’re confident that writing new Crunch code now isn’t a waste of time either, as it looks very likely so far that translating from Crunch to Beam would be a relatively simple operation when the time comes.

David is a Data Engineer at SoundCloud and a committer to the Apache Crunch project. You can learn more by reading the Crunch User Guide and the Javadocs for Crunch Lambda. If you’re interested in working on problems like these, check our open positions on the SoundCloud Jobs page.