4

I need to create a DataFrame whose rows hold around 30 fields (Int, Double and String). Creating a single-row DataFrame this way works:

var res_df = sc.parallelize(Seq((
  results_combine(0),
  results_combine(1),
  results_combine(2),
  results_combine(3),
  results_combine(4),
  results_combine(5),
  results_combine(6),
  results_combine(7),
  results_combine(8),
  results_combine(9),
  results_combine(10)
))).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")

However, when I tried to add more elements to the tuple inside the Seq, I got an error because of Scala's 22-element tuple limit. How can I work around this?

3 Answers

6

Here's an example using the explicit Row and schema-definition APIs.

The (mildly) annoying part is setting up the schema object. See StructField and StructType.

Hopefully this works under Scala 2.10.x!

scala> import org.apache.spark.sql.{DataFrame,Row}
import org.apache.spark.sql.{DataFrame, Row}

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val alphabet = ('a' to 'z').map( _ + "" ) // for column labels
alphabet: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)

scala> val row1 = Row( 1 to 26 : _* )
row1: org.apache.spark.sql.Row = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26]

scala> val row2 = Row( 26 to 1 by -1 : _* )
row2: org.apache.spark.sql.Row = [26,25,24,23,22,21,20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1]

scala> val schema = StructType( alphabet.map( label =>  StructField(label, IntegerType, false) ) )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false), StructField(d,IntegerType,false), StructField(e,IntegerType,false), StructField(f,IntegerType,false), StructField(g,IntegerType,false), StructField(h,IntegerType,false), StructField(i,IntegerType,false), StructField(j,IntegerType,false), StructField(k,IntegerType,false), StructField(l,IntegerType,false), StructField(m,IntegerType,false), StructField(n,IntegerType,false), StructField(o,IntegerType,false), StructField(p,IntegerType,false), StructField(q,IntegerType,false), StructField(r,IntegerType,false), StructField(s,IntegerType,false), StructField(t,IntegerType,false), StructField(u,IntegerType,false), StructField(v,IntegerTyp...

scala> val rdd = hiveContext.sparkContext.parallelize( Seq( row1, row2 ) )
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[5] at parallelize at <console>:23

scala> val df = hiveContext.createDataFrame( rdd, schema )
df: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int, d: int, e: int, f: int, g: int, h: int, i: int, j: int, k: int, l: int, m: int, n: int, o: int, p: int, q: int, r: int, s: int, t: int, u: int, v: int, w: int, x: int, y: int, z: int]

scala> df.show()
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|  g|  h|  i|  j|  k|  l|  m|  n|  o|  p|  q|  r|  s|  t|  u|  v|  w|  x|  y|  z|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26|
| 26| 25| 24| 23| 22| 21| 20| 19| 18| 17| 16| 15| 14| 13| 12| 11| 10|  9|  8|  7|  6|  5|  4|  3|  2|  1|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
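For reference, here is the same approach condensed into a standalone sketch closer to the question's mixed Int/Double/String case. The column names and values below are placeholders, and it reuses the hiveContext from the transcript; adapt both to your own data.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Placeholder field names and types; extend this Seq to ~30 entries as needed.
// Unlike tuples and case classes, a Seq of StructFields has no 22-element limit.
val fields = Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("score", DoubleType, nullable = false),
  StructField("label", StringType, nullable = false)
)
val schema = StructType(fields)

// Row(...) takes varargs of Any, so it is not limited to 22 values either
val row = Row(1, 0.5, "foo")

val df = hiveContext.createDataFrame(
  hiveContext.sparkContext.parallelize(Seq(row)),
  schema
)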


2

Here's a quick and dirty function that takes a Seq of tuples and builds your schema based on it. The idea is that you take your field names and zip them with your first row of data. The function uses the type of the data to build the correct StructField.

def toStructType(schema: Seq[(String, Any)]): StructType = {
  StructType(schema.map { case (name, value) =>
    StructField(name, value match {
      case _: Int    => IntegerType
      case _: Long   => LongType
      case _: String => StringType
      case _: Double => DoubleType
      case _: Float  => FloatType
      case _         => StringType // fallback for anything unmatched
    })
  })
}

var pseudoSchema = Seq[(String,Any)](("test", 123))

toStructType(pseudoSchema)
// res17: org.apache.spark.sql.types.StructType = StructType(StructField(test,IntegerType,true))

I'm probably missing some types, but you get the idea. The following gets you 26 columns, named a through z, all expecting Ints:

toStructType(('a' to 'z').map(_.toString).map((_,1)))
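To actually build a DataFrame with this helper, one option (a sketch only: rawRows, colNames and sqlContext are placeholder names, and you could equally use the hiveContext from the first answer) is to derive the schema from the first row and wrap every row with Row.fromSeq:

import org.apache.spark.sql.Row

// Placeholder data: each inner Seq is one row's values, in column order
val rawRows: Seq[Seq[Any]] = Seq(Seq(1, 2.5, "x"), Seq(2, 3.5, "y"))
val colNames = Seq("a", "b", "c")

// Build the schema from the column names zipped with the first row's values
val schema = toStructType(colNames.zip(rawRows.head))

// Row.fromSeq turns a Seq[Any] into a Row without any 22-element restriction
val df = sqlContext.createDataFrame(
  sc.parallelize(rawRows.map(Row.fromSeq)),
  schema
)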


1

Probably the easiest way is just to use case classes to define the contents of your rows. Presuming a SparkContext sc and a HiveContext hiveContext are already established, and omitting some ugly log messages...

scala> case class Alphabet (
     | a : Int = 1,
     | b : Int = 2,
     | c : Int = 3,
     | d : Int = 4,
     | e : Int = 5,
     | f : Int = 6,
     | g : Int = 7,
     | h : Int = 8,
     | i : Int = 9,
     | j : Int = 10,
     | k : Int = 11,
     | l : Int = 12,
     | m : Int = 13,
     | n : Int = 14,
     | o : Int = 15,
     | p : Int = 16,
     | q : Int = 17,
     | r : Int = 18,
     | s : Int = 19,
     | t : Int = 20,
     | u : Int = 21,
     | v : Int = 22,
     | w : Int = 23,
     | x : Int = 24,
     | y : Int = 25,
     | z : Int = 26
     | )
defined class Alphabet

scala> val rdd = sc.parallelize( Seq( new Alphabet() ) )
rdd: org.apache.spark.rdd.RDD[Alphabet] = ParallelCollectionRDD[1] at parallelize at <console>:16

scala> import hiveContext.implicits._
import hiveContext.implicits._

scala> val df = rdd.toDF()
df: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int, d: int, e: int, f: int, g: int, h: int, i: int, j: int, k: int, l: int, m: int, n: int, o: int, p: int, q: int, r: int, s: int, t: int, u: int, v: int, w: int, x: int, y: int, z: int]

scala> df.show()
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|  g|  h|  i|  j|  k|  l|  m|  n|  o|  p|  q|  r|  s|  t|  u|  v|  w|  x|  y|  z|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

An alternative approach would be to use Spark's explicit Row and schema definition APIs.

5 Comments

Thanks for the answer. But the problem with your approach is that defining a case class with more than 22 fields is possible in Scala 2.11 but not in 2.10, which is the version Apache Spark uses. Thus, trying to define that case class in spark-shell gives: "error: Implementation restriction: case classes cannot have more than 22 parameters."
Case classes with more than 22 fields aren't supported in Scala 2.10, and I believe the OP is using that version of Scala.
Spark libraries are available and supported in Scala 2.11 now, so it works for me... but I guess under 2.10 it wouldn't!
As a side question, can I change the Scala version that Spark uses?
I just define an SBT project using 2.11 libraries and set up the context objects myself to get Spark on 2.11.
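For what it's worth, a minimal build.sbt along those lines might look like the sketch below; the exact version numbers are assumptions, but Spark does publish _2.11 artifacts for the 1.6.x line.

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.3",
  "org.apache.spark" %% "spark-sql"  % "1.6.3",
  "org.apache.spark" %% "spark-hive" % "1.6.3"  // provides HiveContext
)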
