I want to write text files into HDFS, where the target path is generated dynamically. If a file path (including the file name) does not yet exist, the file should be created and the text written to it. If the file path (including the file name) already exists, the string must be appended to the existing file.
I used the following code. File creation works fine, but appending text to existing files fails.
import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{compact, render}

// `Time` and `generateFilePath` come from the surrounding application.
def writeJson(uri: String, json: JValue, time: Time): Unit = {
  val path = new Path(generateFilePath(json, time))
  val conf = new Configuration()
  conf.set("fs.defaultFS", uri)
  conf.set("dfs.replication", "1")
  conf.set("dfs.support.append", "true")
  conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "false")
  val message = compact(render(json)) + "\n"
  try {
    val fileSystem = FileSystem.get(conf)
    if (fileSystem.exists(path)) {
      println("File exists.")
      val outputStream = fileSystem.append(path)
      val bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
      bufferedWriter.write(message)
      bufferedWriter.close()
      println("Appended to file in path: " + path)
    } else {
      println("File does not exist.")
      val outputStream = fileSystem.create(path, true)
      val bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
      bufferedWriter.write(message)
      bufferedWriter.close()
      println("Created file in path: " + path)
    }
  } catch {
    case e: Exception =>
      e.printStackTrace()
  }
}
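For what it's worth, the two branches above differ only in how the output stream is opened, so the write/close logic can be shared. A condensed sketch using the same FileSystem API calls (`writeTo` is a hypothetical helper name; not tested against a live cluster):

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.fs.{FileSystem, Path}

// Choose append vs. create once, then share the write/close path.
def writeTo(fileSystem: FileSystem, path: Path, message: String): Unit = {
  val outputStream =
    if (fileSystem.exists(path)) fileSystem.append(path) // existing file: append
    else fileSystem.create(path, true)                   // new file: create (overwrite flag true)
  val writer = new BufferedWriter(new OutputStreamWriter(outputStream))
  try writer.write(message)
  finally writer.close() // always close so the HDFS lease is released
}
```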
Hadoop version: 2.7.0
Whenever an append is attempted, the following error is thrown:
org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException)
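For reference, the append-related client settings the code sets programmatically can also be configured cluster-wide in hdfs-site.xml. The sketch below mirrors the `conf.set` calls above and adds the related `...replace-datanode-on-failure.policy` property, which is often suggested for single-node or small clusters; I have not verified whether it affects this error:

```xml
<!-- hdfs-site.xml: append-related settings (assumed single-node setup) -->
<property>
  <name>dfs.support.append</name>
  <value>true</value>
</property>
<property>
  <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
  <value>false</value>
</property>
<property>
  <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
  <value>NEVER</value>
</property>
```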