How to execute a Python script in NiFi

Day 3:

Today we are going to see how to execute a Python script in NiFi. To execute the Python script, I am going to use the ExecuteScript processor.


Here we are going to parse a JSON flow file in the ExecuteScript processor.

Sample Python code for JSON parsing is shown below. For more on JSON parsing, please refer to this post: https://www.garudax.id/pulse/json-handling-python-rajadurai-ramasamy

import json

# Read input.json from disk and parse it into a Python object
with open('input.json', 'r') as inputjsonfile:
    inputjson = json.load(inputjsonfile)
    outputjson = inputjson
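
For readers who want to try the parsing step without preparing a file first, here is a minimal, self-contained sketch. The helper name `load_json_file` and the sample data are made up for illustration; only `json.load` comes from the snippet above.

```python
import json
import os
import tempfile


def load_json_file(path):
    """Parse the JSON document stored at `path` and return the Python object."""
    with open(path, "r") as handle:
        return json.load(handle)


# Create a throwaway input file so the example runs anywhere (sample data is made up).
sample_path = os.path.join(tempfile.mkdtemp(), "input.json")
with open(sample_path, "w") as handle:
    handle.write('{"id": 1, "name": "nifi"}')

inputjson = load_json_file(sample_path)
outputjson = inputjson  # pass-through, as in the snippet above
```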

In the highlighted area, we have selected python as the Script Engine. The Python script then goes in the Script Body property.

Before starting, we have to convert the Python script to a Jython script, because the ExecuteScript processor runs Python through Jython and does not support all native Python libraries.
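
One way to see which interpreter a script is actually running on is to ask the standard `platform` module. The sketch below is only an illustration (the helper name `describe_interpreter` is made up). On a desktop Python it typically reports CPython, while under Jython the same call reports Jython, which implements the Python 2.7 language and cannot load C-extension modules.

```python
import platform
import sys


def describe_interpreter():
    """Return (implementation name, 'major.minor' version) of the running interpreter."""
    version = "%d.%d" % (sys.version_info[0], sys.version_info[1])
    return platform.python_implementation(), version


implementation, version = describe_interpreter()
# True only when the script is executed by Jython
is_jython = implementation == "Jython"
```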

To read flow files and perform JSON parsing, we convert our Python code to the Jython script below.

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole flow file content as UTF-8 text and parse it as JSON
        input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        inputjson = json.loads(input_text)
        # Modify the parsed JSON here; this example passes it through unchanged
        outputjson = inputjson
        # Serialize back to JSON text and write it out as UTF-8 bytes
        text = json.dumps(outputjson)
        outputStream.write(bytearray(text.encode('utf-8')))


flowFile = session.get()
if flowFile is not None:
    try:
        # Replace the flow file content with the output of ModJSON.process
        flowFile = session.write(flowFile, ModJSON())
        flowFile = session.putAttribute(flowFile, "filename", '_jsonname.json')
        session.transfer(flowFile, REL_SUCCESS)
    except:
        session.transfer(flowFile, REL_FAILURE)
session.commit()
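
The script above only runs inside NiFi, because `session`, `REL_SUCCESS` and the Java imports are supplied by the processor. To check the JSON logic beforehand, one option is a plain Python stand-in that mirrors the `process` method with in-memory streams. This is a rough sketch under that assumption: the `transform` function and the sample payload are hypothetical, and `io.BytesIO` stands in for the Java input and output streams.

```python
import io
import json


def transform(parsed):
    """Hypothetical transformation step; here it returns the JSON unchanged."""
    return parsed


def process(input_stream, output_stream):
    """Pure-Python analogue of ModJSON.process for local testing."""
    input_text = input_stream.read().decode("utf-8")
    output_json = transform(json.loads(input_text))
    output_stream.write(json.dumps(output_json).encode("utf-8"))


# In-memory streams play the role of the flow file content (sample data is made up).
source = io.BytesIO(b'{"id": 1, "tags": ["a", "b"]}')
sink = io.BytesIO()
process(source, sink)
result = json.loads(sink.getvalue().decode("utf-8"))
```

Once `transform` behaves as expected locally, its body can be copied into the `process` method of the Jython class.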

To read and write flow files:


flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, ModJSON())
        flowFile = session.putAttribute(flowFile, "filename", '_jsonname.json')
        session.transfer(flowFile, REL_SUCCESS)
    except:
        session.transfer(flowFile, REL_FAILURE)
session.commit()

To read the flow file content into JSON:


input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
inputjson = json.loads(input_text)        

The final output is routed to either the success or the failure relationship.
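
The routing decision can be illustrated outside NiFi with a small sketch. The strings below are only stand-ins for NiFi's `REL_SUCCESS` and `REL_FAILURE` objects, and the `route` helper is hypothetical. The idea is that content which parses as JSON goes to success, and anything that raises an error goes to failure.

```python
import json

# Stand-ins for NiFi's relationship objects (assumption for local illustration)
REL_SUCCESS = "success"
REL_FAILURE = "failure"


def route(content):
    """Return the relationship a flow file with this content would be sent to."""
    try:
        json.loads(content)
        return REL_SUCCESS
    except ValueError:
        # json.loads raises ValueError (or a subclass of it) on malformed input
        return REL_FAILURE


routes = [route('{"ok": true}'), route("not json")]
```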


If I have developed a Python script like the one below, can I use NiFi to execute insert_table.py, pass in the variable, and get the return value?

insert_table.py

def insert_record (id):
    insert into table (columnA) value (id)
    return select max(columnA) from table
