I am currently working on a project in which a Spark DataFrame has a column of type binary containing an encoded (serialized) Java class, and I need to apply transformations to it to create new columns.
This article is about how to create a UDF that accepts and parses a binary type that encodes a Java Class.
If you are on this very specific article I imagine you know what Apache Spark is, so no need for me to explain it:
Apache Spark is a unified analytics engine for large-scale data processing. … It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing — spark.apache.org
What is a binary type in Apache Spark? From the documentation:
BinaryType is the data type representing Array[Byte] values — spark.apache.org
In my project, the column contains a Java class encoded as a byte array.
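For context, any object implementing Serializable can be flattened into such a byte[] with plain Java serialization. A minimal, self-contained sketch (the String below just stands in for the real class):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EncodeDemo {
    // Flatten any Serializable object into the byte[] that a
    // BinaryType column can hold.
    static byte[] encode(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = encode("any serializable object");
        // A Java serialization stream always starts with the
        // magic header 0xAC 0xED.
        System.out.printf("%d bytes, header %02X %02X%n",
                bytes.length, bytes[0] & 0xFF, bytes[1] & 0xFF);
    }
}
```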
When you print it, it looks like this:
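I can’t paste the real data, but a binary column rendered by df.show() looks roughly like this (illustrative bytes; AC ED 00 05 is simply the header of a Java serialization stream, and older Spark versions may render the array differently):

```
+--------------------+
|         column_name|
+--------------------+
|[AC ED 00 05 73 7...|
+--------------------+
```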
And the schema:
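printSchema() reports the column as plain binary (column name assumed):

```
root
 |-- column_name: binary (nullable = true)
```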
Transformations with BinaryType
Now that we’ve established that the binary column in my case contains serialized Java objects, I can better explain the transformations I need to perform.
First of all, I need to extract some attributes and create new columns from them; secondly, I need to perform some operations between attributes and, again, create new columns from the results.
To do so I decided to use a UDF (User Defined Function). What I didn’t know was the exact syntax to use to achieve this. That’s why I decided to write this article.
Java UDF that accepts a binary
Let’s jump to the code. I want to create a UDF that can be applied inside a .withColumn() call in Apache Spark, in Java. The function is implemented as follows:
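A sketch of what that implementation looks like. Class and getter names here are placeholders; in the real project MyWonderfulObject is a proper class of its own, and the UDF class declares `implements org.apache.spark.sql.api.java.UDF1<byte[], Long>` (the Spark dependency is left out here so the snippet compiles on its own):

```java
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;

// In the real project this class additionally declares
// `implements org.apache.spark.sql.api.java.UDF1<byte[], Long>`;
// the Spark import is omitted so the sketch compiles standalone.
public class ExtractIdUdf {

    // Placeholder for the real class serialized into the column.
    public static class MyWonderfulObject implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long id;
        public MyWonderfulObject(long id) { this.id = id; }
        public long getId() { return id; }  // assumed attribute/getter
    }

    // UDF1<byte[], Long>.call: decode the byte[] back into the object
    // and return the attribute that will populate the new column.
    public Long call(byte[] bytes) throws Exception {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            MyWonderfulObject obj = (MyWonderfulObject) ois.readObject();
            return obj.getId();
        }
    }
}
```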
That simple. I am implementing a UDF1, a function that accepts a single parameter (that’s why it’s UDF1; for more parameters I would have used UDF2 through UDF22) of type byte[]. This byte array contains the serialized content of my class. I then need to decode it back into the class, using a ByteArrayInputStream and an ObjectInputStream as shown above. Finally, I cast it to MyWonderfulObject and read the attribute I need to return, in this case a Long.
To make this work I need to do two other operations: register the UDF and use it. For the former I can do something like this:
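Sketched like this (ExtractIdUdf is a hypothetical name for a class implementing UDF1<byte[], Long>; sqlContext is an existing SQLContext, and on newer Spark versions sparkSession.udf() works the same way):

```java
// Register the UDF: name, implementation, return type.
sqlContext.udf().register(
        "extractIdUDF",          // name used later when calling it
        new ExtractIdUdf(),      // a class implementing UDF1<byte[], Long>
        DataTypes.LongType);     // Spark type of the returned Long
```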
From the SQLContext I call the .udf() method, which returns a UDFRegistration. On it, I call the .register() method, which takes three parameters:
- The name of the UDF that I will use later when calling it
- The function that I just created above
- The type that the function returns, in this case a Long (DataTypes.LongType)
Finally, I need to use the UDF:
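Something like this (data is the input Dataset<Row>; callUDF and col come from org.apache.spark.sql.functions):

```java
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// Create the new "ID" column by applying the UDF to the binary column.
Dataset<Row> result = data.withColumn(
        "ID",
        callUDF("extractIdUDF", col("column_name")));
```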
Calling it is even simpler: in this case I use the .withColumn() method on the input dataset data, saying that I want to create a new column “ID” by applying the UDF extractIdUDF to column “column_name”, the one of type binary.
Using a UDF in Java on a column of type binary is very similar to using one on any other type; you only have to pay attention to how such a binary type is represented in Java (as a byte[]) and to how to cast it back to your class, with a ByteArrayInputStream and an ObjectInputStream.
Hope this helped, you can find a working example here on GitHub.