
I wrote a Python/Jython script to run in NiFi's ExecuteScript processor to parse my invalid JSON document. I based the script below on the script in this question and on Matt Burgess's fantastic cookbook, but it isn't returning multiple flow files. Instead, it returns the input flow file with the regex corrections applied, still as a single file. What do I need to modify to return one flow file for each line in the loop?

script

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import re

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # regex out invalid escapes -- WORKS
        text = re.sub(r"[(\\)]", "", text)
        # split out each line into a separate file -- DOES NOT WORK
        for t in text.splitlines():
            outputStream.write(t)
# end class
flowFile = session.get()
if(flowFile != None):
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end

json -- goal is each line == one flow file

{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}

ETA: I added session.create() and removed the original flow file with session.remove(flowFile) per this, but NiFi says flowFile is not defined?

# imports not changed 
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        text = re.sub(r"[(\\)]", "", text)
        flowfiles_list = []
        for t in text.splitlines():
            flowFile = session.create()
            if (flowFile != None):
                flowFile = session.write(flowfile, t)
                flowfiles_list.append(flowFile)
        for flow in flowfiles_list:
            session.transfer(flow, REL_SUCCESS)

originalFlowFile = session.get()
if(originalFlowFile != None):
    # NiFi says flowFile not defined
    originalFlowFile = session.write(flowFile, PyStreamCallback())
    session.remove(originalFlowFile)
  • From the same cookbook, check the session.create() examples: community.cloudera.com/t5/Community-Articles/… Commented Feb 25, 2020 at 18:57
  • @daggett I attempted to incorporate session.create() (added to the original post), but I got an error that the transfer relationship wasn't specified, though I thought I did that in session.transfer(new_flowFile, REL_SUCCESS). Have I just not put the code in the right positions, or am I still missing the mark? Commented Feb 25, 2020 at 19:52
  • You have to do something with all flow files - transfer or remove. Probably you forgot to remove the original flow file... Commented Feb 25, 2020 at 20:43
  • @daggett I modified the OP and added session.remove(flowFile), but now it says the flow file is already marked for transfer elsewhere, and I don't see where, outside of the except block. Commented Feb 25, 2020 at 20:50
  • originalFlowFile = session.write(flowFile, PyStreamCallback()) - I believe it should be originalFlowFile = session.write(originalFlowFile, PyStreamCallback()) Commented Feb 26, 2020 at 16:50

1 Answer


Return a list of flow files.
You have an example here: https://community.cloudera.com/t5/Support-Questions/Split-one-Nifi-flow-file-into-Multiple-flow-file-based-on/m-p/203388/highlight/true#M165391
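
The linked example is Groovy, but the same pattern can be followed in Jython: read the original content, create one child flow file per line with session.create(original), write each line through an OutputStreamCallback, transfer the children, and remove the original so every flow file is accounted for. Below is a rough, untested sketch of that pattern against the question's data; it is not taken from the linked post, and the WriteLine class name is just an illustration, not part of any NiFi API.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
import re

# Writes one pre-built line into a (new) flow file's content
class WriteLine(OutputStreamCallback):
    def __init__(self, line):
        self.line = line
    def process(self, outputStream):
        outputStream.write(bytearray(self.line.encode('utf-8')))

originalFlowFile = session.get()
if originalFlowFile is not None:
    try:
        # Read the whole original content once
        inStream = session.read(originalFlowFile)
        text = IOUtils.toString(inStream, StandardCharsets.UTF_8)
        inStream.close()
        # Strip the invalid escapes, as in the question
        text = re.sub(r"[(\\)]", "", text)
        # One child flow file per line; creating from the original keeps lineage
        for line in text.splitlines():
            child = session.create(originalFlowFile)
            child = session.write(child, WriteLine(line))
            session.transfer(child, REL_SUCCESS)
        # Every flow file must be transferred or removed -- drop the original
        session.remove(originalFlowFile)
    except Exception as e:
        log.error('Failed to split flow file', e)
        session.transfer(originalFlowFile, REL_FAILURE)

Note that session.write() takes a callback (not a string), which is what broke the ETA version above, and that the original flow file has to be removed or routed once the children are transferred.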


2 Comments

That looks like what I need, but I don't know Groovy, so I'm not sure how to implement something similar in Python. Additionally, my actual JSONs are very large files, and other processors/attempts have caused memory errors -- will loading it into a list cause a memory error, or is that negated by the fact that it's in a stream?
Do your code in Python; just note how the example works with the session and the flow files.
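
On the memory concern raised above: one hedged variation is to stream the original line by line with a java.io.BufferedReader instead of loading the whole content with IOUtils.toString, so only one line is held in memory at a time. This untested sketch reuses the illustrative WriteLine callback from the sketch above; whether your NiFi version allows writing child flow files while the parent's input stream is still open is worth verifying.

from java.io import BufferedReader, InputStreamReader
from java.nio.charset import StandardCharsets
import re
# assumes the NiFi imports and the WriteLine callback from the sketch above

originalFlowFile = session.get()
if originalFlowFile is not None:
    reader = BufferedReader(InputStreamReader(session.read(originalFlowFile), StandardCharsets.UTF_8))
    line = reader.readLine()
    while line is not None:
        # the character-class regex gives the same result applied per line
        cleaned = re.sub(r"[(\\)]", "", line)
        child = session.create(originalFlowFile)
        child = session.write(child, WriteLine(cleaned))  # WriteLine as sketched above
        session.transfer(child, REL_SUCCESS)
        line = reader.readLine()
    reader.close()
    session.remove(originalFlowFile)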
