PySpark: Removing Null Values from a Not Null Column
Null Values Present in a Not Null Column
There are cases where null values end up in a not null column of a PySpark DataFrame/RDD.
For instance, consider creating an RDD by reading a CSV file, replacing the empty values with None, and converting it into a DataFrame. If the schema defines a column as non-nullable, a null present in that column will cause an error when we try to operate on the DataFrame.
rdd = spark.sparkContext.textFile(<<csv_location>>) # Reading a file
fields = [StructField(<<name>>, <<data_type>>, <<nullable>>)] # nullable flag per column
schema = StructType(fields) # Creating a schema from the fields
df = spark.createDataFrame(rdd, schema) # Create a DF using the RDD and schema
In the above scenario, after the DataFrame is created, we may see the error below whenever we try to perform any operation on it.
ValueError: field <<field_name>>: This field is not nullable, but got None
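To see what this check amounts to, the nullability validation can be sketched in plain Python. This is a simplified illustration of the idea, not Spark's actual implementation; `verify_row` and its parameters are hypothetical names:

```python
def verify_row(row, nullable_flags, field_names):
    """Raise ValueError if None appears in a field declared non-nullable.

    Simplified stand-in for the validation PySpark performs when it
    checks Python rows against a StructType schema.
    """
    for value, nullable, name in zip(row, nullable_flags, field_names):
        if value is None and not nullable:
            raise ValueError(
                "field %s: This field is not nullable, but got None" % name
            )

# A row with None in a non-nullable column fails:
try:
    verify_row(("Alice", None), [True, False], ["name", "age"])
except ValueError as e:
    print(e)  # field age: This field is not nullable, but got None
```

The key point is that the check runs per row, which is why the error surfaces only when the DataFrame is actually evaluated.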
Solutions
There are many solutions that can be applied to remove null values from a nullable column of a DataFrame; however, these generic solutions may not work for non-nullable columns.
df = df.na.drop(subset=["<<column_name>>"])
df = df.filter(df[<<column_name>>].isNotNull())
df = df.where(df.<<column_name>>.isNotNull())
df.createOrReplaceTempView("temp_table")
df = spark.sql("""select * from temp_table where <<column_name>> is not null""")
df.createOrReplaceTempView("temp_table")
df = spark.sql("""select * from temp_table where nvl(cast(<<column_name>> as string),'') <> ''""")
df = df.where("nvl(rtrim(ltrim(cast(<<column_name>> as string))),'') <> ''")
The above solutions work perfectly to remove null values when the column is nullable. With a non-nullable schema, however, the error is raised before these filters get a chance to run, so the Null/None values have to be removed earlier; the scenarios below cover that.
Solution Scenario 1
If you are sure none of the columns should be nullable, we can remove the null entries (Null, None, '', "") while reading the data as an RDD:
rdd = spark.sparkContext.textFile(<<csv_location>>) \
    .filter(lambda line: "Null" not in line)
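The effect of this line-level filter can be checked without a Spark cluster; it simply drops any raw CSV line containing the null marker (here "Null" is an assumed placeholder token used in the file):

```python
# Raw CSV lines, as textFile() would yield them
lines = [
    "1,Alice,30",
    "2,Null,25",   # contains the null marker
    "3,Bob,28",
]

# Same predicate as the RDD filter above
clean = [line for line in lines if "Null" not in line]
print(clean)  # ['1,Alice,30', '3,Bob,28']
```

Note that this drops the whole row, and it would also drop rows where "Null" happens to appear as a substring of legitimate data, so it only fits files where the marker cannot occur in real values.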
Solution Scenario 2
If there is a chance that some columns are allowed to be null or empty, then the fix can be applied after reading the data and before converting it to a DataFrame with the non-nullable schema.
# Prepare the nullable flags ('Y'/'N') of the columns in order
rdd = spark.sparkContext.textFile(<<csv_location>>) \
    .map(lambda line: filter_null(line.split(","), nullable_values)) \
    .filter(lambda row: row)  # drop the rows rejected by filter_null

def filter_null(row, nullable_values):
    line = []
    for idx, field in enumerate(row):
        if not field:
            if 'Y' in nullable_values[idx]:
                line.append(None)  # nullable column: keep the row, store None
            else:
                return []  # empty value in a non-nullable column: reject the row
        else:
            line.append(field)
    return line
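Under the assumption that `nullable_values` is an ordered list of 'Y'/'N' flags matching the column order, the helper behaves as below. The function is repeated here so the snippet runs on its own, without Spark:

```python
def filter_null(row, nullable_values):
    # Replace empty fields with None when the column is nullable ('Y');
    # reject the whole row if a non-nullable ('N') column is empty.
    line = []
    for idx, field in enumerate(row):
        if not field:
            if 'Y' in nullable_values[idx]:
                line.append(None)
            else:
                return []
        else:
            line.append(field)
    return line

nullable_values = ['N', 'Y', 'N']  # e.g. id, nickname, name
print(filter_null(['1', '', 'Alice'], nullable_values))  # ['1', None, 'Alice']
print(filter_null(['', 'bob', 'Bob'], nullable_values))  # [] -> row dropped
```

Returning an empty list for rejected rows lets the subsequent `.filter(lambda row: row)` discard them, since an empty list is falsy in Python.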