Apache Spark UDFs with binary type encoding a Java Class

I am currently working on a project in which a Spark DataFrame has a column of type binary containing serialized Java objects, and I need to apply transformations that derive new columns from it.

This article explains how to create a UDF that accepts a binary value holding a serialized Java object and decodes it.

Apache Spark

If you are reading this very specific article, you probably already know what Apache Spark is, so I will only quote the official description:

Apache Spark is a unified analytics engine for large-scale data processing. … It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing — spark.apache.org

BinaryType

What is a binary type in Apache Spark? From the documentation:

Binary is the data type representing Array[Byte] values.
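On the Java side, Array[Byte] is simply byte[]. One common way to turn an object into such bytes, and the one assumed throughout this article since the decoding relies on ObjectInputStream, is standard Java serialization. The sketch below is illustrative only: the helper class SerializeExample, the id field, and the getId() getter are assumptions, not taken from the original project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializeExample {

    // Hypothetical stand-in for the article's MyWonderfulObject; the id field is assumed.
    public static class MyWonderfulObject implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Long id;

        public MyWonderfulObject(Long id) {
            this.id = id;
        }

        public Long getId() {
            return id;
        }
    }

    // Serializes an object into the byte[] that a BinaryType column would hold.
    public static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new MyWonderfulObject(42L));
        // Every Java serialization stream starts with the magic number 0xACED.
        System.out.printf("%d bytes, starting with %02X%02X%n", bytes.length, bytes[0], bytes[1]);
    }
}
```

The leading 0xACED bytes are a quick way to recognise Java-serialized content when inspecting such a column.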

In my project, the column contains instances of a Java class serialized into byte arrays.

When you print it, it looks like this:

What the Dataset looks like with a binary column

And the schema:

The schema of the Dataset, with column “value” of type binary
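Since the output above is only a screenshot, here is a hedged sketch that builds a Dataset of the same shape locally: a single binary column named “value” filled with serialized objects. Only the column name comes from the schema above; the class BinaryColumnExample, the payload's id field, and the sample values are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class BinaryColumnExample {

    // Hypothetical payload class; the id attribute is an assumption.
    public static class MyWonderfulObject implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Long id;
        public MyWonderfulObject(Long id) { this.id = id; }
        public Long getId() { return id; }
    }

    // Java serialization into a byte[].
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // A Dataset with one BinaryType column named "value".
    public static Dataset<Row> buildDataset(SparkSession spark) {
        StructType schema = new StructType().add("value", DataTypes.BinaryType);
        return spark.createDataFrame(
                Arrays.asList(
                        RowFactory.create((Object) toBytes(new MyWonderfulObject(1L))),
                        RowFactory.create((Object) toBytes(new MyWonderfulObject(2L)))),
                schema);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("binary-column")
                .getOrCreate();
        Dataset<Row> data = buildDataset(spark);
        data.printSchema(); // root |-- value: binary (nullable = true)
        data.show();        // binary cells are rendered as bracketed hex bytes
        spark.stop();
    }
}
```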

Transformations with BinaryType

Now that it is clear that the binary column in my case contains serialized Java objects, I can better explain which transformations I need.

First, I need to extract some attributes and create new columns from them; second, I need to perform operations between attributes and create further new columns from the results.

To do so, I decided to use a UDF (User Defined Function). What I didn’t know was the exact syntax for this case, which is why I decided to write this article.

Java UDF that accepts a binary

Let’s jump to the code. I want to create a UDF that can be called inside a .withColumn() call in Apache Spark using Java. The function is implemented as follows:

Implementation of the UDF

It is that simple. I am implementing a UDF1, a function that takes exactly one parameter (hence UDF1; with more parameters I would have used UDF2 to UDF22), here of type byte[]. This byte array contains the serialized content of my object. I then deserialize it with a ByteArrayInputStream and an ObjectInputStream, as shown above. Finally, I cast the result to MyWonderfulObject and read the attribute I need to return, in this case a Long.
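Because the implementation survives only as an image, here is a minimal sketch of a UDF along the lines just described. It assumes MyWonderfulObject is Serializable and exposes the Long through a getId() method; that getter, the holder class ExtractIdUdf, and the null check are my assumptions rather than the author’s code.

```java
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;

import org.apache.spark.sql.api.java.UDF1;

public class ExtractIdUdf {

    // Hypothetical payload class. In a real job it must be on the executors'
    // classpath, otherwise readObject() throws ClassNotFoundException.
    public static class MyWonderfulObject implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Long id;
        public MyWonderfulObject(Long id) { this.id = id; }
        public Long getId() { return id; }
    }

    // UDF1<input, output>: a BinaryType value arrives as byte[], a Long goes out.
    // UDF1.call() is declared "throws Exception", so checked I/O exceptions can propagate.
    public static final UDF1<byte[], Long> EXTRACT_ID = bytes -> {
        if (bytes == null) {
            return null; // keep SQL NULL semantics for null cells
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            MyWonderfulObject obj = (MyWonderfulObject) in.readObject();
            return obj.getId();
        }
    };
}
```

The null check matters because new ByteArrayInputStream(null) throws a NullPointerException, which would fail the whole task on a single null cell.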

To make this work, I need two more steps: register the UDF and use it. For the former, I can do something like this:

Registering the UDF created above

From the SQLContext I call the .udf() method, which returns a UDFRegistration object. On it, I call the .register() method, which takes three parameters:

  1. The name of the UDF that I will use later when calling it
  2. The function that I just created above
  3. The type that the function returns, in this case a Long (DataTypes.LongType)
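A sketch of the registration step, assuming a SparkSession named spark; the article starts from the SQLContext, which spark.sqlContext() returns. The class and method names and the payload’s getId() getter are hypothetical; the three register() arguments follow the list above.

```java
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class RegisterUdfExample {

    // Hypothetical payload class; the id attribute is an assumption.
    public static class MyWonderfulObject implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Long id;
        public MyWonderfulObject(Long id) { this.id = id; }
        public Long getId() { return id; }
    }

    public static void registerExtractId(SparkSession spark) {
        // The UDF: byte[] (BinaryType) in, Long out.
        UDF1<byte[], Long> extractId = bytes -> {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ((MyWonderfulObject) in.readObject()).getId();
            }
        };
        // 1. name used when calling it, 2. the function, 3. the Spark return type.
        spark.sqlContext().udf().register("extractIdUDF", extractId, DataTypes.LongType);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("register-udf")
                .getOrCreate();
        registerExtractId(spark);
        // Once registered, the name is also usable from SQL.
        spark.sql("SELECT extractIdUDF(CAST(NULL AS BINARY)) AS ID").show();
        spark.stop();
    }
}
```

spark.udf() returns the same UDFRegistration as spark.sqlContext().udf(), so either entry point works.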

Finally, I need to use the UDF:

Using the registered UDF

Calling it is even simpler. Here I use the .withColumn() method on the input Dataset data to create a new column “ID” by applying the UDF extractIdUDF to the column “column_name”, which is the binary one.
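Putting the pieces together, a self-contained sketch of the usage step might look like this. It builds a small Dataset with a binary column “column_name”, registers extractIdUDF, and calls it through functions.callUDF inside .withColumn(). Apart from the UDF name and the column names, everything here is an assumption, including the payload class, its getId() getter, and the sample IDs. In recent Spark 3 releases callUDF is deprecated in favour of call_udf, but it still works.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

public class WithColumnExample {

    // Hypothetical payload class with an assumed Long id attribute.
    public static class MyWonderfulObject implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Long id;
        public MyWonderfulObject(Long id) { this.id = id; }
        public Long getId() { return id; }
    }

    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static Dataset<Row> addIdColumn(SparkSession spark) {
        // Input Dataset "data" with one binary column, as in the article.
        StructType schema = new StructType().add("column_name", DataTypes.BinaryType);
        List<Row> rows = Arrays.asList(
                RowFactory.create((Object) toBytes(new MyWonderfulObject(10L))),
                RowFactory.create((Object) toBytes(new MyWonderfulObject(20L))));
        Dataset<Row> data = spark.createDataFrame(rows, schema);

        // Register the UDF under the name used by callUDF below.
        UDF1<byte[], Long> extractId = bytes -> {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ((MyWonderfulObject) in.readObject()).getId();
            }
        };
        spark.udf().register("extractIdUDF", extractId, DataTypes.LongType);

        // New column "ID" computed from the binary column.
        return data.withColumn("ID", callUDF("extractIdUDF", col("column_name")));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("binary-udf")
                .getOrCreate();
        addIdColumn(spark).select("ID").show();
        spark.stop();
    }
}
```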

Conclusions

Using a UDF in Java on a column of type binary is very similar to using one on any other type. You only have to pay attention to how the binary type is represented in Java (as a byte[]) and to how to deserialize it into your class, with ByteArrayInputStream and ObjectInputStream.

I hope this helped. You can find a working example here on GitHub.

Senior Data Engineer/Scientist at TomTom. MSc in Telecommunications Engineering, Ph.D. in Computer Science.
