Currently, my NiFi flow is as follows:

ListS3 -> RouteOnAttribute -> RouteOnAttribute -> FetchS3Object -> ConvertRecord -> PutDatabaseRecord

I list the CSV files in the S3 bucket and then use two RouteOnAttribute processors to select the files I want to load into a Postgres database. I then fetch those CSV files from S3 and use a ConvertRecord processor to convert them into lines of JSON objects like the following:

{"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}
{"ideez": "2", "name": "Jane Smith", "age": "25", "city": "Los Angeles"}
...
The reason I want to do this is that there are many CSV files with similar titles (I route on them to pick the correct ones) and mostly similar columns. However, a number of files have different column names, and these change from day to day. So inserting each row as a JSON object into a jsonb column in Postgres would work best for me.

Here is how my JsonRecordSetWriter and my PutDatabaseRecord are set up:

[Screenshots: JsonRecordSetWriter configuration; PutDatabaseRecord configuration]

Here are some of the errors I'm receiving:

  • failed to put records to database for flowfile[filename=this/that/then/this/filename] routing to failure: java.sql.SQLDataException: none of the fields in the record map to the columns defined by the public.test_table table NORMALIZED COLUMNS: ID,JSONCONTENT
  • putdatabaserecord[id=uuid] record does not have a value for required column ‘json_content’

If I try to wrap each of the lines in a "json_content" field:

{"json_content": {"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}},
{"json_content": {"ideez": "2", "name": "Jane Smith", "age": "25", "city": "Los Angeles"}},
...

I get the following error:

ERROR: invalid input syntax for type json Detail: Token "MapRecord" is invalid. Where: JSON data, line 1: MapRecord… unnamed portal parameter $1 = '…'

Has anyone had any luck with a similar process? Perhaps there is a better approach I should be pursuing. Thanks.

1 Answer

I think you're really close! The issue is that your content:

{"json_content": {"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}},
{"json_content": {"ideez": "2", "name": "Jane Smith", "age": "25", "city": "Los Angeles"}}

has json_content as an actual JSON object, which NiFi interprets as a nested Record. Hence the error about MapRecord: judging by the error message, the value sent to Postgres starts with the text "MapRecord" rather than with JSON. You'll need to "stringify" this field. There may be more convenient ways to stringify the JSON that I can look into, but for now you could do it with raw string manipulation. I'd put a ReplaceText processor just before your PutDatabaseRecord and configure it this way:

Replacement Strategy = Regex Replace
Search Value = (^.*$)
Replacement Value = {"json_content": "${'$1':escapeJson():replace('\\', '\\\\')}"}
Character Set = UTF-8
Maximum Buffer Size = 1 MB
Evaluation Mode = Line-by-Line
Line-by-Line Evaluation Mode = All

That should handle converting it into a string and properly escaping it.
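
To make the target format concrete, here is a minimal Python sketch of what "stringifying" means. It is not NiFi code and not what ReplaceText runs internally. It only illustrates the shape each line should have before it reaches PutDatabaseRecord, assuming the input is one unwrapped JSON object per line, as in the question. The function name wrap_line_as_json_string is hypothetical.

```python
import json


def wrap_line_as_json_string(line: str, column: str = "json_content") -> str:
    """Return {column: "<original JSON, escaped as a string>"} for one input line.

    Hypothetical helper for illustration only; the column name is taken
    from the question's table.
    """
    # Parse first so a malformed line fails here rather than at the database.
    record = json.loads(line)
    # The inner json.dumps turns the object into JSON text; the outer one
    # embeds that text as an escaped string value, not as a nested object.
    return json.dumps({column: json.dumps(record)})


lines = [
    '{"ideez": "1", "name": "John Doe", "age": "30", "city": "New York"}',
    '{"ideez": "2", "name": "Jane Smith", "age": "25", "city": "Los Angeles"}',
]

wrapped = [wrap_line_as_json_string(line) for line in lines]
for w in wrapped:
    print(w)
```

The point to check in the output is that the value of json_content is a quoted string with escaped inner quotes, so a record reader sees a plain string field instead of a nested record.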
