Using Spark Accumulators
Accumulators are variables that Spark uses to aggregate information across executors. During a normal Spark application's life-cycle they are typically used to surface patterns such as corrupted records encountered by tasks running on different executors.
Let's walk through a use-case-oriented example of Spark accumulators.
Consider some sample data containing fields such as policyID, statecode, county, eq_site_limit and hu_site_limit.
Data obtained from source systems is sometimes corrupted: packets may be lost over the network, blank lines may appear with no information, or master attributes such as policyID in the illustration above may be missing.
Because Spark accumulators support associative and commutative operations, diagnostic data from several sources can be aggregated reliably, for example to record the number of corrupted lines or of lines missing master attributes.
The same counting could be attempted with an ordinary mutable variable, but it would fail to accumulate correctly: when Spark ships the code to the executors, each executor works on its own local copy of the variable, so the copy on the driver process is never updated. Declaring the counter as a Spark accumulator ensures that updates made on every executor are sent back to the driver and merged there.
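As a minimal sketch of that failure mode (reusing the same sparkContext and input file as the full example below), a counter declared as an ordinary var is captured into each task's closure, so the increments happen on executor-side copies and the value observed on the driver is not reliably updated:

// Sketch only: localBadRecords is serialized into each task's closure,
// so every task increments its own copy of the variable.
var localBadRecords = 0
sparkContext.textFile("F:\\FL_insurance_sample.csv", 5)
  .foreach(line => if (line.length() == 0) localBadRecords += 1)
// On a cluster this typically still prints 0; in local mode the result is not guaranteed.
println(localBadRecords)

The full example below replaces this var with an accumulator so that every executor's updates flow back to the driver.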
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object application {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("appsources")
    val sparkContext = new SparkContext(sparkConf)

    // use a Spark accumulator to record every occurrence of bad data
    val bad_records = sparkContext.accumulator(0, "Bad_Data")

    // read the file with 5 partitions and count the blank lines
    sparkContext.textFile("F:\\FL_insurance_sample.csv", 5).foreach(line =>
      if (line.length() == 0) bad_records += 1)

    // the merged count is only available on the driver, via .value
    println(bad_records.value)
  }
}
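Note that sparkContext.accumulator(initialValue, name) belongs to the Spark 1.x API and was deprecated in Spark 2.0 in favour of AccumulatorV2. A rough equivalent for Spark 2.x/3.x, sketched against the same sparkContext and file as above (treating policyID as the first comma-separated column is an assumption here), would use the built-in LongAccumulator and can also track the missing-policyID case in the same pass:

// Sketch for Spark 2.0+: named LongAccumulators from the AccumulatorV2-based API.
val blankLines = sparkContext.longAccumulator("Blank_Lines")
val noPolicyId = sparkContext.longAccumulator("Missing_PolicyID")

sparkContext.textFile("F:\\FL_insurance_sample.csv", 5).foreach { line =>
  if (line.trim.isEmpty) blankLines.add(1)
  // assumes policyID is the first comma-separated field of each row
  else if (line.split(",", -1)(0).trim.isEmpty) noPolicyId.add(1)
}

println(s"blank lines: ${blankLines.value}, missing policyID: ${noPolicyId.value}")

Because each accumulator is merged with a simple associative and commutative addition, the totals come out the same regardless of how many partitions or executors contributed the partial counts.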