Staying Sane While Working With Avro In Scalding

Recently we’ve been writing a lot of our analytics jobs in Scalding. Scalding is “a Scala library that makes it easy to specify Hadoop MapReduce jobs. Scalding is built on top of Cascading, a Java library that abstracts away low-level Hadoop details.” While writing jobs in Scalding is much cleaner than the direct Hadoop code we were writing before (thank you, Scala, for being awesome), we had to put in some work to fully unlock Scalding with Avro data as our input.

I’m going to detail some of the headaches we had getting Scalding to play nice with Avro input. This post assumes a basic understanding of Scalding. If you want to learn more about Scalding, Twitter has a good list of resources for getting started on the Scalding Wiki.

Using TypedPipes for Avro data

Using TypedPipes with Avro gives us greater confidence that the code we write won’t throw a runtime exception due to type mismatches, since the compiler can catch many of those errors (see http://watchchrislearn.com/blog/2014/05/24/typed-scalding-pipes/ for a good introduction to the Typed API and its advantages).
Scalding provides the PackedAvroSource source for creating a TypedPipe of a specific Avro record type. To use it, however, you have to supply that record type, and it’s not immediately obvious how to do so with Avro.
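As a sketch of where we ended up (the DataUsage record and its getMemberId field are hypothetical stand-ins for your own generated class, and the exact PackedAvroSource constructor may vary by Scalding version):

```scala
import com.twitter.scalding._
import com.twitter.scalding.avro.PackedAvroSource

// DataUsage is a hypothetical SpecificRecord class generated from an Avro schema.
class DailyUsageJob(args: Args) extends Job(args) {
  // PackedAvroSource[T] yields a TypedPipe[T], so the compiler checks
  // every field access against the generated record class.
  PackedAvroSource[DataUsage](args("input"))
    .map(record => (record.getMemberId.toString, 1L))
    .sumByKey
    .write(TypedTsv[(String, Long)](args("output")))
}
```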

How We Attacked It

We ended up using a code generation tool to compile our Avro schemas into SpecificRecord classes that we can use as a PackedAvroSource type (big shout out to this blog post for the tip). Specifically, we used the sbt-avro plugin to compile Avro schemas during the sbt build process.
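As a concrete example, sbt-avro picks up schema files under src/main/avro by default. A schema like the following (the fields are hypothetical; the record name and namespace match the DataUsage record referenced later in this post) compiles into a com.jana.avro.kraken.DataUsage SpecificRecord class:

```json
{
  "namespace": "com.jana.avro.kraken",
  "type": "record",
  "name": "DataUsage",
  "fields": [
    {"name": "member_id", "type": "string"},
    {"name": "bytes_used", "type": "long"}
  ]
}
```

With the stringType setting shown below, string fields are generated as java.lang.String rather than Avro’s Utf8.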
To wire up sbt-avro, I had to add the following to our plugins.sbt file:

resolvers ++= Seq(
  "cascading-repo" at "http://conjars.org/repo/"
)

addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2")

And in build.sbt file, this:

// Settings for avro codegen
seq( sbtavro.SbtAvro.avroSettings : _*)

(stringType in avroConfig) := "String"

(version in avroConfig) := "1.7.3"

Also, I have this snippet in our build.sbt file:

// Uncomment if you want to generate output code for use in an IDE environment
// Run codegen as ./sbt avro:generate
// Sometimes I have ran into some issues with it not generating because
//  of caching, if this is the case, try deleting target
// (javaSource in avroConfig) := new java.io.File("src/main/scala/")

When developing locally in my IDE I run ./sbt avro:generate so my IDE can find the compiled files. I’m sure there is a better way to do this (when doing a normal build, I then have to remove the manually generated files before building), but it allows me to leverage the features my IDE provides.

Getting Hadoop to Play Nicely with Avro in HDFS Mode

After you’ve written your job with your typed pipe, you may be surprised to find that if you run it using hdfs mode, it will fail with an error similar to this:

Caused by: cascading.pipe.OperatorException: [com.twitter.scalding.a...][write() @ com.jana.emr.datausage.DailyDataUsage.<init>(DailyDataUsage.scala:59)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
... 4 more
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.jana.avro.kraken.DataUsage
at com.jana.emr.datausage.DailyDataUsage$$anonfun$1.apply(DailyDataUsage.scala:42)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:42)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)

This baffled me for a long time. I couldn’t figure out why it was unable to cast the generic record to the specific record, and the job ran successfully in local mode. Why would that be? Given the nature of the error, it seemed like a classpath issue, but I had little success getting to the true root cause.

How We Attacked It

After poking around for a while, I stumbled on this FAQ hint: https://github.com/twitter/scalding/wiki/Frequently-asked-questions#q-my-hadoop-job-is-erroring-out-with-abstractmethoderror-or-incompatibleclasschangeerror
specifically this note:
“If your job has dependencies that clash with Hadoop’s, Hadoop can replace your version of a library (like log4j or ASM) with its own native version.”.

Following their suggestion did the trick:

export HADOOP_CLASSPATH=<your built scalding jar>
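In practice that means pointing HADOOP_CLASSPATH at your assembly (“fat”) jar before launching the job. The jar path below is a hypothetical example; yours depends on your project name and Scala version:

```shell
# Put our jar first on Hadoop's classpath so our library versions
# (Avro included) win over the ones Hadoop bundles.
export HADOOP_CLASSPATH=target/scala-2.11/my-scalding-job-assembly.jar

# Launching in hdfs mode would then look like this (commented out,
# since it needs a real Hadoop installation):
# hadoop jar "$HADOOP_CLASSPATH" com.twitter.scalding.Tool \
#   com.example.MyJob --hdfs --input in/ --output out/
```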

Avro Deserialization Woes With Arrays and Nested Records

As we started writing more jobs, we found that Scalding could not handle some Avro objects. For instance, it failed to deserialize arrays nested inside records. We saw errors similar to these:

java.lang.Exception: cascading.CascadingException: unable to compare stream elements in position: 0
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: cascading.CascadingException: unable to compare stream elements in position: 0
at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:164)
at cascading.tuple.hadoop.util.GroupingSortingComparator.compare(GroupingSortingComparator.java:59)
at java.lang.Thread.run(Thread.java:744)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
phone (com.jana.avro.mcent.Member)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:629)
at com.twitter.chill.SerDeState.readObject(SerDeState.java:58)
at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:160)
... 22 more
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:177)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
... 33 more

How We Attacked It

This time, the scalding-avro package maintained by Twitter came to the rescue (https://github.com/twitter/scalding/tree/develop/scalding-avro):

In some cases Kryo (the default serializer used by Scalding) doesn’t work well with Avro objects. If you run into serialization errors, or if you want to preempt any trouble, you should add the following to your Job class:

override def ioSerializations =
  super.ioSerializations :+ classOf[cascading.avro.serialization.AvroSpecificRecordSerialization[_]]

After adding this to our jobs, they were able to decode all Avro records properly!

To make our lives a little easier, we created a class to properly handle the Avro serialization:

import com.twitter.scalding.Job
import com.twitter.scalding.Args

class AvroScaldingJob(args: Args) extends Job(args) {

  override def ioSerializations =
    super.ioSerializations :+ classOf[cascading.avro.serialization.AvroSpecificRecordSerialization[_]]
}
Running jobs on EMR using Scala 2.11 Failed Hard

By this point, we were able to run our scalding jobs locally; next step: firing them up on EMR. You would think it would go smoothly, given that we were able to successfully run them locally.
Some EMR AMI versions have scala 2.10 on the classpath (we had this issue with AWS AMI v3.9.0), causing issues when running code using scala 2.11. Our jobs were failing with this error:

Error: cascading.tuple.TupleException: unable to read from input identifier: s3://jana-emr-data-dev-stage/jameson/counter-load/1441900801/counter/hourly/2015/09/10/09/counter.10017.0.1441900800000.34952639.3327199169246.avro
at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:149)
at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
at com.jana.mcent.Counter.put(Counter.java:71)
at org.apache.avro.generic.GenericData.setField(GenericData.java:530)
at org.apache.avro.generic.GenericData.setField(GenericData.java:547)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at cascading.avro.AvroScheme.source(AvroScheme.java:247)
at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:163)
at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:136)

How We Attacked It

Run a bootstrap script which removes scala 2.10 from the classpath.
Apparently some EMR AMIs have versions of scala 2.10 lying around, which can cause problems if you are using 2.11. This Stack Overflow post was a lifesaver: http://stackoverflow.com/questions/31103525/scalding-on-emr-hadoop-job-fails-with-nosuchmethoderror-scala-predef-arrowass
With EMR, you can run a bootstrap script on each node’s startup. By creating a script to remove the scala 2.10 jars, I was able to successfully run EMR jobs.

sudo find / -name "scala-library-2.10.*.jar" -exec rm -rf {} \;
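To apply this on every node, upload the script to S3 and register it as a bootstrap action when creating the cluster. The bucket and script names below are hypothetical, and the exact flags may differ between aws CLI versions:

```shell
# remove-scala-2.10.sh in S3 contains the find/rm one-liner above.
aws emr create-cluster \
  --ami-version 3.9.0 \
  --bootstrap-actions Path=s3://my-bucket/bootstrap/remove-scala-2.10.sh \
  --instance-count 3 \
  --instance-type m3.xlarge
```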


Writing this post brought back more haunting memories than I remembered I had. Getting Scalding to handle Avro records properly was certainly much trickier than we expected. However, with these solutions, we now use Avro with Scalding seamlessly. If you have any other tips for working with Avro in Scalding, feel free to let us know!

Also, if you found this post helpful, we’d love to talk to you.  Head over here to find out more about our team.


2 responses to “Staying Sane While Working With Avro In Scalding”

  1. I added
    override def ioSerializations =
    super.ioSerializations :+ classOf[cascading.avro.serialization.AvroSpecificRecordSerialization[_]]

    But sometimes it still failed due to the same issue

    • Sorry about the trouble (and the late reply)! Do you have any more details on when it works vs when it doesn’t? I do remember encountering some weirdness around that. One thing I would make sure of is that the classpaths are all in order.
