Developing a high-performance Drools service on big data using Apache Spark
In the world of big data analytics, many decisions such as offers, alerts or follow-ups are planned based on certain criteria or rules. Although we are increasingly moving towards a world where machines drive such decisions using ML and deep-learning algorithms, in many real-time, business-centric use cases the decision is not driven by a pattern or behaviour but by human decisions and business aspirations. In all such use cases, the solutions are based on rule-based systems like Drools.
Since Apache Spark has been the leading data processing engine on Hadoop big data clusters, we will limit our discussion to using Spark as our processing engine. Drools can be integrated with Spark in various ways. The two most popular alternatives are the following:
- Expose the Drools rules as a microservice and call the rule microservice from the Spark application.
- Embed the Drools rule base in the Spark application.
In this article we explore the second approach. Its advantage is that we send the rule base to the data instead of sending a huge amount of data across the network. Since the size of the rule base will always be much smaller than the data, the second approach will be much faster when the data size is huge.
Setup
We need to add the Spark Core and Spark SQL dependencies to our SBT file:
resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql"  % sparkVersion
)
To add Drools, we need to add the Drools repository and the Drools dependencies to the SBT file as follows:
resolvers += "JBoss public" at "http://repository.jboss.org/nexus/content/groups/public/"
libraryDependencies ++= {
"org.kie" % "kie-api" % droolsVersion ::
List("drools-compiler","drools-core", "drools-decisiontables", "knowledge-api")
.map("org.drools" % _ % droolsVersion)
}
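The snippets above reference sparkVersion and droolsVersion; these need to be defined somewhere in the build file. A minimal sketch with placeholder versions (pick the ones matching your cluster and the Drools release you target):

val sparkVersion  = "2.3.0"        // placeholder: match the Spark version on your cluster
val droolsVersion = "7.0.0.Final"  // placeholder: match the Drools/KIE release you use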
Creating the Rule Base
We create a folder called 'rules' under the resources directory. Under this rules folder, we can keep one or more rule (.drl) files. We can keep adding rules to these files, or put new rules in an altogether new rule file and place it in the rules folder.
Then, we create another folder called 'META-INF' under the resources directory. Inside this folder we place 'kmodule.xml', which contains the information about the various knowledge bases to be used by the rule engine. In our case, we configure our knowledge base (known as a 'kbase') to be the rules package under the resources directory. Our 'kmodule.xml' looks like the following:
<?xml version="1.0" encoding="utf-8" ?>
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
  <kbase name="rules" packages="rules">
    <ksession name="ksession-rules"/>
  </kbase>
</kmodule>
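Putting it together, the resources directory would typically look like the following (assuming a standard sbt project layout; the rule file name is just an example):

src/main/resources/
  META-INF/
    kmodule.xml
  rules/
    case_rules.drl        (example rule file name)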
Then we create a class called DroolsUtil. Here we define a method called loadRules, which is responsible for loading all the KIE rules when the application starts.
import org.kie.api.{KieBase, KieServices}

// Loads the knowledge base named "rules", as declared in kmodule.xml, from the classpath
def loadRules(): KieBase = {
  val kieServices  = KieServices.Factory.get()
  val kieContainer = kieServices.getKieClasspathContainer()
  kieContainer.getKieBase("rules")
}
Creating Beans and applying rules on them
The beans/POJOs that need to be passed to the Drools rule engine can be written either in Java, as simple POJOs, or in Scala, as case classes. If they are written in Scala as case classes, the annotation @BeanInfo needs to be added to them for the KIE rule engine to pick those beans up as rule beans, like the following:
import java.sql.Timestamp
import scala.beans.{BeanInfo, BeanProperty}

@BeanInfo
case class LogStruct(log_date: Timestamp, log: String)

@BeanInfo
case class NewAction(var sms: String, var smsDetail: String)

@BeanInfo
case class CaseWrapper(mobileNo: String, mailId: String, logs: Seq[LogStruct] = Nil, @BeanProperty var action: NewAction)
The @BeanInfo annotation is necessary, as it converts the Scala case class into a Java bean at runtime, and Drools (natively written in Java) needs a Java bean to work properly. We cannot leave out the @BeanInfo annotation for case classes which need to be imported in the 'drl' file. @BeanProperty is used for an object/field whose class should be imported in the 'drl' file. As we can see above, the classes CaseWrapper and NewAction are annotated with @BeanInfo, and the field 'action' of type NewAction is annotated with @BeanProperty, since in the drl file we import them and use their fields directly.
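The rule file, placed under the 'rules' folder, looks like the following: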
package rules

import com.khusin.RuleBeans.CaseWrapper
import com.khusin.RuleBeans.LogStruct
import com.khusin.RuleBeans.NewAction
import java.util.List
import function scala.collection.JavaConversions.seqAsJavaList

rule "rule complex"
    dialect "mvel"
    when
        vCaseWrapper: CaseWrapper( mobileNo == "90876546", vlogs: logs != null, vJavaLogs: seqAsJavaList(logs))
        List( size() > 0) from collect(LogStruct(log != null) from vJavaLogs)
    then
        NewAction vAction = new NewAction("Hello World","Details goes here")
        vCaseWrapper.action = vAction
end
For each set of beans on which we want to apply the rules, we create a method to apply the rules (here, applyRulesForBundle). The return type of this method depends on the object that the rule engine is meant to return after applying the rules.
import org.kie.api.KieBase
import org.kie.internal.command.CommandFactory

// Inserts the bean into a stateless session; the rules mutate it in place
def applyRulesForBundle(base: KieBase, caseWrapper: CaseWrapper): CaseWrapper = {
  val session = base.newStatelessKieSession()
  session.execute(CommandFactory.newInsert(caseWrapper))
  caseWrapper
}
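Before wiring this into Spark, a quick local sanity check (outside Spark) can confirm the rule fires as expected. This is a minimal sketch using the classes defined above; the mailId and log values are just placeholders:

val kieBase = DroolsUtil.loadRules()

val wrapper = CaseWrapper(
  mobileNo = "90876546",                                                  // matches the rule's condition
  mailId   = "someone@example.com",                                       // placeholder
  logs     = Seq(LogStruct(new Timestamp(System.currentTimeMillis()), "sample log entry")),
  action   = null                                                         // populated by the rule's 'then' block
)

val result = DroolsUtil.applyRulesForBundle(kieBase, wrapper)
println(result.action)  // expected: NewAction("Hello World","Details goes here") if the rule fires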
Stitching the Rule Engine into the Spark App
In the Driver code, we load the Rule base using DroolsUtil.loadRules()
val rules = DroolsUtil.loadRules()
Then, we broadcast the loaded rules to all Spark executors as follows:
val broadCastRules = spark.sparkContext.broadcast(rules)
The above code ensures that instead of sending the data to the rule engine, we send the rule engine to the data. Thus, we ensure maximum data locality and high performance in a Drools-rule-based Spark app on a huge volume of data.
Our Driver would look somewhat like the following:
object ScalaBeanRuleTestDriver {

  def main(args: Array[String]): Unit = {
    val rules = DroolsUtil.loadRules()
    val spark = SparkSession.builder().appName("Spark-drools").getOrCreate()
    import spark.implicits._
    ...
    ...
    ...
    val caseWrap = spark.sql("select casedf.mobileNo,casedf.mailId,casedf.logs,struct(actionCoord.sms,actionCoord.smsDetail) action FROM casedf JOIN actionCoord ON casedf.mobileNo = actionCoord.mobileNo")
    val caseDs = caseWrap.as[CaseWrapper]
    caseDs.show()

    // broadCastRules is the broadcast of the loaded rule base (see the broadcast snippet above)
    val actionDS = caseDs
      .map(currWrapper => DroolsUtil.applyRulesForBundle(broadCastRules.value, currWrapper))
      .map(resultWrapper => resultWrapper.action)
    actionDS.show()
  }
}
Conclusion
In this article, we have discussed Spark 2.x and Drools integration for huge data volumes. Get the complete source code from the GitHub link below: