How to execute a Python script in NiFi
Day 3:
Today we are going to see how to execute a Python script in NiFi, using the ExecuteScript processor.
In this example, the ExecuteScript processor parses a JSON flow file.
Here is sample Python code for JSON parsing. For background, please refer to the JSON parsing post here (https://www.garudax.id/pulse/json-handling-python-rajadurai-ramasamy).
import json
# Open the input file and parse its JSON content
with open('input.json', 'r') as inputjsonfile:
    inputjson = json.load(inputjsonfile)
# Pass the parsed data through unchanged
Outputjson = inputjson
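The same parsing step can be tried without a file on disk. The sketch below is only an illustration: the sample_text string and its keys are made up for this example and do not come from the post. It parses the text with json.loads and serializes it again with json.dumps.

```python
import json

# Hypothetical sample document, standing in for the contents of input.json
sample_text = '{"name": "nifi", "day": 3}'

# Parse the JSON text into a Python dictionary
inputjson = json.loads(sample_text)

# Pass the parsed data through unchanged, as the sample code does
Outputjson = inputjson

# Serialize back to JSON text; sort_keys makes the key order predictable
output_text = json.dumps(Outputjson, sort_keys=True)
print(output_text)
```

Any modification of the data would go between the parse and the serialize steps.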
In the highlighted area, we have selected python as the Script Engine. The Python script itself then goes into the Script Body property.
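Before we start, we have to convert the Python script to a Jython script. The ExecuteScript processor runs Python through Jython, a Java implementation of Python 2.7, so it does not support all native Python libraries (modules that depend on CPython C extensions, for example, cannot be imported).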
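If a script is meant to run both on a desktop Python and inside ExecuteScript, it can check which interpreter it is running on. This is a minimal sketch, not part of the original flow; running_on_jython is a hypothetical helper name, and it relies only on the standard platform module.

```python
import platform

def running_on_jython():
    # Jython reports "Jython" here; the standard interpreter reports "CPython"
    return platform.python_implementation() == "Jython"

print(running_on_jython())
```

Inside ExecuteScript with the python engine, this check is expected to return True, which is a hint that only pure-Python modules and Java classes are available.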
To read the flow file and parse its JSON content, the sample Python code becomes the following Jython script:
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flow file content as UTF-8 text and parse it as JSON
        input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        inputjson = json.loads(input_text)
        Outputjson = inputjson
        # Serialize the result back to JSON text and write it out as bytes
        text = json.dumps(Outputjson)
        outputStream.write(bytearray(text.encode('utf-8')))

# session, REL_SUCCESS and REL_FAILURE are provided by ExecuteScript
flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, ModJSON())
        flowFile = session.putAttribute(flowFile, "filename", '_jsonname.json')
        session.transfer(flowFile, REL_SUCCESS)
    except:
        session.transfer(flowFile, REL_FAILURE)
    session.commit()
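Because the script depends on NiFi's session object and on Java classes, it cannot be run directly with a desktop Python. One way to check the JSON handling beforehand is to keep it in a plain function and exercise it with in-memory streams. The following is a sketch under that assumption: transform_json and process_stream are hypothetical helper names, and io.BytesIO merely stands in for the Java streams that NiFi passes to process().

```python
import io
import json

def transform_json(input_text):
    # Parse the text, pass the data through unchanged, and serialize it again
    inputjson = json.loads(input_text)
    Outputjson = inputjson
    return json.dumps(Outputjson, sort_keys=True)

def process_stream(input_stream, output_stream):
    # Mirrors the read -> transform -> write shape of ModJSON.process
    input_text = input_stream.read().decode("utf-8")
    output_stream.write(transform_json(input_text).encode("utf-8"))

# In-memory stand-ins for the flow file's input and output streams
source = io.BytesIO(b'{"b": 2, "a": 1}')
target = io.BytesIO()
process_stream(source, target)
print(target.getvalue().decode("utf-8"))
```

Once the transformation behaves as expected here, the same logic can be moved into the process() method of the Jython class.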
To read from and write to flow files:
flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, ModJSON())
        flowFile = session.putAttribute(flowFile, "filename", '_jsonname.json')
        session.transfer(flowFile, REL_SUCCESS)
    except:
        session.transfer(flowFile, REL_FAILURE)
    session.commit()
To read the flow file content into JSON:
input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
inputjson = json.loads(input_text)
The final output is routed to one of two relationships: success if the script completes, or failure if an exception is raised.
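The routing rule can be illustrated without NiFi. In this sketch, choose_relationship is a hypothetical helper, and the strings "success" and "failure" only stand in for REL_SUCCESS and REL_FAILURE. It shows how content that fails to parse would end up on the failure side.

```python
import json

def choose_relationship(content_text):
    # Valid JSON is routed to "success"; a parse error is routed to "failure"
    try:
        json.loads(content_text)
        return "success"
    except ValueError:
        return "failure"

print(choose_relationship('{"id": 1}'))
print(choose_relationship('{broken'))
```

In the flow itself, a downstream processor would be connected to each relationship to handle the two cases.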
If I have developed a Python script like the one below:
insert_table.py
def insert_record(id):
    insert into table (columnA) value (id)
    return select max(columnA) from table
Can I use NiFi to execute the above insert_table.py file, pass the variable, and get the return value?
Loved NiFi