
How to make joins in Spark Dataset API more type-safe.

Published Dec 10, 2022 · Last updated May 10, 2023

I'm a senior freelance developer specializing in Scala and Apache Spark. I primarily work with companies in the fintech domain and help them tackle their big data challenges.

In this blog post, I want to address a common issue that we encounter when writing data transformations in Spark using the Dataset API. As developers, we often face a trade-off between optimizing for type-safety or performance. In my experience, this decision is particularly challenging in the fintech domain where the data can be complex and requires careful handling.

Over the years, I have found that prioritizing type-safety can make a significant difference in the reliability and accuracy of data transformations. In this article, I will share some tips and tricks that I have learned along the way to help you navigate this dilemma and make informed decisions when working with Spark's Dataset API.

Let's explore the limitations of Spark's Dataset API when it comes to type-safe "outer join" operations. As you may already know, the join operation is one of the most common operations in Spark. However, it's surprising that the standard Dataset API does not provide a type-safe variant for outer joins. Although we do have joinWith, it is only truly type-safe for the "inner join" variant. When we use the left.joinWith(right, ...) operation with a "left_outer" join, the resulting dataset of pairs Dataset[(L,R)] can contain rows whose right side is null. To handle these nulls safely in our Scala code, we must convert the R part to an Option[R].

Here's an example of how we can achieve this conversion:

leftDS
  .joinWith(rightDS, joinCondition, "left_outer")
  .as[(L, Option[R])] // conversion happens here
  .map { case (left: L, right: Option[R]) => ... } // now we can safely map
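
To make this more concrete, here is a minimal, self-contained sketch of the same pattern. The Customer and Order case classes, the column names, and the join condition are illustrative assumptions, not part of the original example:

import org.apache.spark.sql.SparkSession

// Illustrative case classes (hypothetical, for demonstration only)
case class Customer(id: Long, name: String)
case class Order(customerId: Long, amount: Double)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val customers = Seq(Customer(1, "Alice"), Customer(2, "Bob")).toDS()
val orders    = Seq(Order(1, 99.90)).toDS()

val joined = customers
  .joinWith(orders, customers("id") === orders("customerId"), "left_outer")
  .as[(Customer, Option[Order])] // Bob has no orders, so his right side becomes None
  .map { case (c, maybeOrder) => (c.name, maybeOrder.map(_.amount).getOrElse(0.0)) }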

Extending outer joins in Spark

Let's dive into how we can extend outer joins in Spark and simplify our code. After encountering the same pattern repeatedly, I decided to refactor it into a separate class. Using an implicit class, I created multiple useful methods that extend the joining functionality of a standard Dataset.

For those interested, here's a quick rundown of the tech stack I used: Spark 3.3.1, Scala 2.13 (since Spark does not yet officially support Scala 3 at the time of writing this post).

Now, let's talk about how we implement this solution. Implicit classes are a feature of Scala that allows us to extend the functionality of an existing API without accessing its source code. This feature can significantly improve the developer experience when used carefully.

In our case, we define a few extension methods on a Dataset value named left (the left-hand side of the join). These methods handle the conversion of the R type to Option[R] for the "left_outer" join variant and enable us to write type-safe and concise code.

import org.apache.spark.sql.{Column, Dataset, Encoders}

implicit class DatasetTypesafeJoins[L <: Product : TypeTag](left: Dataset[L]) {

  def joinLeftOuterWith[R <: Product : TypeTag]
        (right: Dataset[R], condition: Column): Dataset[(L, Option[R])] = {
    type JoinResult = (L, Option[R]) // type alias for better traceability
    left
      .joinWith(right, condition, "left_outer")
      .as(Encoders.product[JoinResult])
  }

  // ... more extension methods
}

We also need to import the TypeTag trait from the scala.reflect.runtime.universe package; it is required by Encoders.product, which derives the encoder for our result type.

import scala.reflect.runtime.universe.TypeTag
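
Putting it all together, a call site might then look like the following sketch. It reuses the illustrative Customer and Order case classes from the earlier example and assumes the implicit class and a SparkSession are in scope:

import spark.implicits._

val joined: Dataset[(Customer, Option[Order])] =
  customers.joinLeftOuterWith(orders, customers("id") === orders("customerId"))

// Thanks to Option[Order], missing right-hand rows need no null checks.
joined.map { case (customer, maybeOrder) =>
  (customer.name, maybeOrder.map(_.amount).getOrElse(0.0))
}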

Other useful extension methods that we can define are listed below (a sketch of two of them follows the list):

  • joinLeftOuterWith(right: Dataset[R], usingColumn: String)
  • joinLeftAntiWith(right: Dataset[R], condition: Column)
  • joinLeftAntiWith(right: Dataset[R], usingColumn: String)
  • crossJoinWith(right: Dataset[R])
  • joinLeftInnerWith(right: Dataset[R], usingColumn: String)
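
For example, the anti-join and cross-join variants could be implemented roughly as follows. This is only a sketch of one possible shape, placed inside the same DatasetTypesafeJoins implicit class; it is not taken from the original post:

import org.apache.spark.sql.functions.lit

def joinLeftAntiWith[R](right: Dataset[R], condition: Column): Dataset[L] =
  left
    .join(right, condition, "left_anti")  // an anti join keeps only the left columns
    .as(Encoders.product[L])              // so the result is simply a Dataset[L]

def crossJoinWith[R](right: Dataset[R]): Dataset[(L, R)] =
  left.joinWith(right, lit(true), "cross") // joinWith already yields typed pairs here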

Challenges I faced

Unfortunately, the presented solution only works with Scala 2.13, because TypeTag is not available in Scala 3.

Key learnings

Refactoring our code using this new API simplified many of our existing transformations. We were also able to avoid some NullPointerExceptions from the past (and hopefully prevent them in the future as well).

Tips and advice

I wish such functionality were available in standard Spark by default. Until then, you can at least use my solution. If you need help with refactoring code or implementing similar functionality, just drop me an email or reach out on Codementor and schedule an online session.

Final thoughts and next steps

Next time, we can take a look at more extension methods for the Spark Dataset API and Kafka that I gathered from recent projects.

Comments
Salamahin
2 months ago

I was trying to solve almost the same problem a couple of years ago. It's not only about making it ‘typesafe’; ideally we can also replace chained join placeholders like (_._1._1._1._1.value) with something nicer.

This ended with a small macro-based project https://github.com/Salamahin/joinwiz, which can not only bring you typesafety, but also speed up some of your tests with Spark.
