
I am trying to read a CSV file in Spark and split its comma-separated lines, so that I end up with an RDD holding a two-dimensional array (one array of fields per line). I am very new to Spark.

I tried to do this:

public class SimpleApp 
{   
    public static void main(String[] args) throws Exception 
    {       
        String master = "local[2]";
        String csvInput = "/home/userName/Downloads/countrylist.csv";
        String csvOutput = "/home/userName/Downloads/countrylist";

        JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));

        JavaRDD<String> csvData = sc.textFile(csvInput, 1);
        JavaRDD<String> words = csvData.map(new Function <List<String>>() { //line 43
              @Override
              public List<String> call(String s) {
                return Arrays.asList(s.split("\\s*,\\s*"));
              }
            });

        words.saveAsTextFile(csvOutput);
    }
}

This should split each line and return a List of its fields, but I am not sure it is right. I get this error:

SimpleApp.java:[43,58] wrong number of type arguments; required 2
  • You need to add the input type argument too: new Function<String, List<String>>() Commented Nov 8, 2014 at 14:11
  • BTW, you most probably want to use flatMap and FlatMapFunction (cs.berkeley.edu/~pwendell/strataconf/api/core/spark/api/java/…). Commented Nov 8, 2014 at 14:12
  • btw2, you should learn some Scala. Makes using Spark so much easier: csvData.map(line => line.split("\\s*,\\s*")) Commented Nov 8, 2014 at 14:16
  • Thank you all! @maasg: I am doing this for a university project where I have to use Java. Commented Nov 8, 2014 at 14:18
  • @satish Unfortunately I am not so good at explaining things; you can probably find good tutorials by searching the internet. Basically, given a list<a> and a function a->list<a>, map generates a list<list<a>>, while flatMap flattens the inner lists so the result is just a list<a>. Commented Dec 14, 2015 at 10:26
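The "required 2" in the compiler error refers to the two type arguments mentioned in the first comment: the input type and the return type. A minimal sketch of that shape without Spark, assuming java.util.function.Function as a stand-in for Spark's Function (Spark's interface names its method call rather than apply; the class name TypeArgsSketch is made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical demo class; java.util.function.Function is only a stand-in
// for Spark's Function, which likewise takes <input type, return type>.
class TypeArgsSketch {
    // Writing Function<List<String>> here fails to compile with
    // "wrong number of type arguments; required 2".
    static final Function<String, List<String>> SPLIT_LINE =
        new Function<String, List<String>>() {
            @Override
            public List<String> apply(String line) {
                return Arrays.asList(line.split("\\s*,\\s*"));
            }
        };

    public static void main(String[] args) {
        System.out.println(SPLIT_LINE.apply("Germany , Berlin")); // [Germany, Berlin]
    }
}
```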

3 Answers

There are two small issues with the program. First, you probably want flatMap rather than map: since you are trying to return an RDD of words rather than an RDD of Lists of words, flatMap can flatten the result. Second, the function class also requires the type of the input it is called on, not only the return type. I'd replace the JavaRDD<String> words... statement with:

JavaRDD<String> words = csvData.flatMap(
  new FlatMapFunction<String, String>() {
    // Spark 1.x signature; since Spark 2.0, call returns Iterator<String>.
    @Override public Iterable<String> call(String s) {
      return Arrays.asList(s.split("\\s*,\\s*"));
    }
  });
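The difference between map and flatMap described in this answer can be tried without a Spark installation. The following is only a sketch that assumes Java 8 streams as a stand-in for an RDD; the class and method names are made up for illustration and are not part of Spark:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class: plain Java streams standing in for a Spark RDD.
class MapVsFlatMapSketch {
    // map keeps one result per input line: a list of lists of fields.
    static List<List<String>> withMap(List<String> lines) {
        return lines.stream()
            .map(line -> Arrays.asList(line.split("\\s*,\\s*")))
            .collect(Collectors.toList());
    }

    // flatMap merges the per-line results into one flat list of fields.
    static List<String> withFlatMap(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("\\s*,\\s*")))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a, b", "c, d");
        System.out.println(withMap(lines));     // [[a, b], [c, d]]
        System.out.println(withFlatMap(lines)); // [a, b, c, d]
    }
}
```

If each CSV row has to stay together as one record, the map form is the one to keep; flatMap loses the row boundaries.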

4 Comments

Thanks, but it's my fault! I want to create an RDD of Lists of words. I fixed it now with @Gábor Bakos's comment. I also changed JavaRDD<String> to JavaRDD<List<String>>.
Oh rad. In that case yah just do
How can we return an RDD of vectors?
@Holden could you please advise what is wrong here in the reduce function: stackoverflow.com/questions/63843599/…

This is sample code from the https://opencredo.com/data-analytics-using-cassandra-and-spark/ tutorial, translated to Java.

Scala code :

val includedStatuses = Set("COMPLETED", "REPAID")
val now = new Date();
sc.cassandraTable("cc", "cc_transactions")
  .select("customerid", "amount", "card", "status", "id")
  .where("id < minTimeuuid(?)", now)
  .filter(includedStatuses contains _.getString("status"))
  .keyBy(row => (row.getString("customerid"), row.getString("card")))
  .map { case (key, value) => (key, value.getInt("amount")) }
  .reduceByKey(_ + _)
  .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
  .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))

Java code :

SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
        .select("customerid", "amount", "card", "status", "id")
        .where("id < minTimeuuid(?)", date)
        // Note: the Scala version above also accepts the "REPAID" status.
        .filter(row -> row.getString("status").equals("COMPLETED"))
        // Key each row by (customerid, card).
        .keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
        .mapToPair(row -> new Tuple2<>(row._1, row._2.getInt("amount")))
        // Sum the amounts per key.
        .reduceByKey((i1, i2) -> i1.intValue() + i2.intValue())
        .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
                // Emit exactly one Balance per (customerid, card) key.
                List<Balance> list = new ArrayList<Balance>();
                list.add(new Balance(r._1._1, r._1._2, r._2, reportDate));
                return list.iterator();
            }
        }).cache();

Here ProjectPropertie.context is the SparkContext. This is how you can create it (you should use only one context per JVM):

   SparkConf conf = new SparkConf(true).setAppName("App_name").setMaster("local[2]").set("spark.executor.memory", "1g")
                            .set("spark.cassandra.connection.host", "127.0.0.1,172.17.0.2")
                            .set("spark.cassandra.connection.port", "9042")
                            .set("spark.cassandra.auth.username", "cassandra")
                            .set("spark.cassandra.auth.password", "cassandra");
   SparkContext context = new SparkContext(conf);

As the data source I'm using Cassandra, where 172.17.0.2 is the Docker container running my Cassandra node and 127.0.0.1 is the host (in this case, the local machine).


This is what you should do...

    // The two options are alternatives; use one or the other.
    // `spark` is presumably a JavaSparkContext.

    //====== Option 1: flatMap (RDD of words) ==============
    JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
    JavaRDD<String> counts = csvData.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String s) {
            return Arrays.asList(s.split("\\s*,\\s*"));
          }
        });

    //====== Option 2: map (RDD of Lists of words) ==============
    JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
    JavaRDD<List<String>> counts = csvData.map(new Function<String, List<String>>() {
          @Override
          public List<String> call(String s) {
            return Arrays.asList(s.split("\\s*,\\s*"));
          }
        });

    //=====================================

    counts.saveAsTextFile(GlobalConstants.STR_OUTPUT_FILE_PATH);
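The split pattern used in both options has a few edge cases worth knowing before applying it to real CSV data. The sketch below is plain Java with no Spark involved; the class and helper names are made up for illustration. It shows that whitespace at the start and end of the line is kept, that trailing empty fields are dropped unless a negative limit is passed, and it makes no attempt to handle quoted fields that contain commas:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class for the behaviour of split("\\s*,\\s*").
class SplitSketch {
    // Same call as in the answer: trims whitespace around commas only.
    static List<String> fields(String line) {
        return Arrays.asList(line.split("\\s*,\\s*"));
    }

    // Variant: trim the whole line first and keep trailing empty fields.
    static List<String> fieldsKeepingEmpty(String line) {
        return Arrays.asList(line.trim().split("\\s*,\\s*", -1));
    }

    public static void main(String[] args) {
        System.out.println(fields("a , b,c"));                    // [a, b, c]
        System.out.println(fields("a,b,,").size());               // 2
        System.out.println(fieldsKeepingEmpty(" a,b,, ").size()); // 4
    }
}
```

For files with quoted or escaped fields, a dedicated CSV parser is likely a better fit than a regular expression.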

