I am working on a NiFi flow where I receive a JSON document with multiple key-value pairs. I am using the ExecuteScript processor with Python.

My goal here is to create various URLs based on the JSON keys. The keys are numerical and look like this:

keys = [10200, 10201, 10202, ...]

The URLs I want are of 3 types and they should look like these:

http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200

I am trying to loop through keys[] and create 3 specific URLs for each numerical key it contains. I have the following code where I am trying to:

read a numerical key from list --> create 3 URLs --> spit out a flow file.

...... and read the next numerical key in the list and keep looping.....

I have the following code, but when I give it a JSON flow file it does nothing. Can someone please tell me what I am doing wrong?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

  def __init__(self):
    self.parentFlowFile = None
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    flowfiles_list = [] 

    outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))


    for numerical_key in obj.keys():
      # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes 
      flowFile = session.create(self.parentFlowFile)
      if (flowFile != None):
        flowFile = session.write(flowFile, "Does not matter")
        flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
        flowfiles_list.append(flowFile)

    for flow in flowfiles_list:
      session.transfer(flow, REL_SUCCESS)

1 Answer

Good question; this is a nuance of the callback approach in the flow file API. You've created a subclass of StreamCallback, but you never retrieve an input flow file or pass an instance of your class to session.write() to overwrite its content.

Try this after the definition of your ModJSON class:

originalFlowFile = session.get()
if originalFlowFile is not None:
    # Pass the flow file we just retrieved; the callback rewrites its content.
    originalFlowFile = session.write(originalFlowFile, ModJSON())
    session.remove(originalFlowFile)

This gets an input flow file if one is queued (session.get() returns None otherwise, hence the check), then calls your StreamCallback to overwrite its contents. My example discards the input flow file afterwards. If that is the right behavior for your use case and you are not using outputStream for anything, you can extend InputStreamCallback instead of StreamCallback: remove the outputStream.write() call, drop the "outputStream" parameter from the process() method, and pass the callback to session.read() rather than session.write().

In your example, once you add my snippet above, you overwrite the input content with the json.dumps() output and also create and transfer new flow files, all to the same relationship (success). That could cause problems if they are not all in the same format, which is why I added the session.remove(). If you need the original flow file to go out a different relationship than the rest, consider InvokeScriptedProcessor rather than ExecuteScript. If you don't care about the input flow file once processing (adding the URL attributes) is done, follow my suggestion above. If they can all go out the same relationship (success), replace my session.remove() with

session.transfer(originalFlowFile, REL_SUCCESS)
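
One more detail in the question's loop, separate from the callback wiring: all three putAttribute() calls use the attribute name "google", so each call overwrites the previous one and only the yahoo.com URL would remain on the flow file. Below is a minimal sketch of building the three attributes per key, written as plain Python with no NiFi classes so it can be tried outside the processor. The attribute names "bing" and "yahoo" and the helper name url_attributes are assumptions for illustration, not part of the original code.

```python
import json

# Attribute name -> base URL. The hosts come from the question; the
# attribute names "bing" and "yahoo" are assumed for illustration.
URL_BASES = (
    ("google", "http://google.com/"),
    ("bing", "http://bing.com/"),
    ("yahoo", "http://yahoo.com/"),
)


def url_attributes(json_text):
    """Return one dict of URL attributes per top-level key of a JSON object."""
    obj = json.loads(json_text)
    result = []
    for key in sorted(obj):  # JSON object keys are always strings
        result.append(dict((name, base + key) for name, base in URL_BASES))
    return result
```

Inside ExecuteScript, each returned dict would correspond to one child flow file: create it with session.create(originalFlowFile), call session.putAttribute() once per entry, and transfer it to REL_SUCCESS.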

Check my ExecuteScript cookbook article (part 2 of 3) for more examples of these use cases in Jython (and other languages) :)
