1. Overview

In this tutorial, we’ll learn how to convert an RDD to a DataFrame in Spark. We’ll look at each method in detail by calling it with different parameters.

Along the way, we’ll see some interesting examples that’ll help us understand concepts better.

2. RDD and DataFrame in Spark

RDD and DataFrame are two major APIs in Spark for holding and processing data. RDD provides us with low-level APIs for processing distributed data. On the other hand, DataFrame provides us with higher-level APIs that support SQL methods.
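
To make the distinction concrete, here’s a minimal, self-contained sketch contrasting the two APIs (the data and names here are purely illustrative):

import org.apache.spark.sql.SparkSession

// RDD: low-level, functional transformations on distributed data
val session = SparkSession.builder().master("local").getOrCreate()
val numbersRdd = session.sparkContext.parallelize(Seq(1, 2, 3, 4))
val doubled = numbersRdd.map(_ * 2)

// DataFrame: higher-level, SQL-style operations on named columns
val numbersDf = session.range(1, 5).toDF("value")
val filtered = numbersDf.filter("value > 2").select("value")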

In our programs, we often need to convert RDDs to DataFrames and vice versa. In this tutorial, we’re focusing on converting an RDD to a DataFrame. There are two methods for achieving this, and we’ll go through each one in detail.

Let’s first add the imports we’ll need, create a SparkSession, grab its SparkContext, and build an RDD from a sequence of tuples:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext
val rdd = sc.parallelize(
  Seq(
    ("John", "Manager", 38),
    ("Mary", "Director", 45),
    ("Sally", "Engineer", 30)
  )
)

We’ve just created a new RDD containing three rows. We’ll be using this in all our examples in the following sections.
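
If we want to quickly verify the contents of the RDD, we can collect and print it (a quick sanity check, not required for the conversion):

rdd.collect().foreach(println)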

3. Convert Using createDataFrame Method

The SparkSession object has a utility method for creating a DataFrame – createDataFrame. This method can take an RDD and create a DataFrame from it. createDataFrame is overloaded, and we can call it by passing the RDD alone or together with a schema.

Let’s convert the RDD we have without supplying a schema:

val dfWithDefaultSchema = spark.createDataFrame(rdd)

Now, let’s inspect the schema of our newly created DataFrame:

dfWithDefaultSchema.printSchema()
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: integer (nullable = false)

We can see that the column names follow a default naming template (_1, _2, and so on). The schema itself is derived through type inference, which isn’t guaranteed to be correct in every case.
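
Any query against this DataFrame now has to reference the columns by their generated names, for example:

dfWithDefaultSchema.select("_1", "_3").show()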

For better type safety and control, it’s advisable to create a DataFrame using a predefined schema object. The overloaded createDataFrame method takes the schema as a second parameter, but it then accepts only RDDs of type Row. Therefore, we first convert our initial RDD to an RDD of type Row:

val rowRDD: RDD[Row] = rdd.map(t => Row(t._1, t._2, t._3))

Next, we need a schema object, so let’s create one:

val schema = new StructType()
  .add(StructField("Name", StringType, false))
  .add(StructField("Job", StringType, true))
  .add(StructField("Age", IntegerType, true))

Let’s call the method once again, now with an additional schema parameter:

val dfWithSchema: DataFrame = spark.createDataFrame(rowRDD, schema)

We’ll print the schema information once again:

dfWithSchema.printSchema()
|-- Name: string (nullable = false)
|-- Job: string (nullable = true)
|-- Age: integer (nullable = true)

We can observe that the columns are appropriately named and the data types are defined correctly.
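
With meaningful names and types in place, we can now express queries directly against the columns; the filter below is just an illustration:

dfWithSchema.filter("Age > 35").select("Name", "Job").show()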

4. Conversion Using toDF() Implicit Method

Another popular way to convert an RDD to a DataFrame is the toDF() implicit method. Before we can use it, we must import the implicits from our SparkSession:

import spark.implicits._

We’re now ready to convert our RDD. However, this method works only for selected types of RDDs: Int, Long, String, or any sub-classes of scala.Product. Since our RDD was built from a sequence of tuples, which are sub-classes of scala.Product, we can convert it with the imported implicit method:

val dfUsingToDFMethod = rdd.toDF("Name", "Job", "Age")

Let’s inspect the schema of our new DataFrame:

dfUsingToDFMethod.printSchema()
|-- Name: string (nullable = true)
|-- Job: string (nullable = true)
|-- Age: integer (nullable = false)
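
Since toDF() accepts any sub-class of scala.Product, it also works for RDDs of case classes. As a quick sketch, the Employee case class below is purely illustrative and not part of our running example:

// Case classes extend scala.Product, so their field names become column names
case class Employee(name: String, job: String, age: Int)

val employeeRdd = sc.parallelize(Seq(Employee("John", "Manager", 38)))
val dfFromCaseClass = employeeRdd.toDF()
dfFromCaseClass.printSchema()

Here, the field names of the case class become the column names of the resulting DataFrame, so we don’t have to pass them explicitly.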

5. Conclusion

In this tutorial, we’ve learned the different ways to convert an RDD to a DataFrame. We learned the details of each method and the types of parameters each one takes. When we need full control over the schema, we can use createDataFrame with an RDD of type Row and an explicit schema object. In other cases, the toDF() implicit method is the more concise option.

As always, the complete code used in this article is available over on GitHub.
