
I have "a.txt" which is in csv format and is separated by tabs:

16777216    16777471        -33.4940    143.2104
16777472    16778239    Fuzhou  26.0614 119.3061

Then I run:

sc.textFile("path/to/a.txt").map(line => line.split("\t")).toDF("startIP", "endIP", "City", "Longitude", "Latitude")

Then I get:

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (5): startIP, endIP, City, Longitude, Latitude
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.Dataset.toDF(Dataset.scala:376)
  at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:40)
  ... 47 elided

If I just run:

res.map(line => line.split("\t")).take(2)

I get:

rdd: Array[Array[String]] = Array(Array(16777216, 16777471, "", -33.4940, 143.2104), Array(16777472, 16778239, Fuzhou, 26.0614, 119.3061))

What is wrong here?

3 Answers


As @user7881163 notes, the error occurs because your split produces a single column, which Spark names value (hence the name in the error message), whose contents are the array of tokens produced by the split.
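You can confirm this by inspecting the schema (a quick sketch; assumes import spark.implicits._ is in scope for toDF, as it is in spark-shell):

val df = sc.textFile("path/to/a.txt").map(_.split("\t")).toDF()
df.printSchema()
// root
//  |-- value: array (nullable = true)
//  |    |-- element: string (containsNull = true)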

However, per the comments from @zero323, if you are operating at scale, make sure you use the version of collect that @user7881163 uses (the one that takes a partial function), because the other, far more commonly used collect will move all your data to the driver and can overwhelm that machine. And if you aren't operating at scale, why use Spark at all?
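For reference, RDD has both methods; a minimal sketch of the difference, assuming rdd is the RDD[Array[String]] produced by the split:

val everything = rdd.collect() // action: materializes Array[Array[String]] on the driver
val startIPs = rdd.collect { case Array(ip, _*) => ip } // transformation: stays a distributed RDD[String]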

This is a slightly different approach that also allows for missing city data:

sc.textFile("path/to/a.txt")
  .map(_.split("\t"))
  .map {
      case Array(startIP, endIP, city, longitude, latitude) => (startIP, endIP, Some(city), longitude, latitude)
      case Array(startIP, endIP, longitude, latitude) => (startIP, endIP, None, longitude, latitude)
  }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")

5 Comments

The collect transformation doesn't move any data to the driver. Also, making the pattern match exhaustive would be a good idea.
First, as noted in the documentation, collect is an action, not a transformation (which matters), and it does this: "Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data." That would be bad at scale, as noted.
Second, yes, pattern matches should be exhaustive. It didn't occur to me to veer outside the scope of the question into "Scala best practices"; maybe I should have. I only accounted for the data formats @derek indicated. So yeah, @derek, if you take this approach, make your pattern match exhaustive or manage exceptions properly with logging, Try, etc.
You're not talking about the same collect :) github.com/apache/spark/blob/…, which is the equivalent of Seq.collect in the Scala collections API.
Ahh, OK. I have never seen that version of collect anywhere, but yes, that one does keep things distributed. I've edited accordingly. Of course, this whole point isn't really germane to the question; it would have been more aligned with Stack Overflow guidelines to simply edit the answer to make it better than to engage in a long comment thread on a topic that isn't even essential to the question about the shape of the data being ingested.

Try:

sc
  .textFile("path/to/a.txt")
  .map(line => line.split("\t"))
  // Note the lowercase binders: capitalized names in a pattern are treated as
  // stable identifiers to match against, not as variables to bind
  .collect { case Array(startIP, endIP, city, longitude, latitude) =>
    (startIP, endIP, city, longitude, latitude)
  }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")

or just use the csv source:

spark.read.option("delimiter", "\t").csv("path/to/a.txt")
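Note that without a header the csv source assigns default column names (_c0 through _c4), so you will likely want to rename them afterwards, for example:

spark.read.option("delimiter", "\t").csv("path/to/a.txt")
  .toDF("startIP", "endIP", "City", "Longitude", "Latitude")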

Your current code creates a DataFrame with a single column of type array<string>. This is why it fails when you pass 5 names.

1 Comment

This should be {case Array(...) => ... } not {case Seq(...) => ... }

You can try this example:

val dataDF = sc.textFile("filepath")
  .map(x => x.split('\t'))
  .map { case Array(a, b, c, d, e) => (a, b, c, d, e) } // tuple, so the columns become _1 ... _5
  .toDF()

val data = dataDF.selectExpr("_1 as startIP", "_2 as endIP", "_3 as City", "_4 as Longitude", "_5 as Latitude")
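The intermediate map to a tuple matters here: selectExpr("_1 as ...") relies on the default tuple column names _1 through _5, whereas calling toDF() directly on the split arrays would instead produce the single array-typed value column described in the other answers.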

