I wrote a Python/Jython script to run in NiFi's ExecuteScript processor to parse my invalid JSON document. I wrote the script below based off of the script in this question and Matt Burgess's fantastic cookbook, but it isn't returning multiple flow files. Instead, it returns the input flow file with the regex corrections applied, but only as just one file. What do I need to modify to return 1 flow file for each line in the loop?
script
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import re
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# regex out invalid escapes -- WORKS
text = re.sub(r"[(\\)]", "", text)
# split out each line into a separate file -- DOES NOT WORK
for t in text.splitlines():
outputStream.write(t)
# end class
flowFile = session.get()
if(flowFile != None):
try:
flowFile = session.write(flowFile, PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
except Exception as e:
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
json -- goal is each line == one flow file
{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}
ETA Added session.create() and removed original flow file with session.remove(flowFile) per this but NiFi says flowFile is not defined?
# imports not changed
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
text = re.sub(r"[(\\)]", "", text)
flowfiles_list = []
for t in text.splitlines():
flowFile = session.create()
if (flowFile != None):
flowFile = session.write(flowfile, t)
flowfiles_list.append(flowFile)
for flow in flowfiles_list:
session.transfer(flow, REL_SUCCESS)
originalFlowFile = session.get()
if(originalFlowFile != None):
# NiFi says flowFile not defined
originalFlowFile = session.write(flowFile, PyStreamCallback())
session.remove(originalFlowFile)
session.create()(added to original post) but I got an error that the transfer relationship wasn't specified, though I thought I did that insession.transfer(new_flowFile, REL_SUCCESS). Have I just not put code in the right positions, or am I still missing the mark?transferorremove. probably you forgot to remove the original flow file...session.remove(flowFile)but it's saying it's already marked for transfer elsewhere but I don't see where, outside of the except blockoriginalFlowFile = session.write(flowFile, PyStreamCallback())i believe it should beoriginalFlowFile = session.write(originalFlowFile, PyStreamCallback())