
I am new to Spark and it seems very confusing to me. I have gone through the Spark documentation for the Java API but couldn't figure out how to solve my problem, and I have very little time left to process a log file in Spark with Java. Below is the log file, which contains device records (device id, description, ip address, status) that span multiple lines. It also contains other log information that I am not interested in. How can I extract the device information from this huge log file? Any help is much appreciated.

Input Log Data:

!
!

!  
 device AGHK75  
  description "Optical Line Terminal"  
  ip address 1.11.111.12/10     
  status "FAILED"  
!  
 device AGHK78  
  description "Optical Line Terminal"  
  ip address 1.11.111.12/10     
  status "ACTIVE"  
!  

!  
context local  
!  
 no ip domain-lookup  
!  
 interface IPA1_A2P_1_OAM  
  description To_A2P_1_OAM  
  ip address 1.11.111.12/10     
  propagate qos from ip class-map ip-to-pd  
!  
 interface IPA1_OAM_loopback loopback  
  description SE1200_IPA-1_OAM_loopback  
  ip address 1.11.111.12/10     
   ip source-address telnet snmp ssh radius tacacs+ syslog dhcp-server tftp ftp    icmp-dest-unreachable icmp-time-exceed netop flow-ip 

What I have done so far is:
Java Code

import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

JavaRDD<String> logData = sc.textFile("logFile").cache();

// Keep every line from a "device" line up to and including the next "status" line.
List<String> deviceRDD = logData.filter(new Function<String, Boolean>() {
    Boolean check = false;

    public Boolean call(String s) {
        if (s.contains("device") || (check && (s.contains("description") || s.contains("ip address")))) {
            check = true;
        } else if (check && s.contains("status")) {
            check = false;
            return true;
        } else {
            check = false;
        }
        return check;
    }
}).collect();

Current Output:

device AGHK75
description "Optical Line Terminal"   
ip address 1.11.111.12/10   
status "FAILED"   
device AGHK78   
description "Optical Line Terminal"   
ip address 1.11.111.12/10   
status "ACTIVE"  

Expected Output is:

AGHK75,"Optical Line Terminal",1.11.111.12/10,"FAILED"   
AGHK78,"Optical Line Terminal",1.11.111.12/10,"ACTIVE"

3 Answers


You can use sc.wholeTextFiles("logFile") to get the data as key/value pairs, where the key is the file name and the value is the whole content of that file.

Then you can split that content on "!", the delimiter that starts and ends a single log record, and flatMap the pieces so that each element of the resulting RDD is one log record; filter it to keep only the records whose first word is device.

Then extract the individual fields from each record with a map.

Please try it and let me know whether this logic works for you.

Added code in Spark Scala:

val ipData = sc.wholeTextFiles("abc.log")

// split each file's content on "!" and keep only the blocks describing a device
val ipSingleLog = ipData.flatMap(x => x._2.split("!")).filter(x => x.trim.startsWith("device"))

// pull the four fields out of each device block
val logData = ipSingleLog.map(x => {
  val rowData = x.split("\n")
  var device = ""
  var description = ""
  var ipAddress = ""
  var status = ""
  for (data <- rowData) {
    if (data.trim().startsWith("device")) {
      device = data.split("device")(1).trim
    } else if (data.trim().startsWith("description")) {
      description = data.split("description")(1).trim
    } else if (data.trim().startsWith("ip address")) {
      ipAddress = data.split("ip address")(1).trim
    } else if (data.trim().startsWith("status")) {
      status = data.split("status")(1).trim
    }
  }
  (device, description, ipAddress, status)
})
logData.foreach(println)
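
Since the question is about the Java API, a rough Java sketch of the same approach might look like the following (this assumes Spark 2.x with Java 8 lambdas, an existing JavaSparkContext named sc, and an illustrative file path; it is not tested against the exact log):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;

// read the whole file as (fileName, content), split the content on "!" and
// keep only the blocks that describe a device
JavaRDD<String> deviceBlocks = sc.wholeTextFiles("abc.log")
        .flatMap(pair -> Arrays.asList(pair._2().split("!")).iterator())
        .filter(block -> block.trim().startsWith("device"));

// turn each multi-line block into one CSV line: device,description,ip,status
JavaRDD<String> csvLines = deviceBlocks.map(block -> {
    String device = "", description = "", ipAddress = "", status = "";
    for (String line : block.split("\n")) {
        String t = line.trim();
        if (t.startsWith("device")) {
            device = t.substring("device".length()).trim();
        } else if (t.startsWith("description")) {
            description = t.substring("description".length()).trim();
        } else if (t.startsWith("ip address")) {
            ipAddress = t.substring("ip address".length()).trim();
        } else if (t.startsWith("status")) {
            status = t.substring("status".length()).trim();
        }
    }
    return device + "," + description + "," + ipAddress + "," + status;
});

csvLines.collect().forEach(System.out::println);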

4 Comments

@imagin, what if abc.log is a couple of TB? One shouldn't expect the file to be loaded onto just one worker node.
@EvgeniiMorozov: This will work even if the file is huge. If the data is stored in HDFS it is stored as blocks, and a worker node can work on each block. Please let me know if this clarifies your doubt.
@imagin, the javadoc for wholeTextFiles says it reads the file system as key/value pairs, where the value is a whole text file read as one String. A single file won't be split across different workers; rather, different workers receive different files. And even if that worked on HDFS because of blocks, there are other file systems that might not split files into blocks.
@AjayGupta I tried this approach and I'm stuck; I've posted the question here - stackoverflow.com/questions/41274040/…. Although the format is slightly different, I'm unable to figure out what's wrong.

Spark will take each line as a separate item with sc.textFile. You can make it split on a different character with sc.hadoopConfiguration().set("textinputformat.record.delimiter", "!").

@Test
public void test() throws ParseException, IOException {
    // "hadoop" and "spark" here are the answerer's own test fixtures
    hadoop.write("/test.txt", "line 1\nline 2\n!\nline 3\nline 4");

    JavaSparkContext sc = spark.getContext();

    // make textFile() split records on "!" instead of on newlines
    sc.hadoopConfiguration().set("textinputformat.record.delimiter", "!");
    System.out.println(sc.textFile(hadoop.getMfs().getUri() + "/test.txt").collect());

    // the file now yields two records: the text before "!" and the text after it
    assertThat(sc.textFile(hadoop.getMfs().getUri() + "/test.txt").count(), is(2L));
}
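
Applied to the log file from the question, a minimal sketch could look like this (assuming an existing JavaSparkContext named sc and the "logFile" path from the question; the per-block field extraction shown in the first answer can then be reused on each element):

import org.apache.spark.api.java.JavaRDD;

// make sc.textFile() return one "!"-delimited block per element instead of one line
sc.hadoopConfiguration().set("textinputformat.record.delimiter", "!");

JavaRDD<String> deviceBlocks = sc.textFile("logFile")
        .filter(block -> block.trim().startsWith("device"));

deviceBlocks.collect().forEach(System.out::println);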

1 Comment

In a more complex application that reads several files with different delimiters, there might be an issue, because the Spark context shares its hadoopConfiguration. Thus it's not possible to read other files with the same Spark context, as this setting is applied to all of them. Given the laziness of the computation, one might never know where exactly this parameter is required.

I believe the only correct way that works everywhere is the following:

// TextInputFormat here is the new-API org.apache.hadoop.mapreduce.lib.input.TextInputFormat
Configuration hadoopConf = new Configuration();
hadoopConf.set("textinputformat.record.delimiter", "delimiter");
JavaPairRDD<LongWritable, Text> input = jsc.newAPIHadoopFile(path,
    TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);

There are issues in the related Hadoop code: depending on the size of the input file, it can produce additional records (MAPREDUCE-6549, MAPREDUCE-5948). It certainly works starting with Hadoop 2.7.2.

Even though using the Spark context's configuration, as mlk suggests, works perfectly well, it will fail if you try to read another file with a different delimiter using the same Spark context. By default the delimiter is the newline character, and it is changed as soon as this option is applied.

The reason is that the Spark context shares its hadoopConfiguration object, and it's hard to reason about where exactly this value is going to be needed. As a workaround one might materialize an RDD and cache it, but it may still happen that the same RDD gets recomputed.

The approach above works everywhere, because every time it uses a new Configuration.
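
For example, here is a sketch of reading two files with different record delimiters from the same JavaSparkContext, each read using its own Configuration (file names, variable names and the second delimiter are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaRDD;

// jsc is an existing JavaSparkContext; each read gets its own Configuration,
// so neither delimiter setting leaks into the other read
Configuration bangConf = new Configuration();
bangConf.set("textinputformat.record.delimiter", "!");
JavaRDD<String> deviceBlocks = jsc.newAPIHadoopFile("devices.log",
        TextInputFormat.class, LongWritable.class, Text.class, bangConf)
    .map(pair -> pair._2().toString());

Configuration blankLineConf = new Configuration();
blankLineConf.set("textinputformat.record.delimiter", "\n\n");
JavaRDD<String> paragraphs = jsc.newAPIHadoopFile("other.log",
        TextInputFormat.class, LongWritable.class, Text.class, blankLineConf)
    .map(pair -> pair._2().toString());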

