nlp@scale
Introduction:
Language is a communication mechanism that lets humans exchange thoughts.
To an extent, understanding a language means being able to decode and make “sense” of the message being conveyed, and likewise being able to encode your own thought representations and transfer them back.
Natural Language Processing (NLP) is a family of machine learning techniques that transform text into representations a machine can understand and process to solve tasks such as translation, summarization, generation, word-sense disambiguation, document similarity, question answering, sentiment analysis, recommendations, goal-oriented dialogue, and causal inference, amongst others.
For this demo we show a few NLP methods, mainly in the realm of text similarity, because it is one of the most widely adopted techniques and in many cases provides a basis for more complex tasks.
This is a two-part blog series: we first show the algorithmic implementation, then cover serving ML models in production.
For a general-purpose example, we will use a dataset of news articles and essays, found here: https://components.one/datasets/all-the-news-articles-dataset/.
Part 1 - Algorithmic Implementation
We will cover NLP model training and building “signature buckets” that group similar items. We then stream messages through the model and observe how it classifies and recommends similar articles. We will show that the MinHash method is not as efficient for streaming data, and we demonstrate how to build a SimHash model that performs better on streams.
It’s worth mentioning that there are other interesting probabilistic approaches to streaming, such as Count-Min Sketch, HyperLogLog, and Bloom filters.
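As a quick aside, Spark ships one of these sketches out of the box; here is a minimal, hypothetical example of approximate frequency counting with its built-in Count-Min Sketch (the tokensDF column is made up for illustration):
// COUNT-MIN SKETCH (illustrative only; not part of this demo's pipeline)
import spark.implicits._
// a hypothetical token column built inline for illustration
val tokensDF = Seq("spark", "spark", "mapr", "nlp", "spark").toDF("token")
// eps = 1% relative error, 99% confidence, fixed seed
val cms = tokensDF.stat.countMinSketch("token", 0.01, 0.99, 42)
println(cms.estimateCount("spark")) // ≈ 3 (Count-Min only ever over-counts)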
Locality sensitive hashing (LSH):
We use two LSH algorithms, commonly known as SimHash and MinHash. The power of an LSH implementation can be summarized as:
1. reducing high-dimensional features to a much smaller dimension while preserving differentiability
2. classifying similar objects into the same buckets with high probability
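To build intuition, here is a toy MinHash sketch in plain Scala, independent of the Spark implementation later in this post; the two example sentences and all constants are illustrative:
// TOY MINHASH (plain Scala, no Spark): each signature component agrees
// with probability equal to the Jaccard similarity of the token sets
val numHashes = 128
val rng = new scala.util.Random(42)
val p = 2147483647L // prime modulus (2^31 - 1)
// random affine hash functions h(x) = (a*x + b) mod p
val coeffs = Seq.fill(numHashes)((rng.nextInt(Int.MaxValue).toLong, rng.nextInt(Int.MaxValue).toLong))
def signature(tokens: Set[String]): Array[Long] =
  coeffs.map { case (a, b) =>
    tokens.map(t => (a * (t.hashCode & 0x7fffffffL) + b) % p).min
  }.toArray
def estimatedJaccard(s1: Array[Long], s2: Array[Long]): Double =
  s1.zip(s2).count { case (x, y) => x == y }.toDouble / numHashes
val docA = "the quick brown fox jumps over the lazy dog".split(" ").toSet
val docB = "the quick brown fox leaps over a sleepy dog".split(" ").toSet
// true Jaccard similarity is 6/11 ≈ 0.55; the estimate approaches it as numHashes grows
println(estimatedJaccard(signature(docA), signature(docB)))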
Some applications:
- Recommender systems
- NLP (fast retrieval, near duplicate detection)
- Hierarchical clustering
- Genome-wide association study
- Image similarity identification
- Audio similarity identification
- Digital video fingerprinting
- Routing
A) CountVector - MinHash - Jaccard Similarity
We implement standard NLP preprocessing, followed by a count vectorizer. We then transform the featurized data with the MinHash algorithm; this transform can be used for dimensionality reduction.
We then extend the idea to approximate nearest neighbor search for similar items, the idea behind many recommendation engines.
A third variant uses an approximate similarity join with Jaccard distance, d(A, B) = 1 − |A ∩ B| / |A ∪ B|, to find near-duplicate document pairs (a sketch appears in the implementation section below).
Overview:
1.- Preprocess and implement count vectorizer pipeline
2.- Load data
3.- Build minhash model
4.- Persist models and transformations
5.- Query similar items with approximate nearest neighbours
6.- Stream messages and show performance
The code for this implementation can be found in the last section of this article.
B) TFIDF - SimHash - CosineSimilarity
We implement standard NLP preprocessing, followed by a TF-IDF vectorizer. We then transform the featurized data with the bucketed random projection LSH algorithm (referred to loosely as SimHash in this post); this transform can be used for dimensionality reduction.
We then perform nearest neighbor search and an approximate similarity join with Euclidean distance.
Overview:
1.- Preprocess and implement tfidf pipeline
2.- Load data
3.- Build bucketed random projection model
4.- Persist models and transformations
5.- Query similar items with approximate nearest neighbours
6.- Stream messages and show performance
Code Implementation:
For this implementation we use: Scala, Spark, MapR-DB (distributed database), and MapR-FS (distributed filesystem).
A) MinHash :
1.- Preprocess pipeline
// IMPORTS (assumed for this snippet)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, CountVectorizer, MinHashLSH}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.DataTypes
// PREPROCESSING
val tokenizer = new Tokenizer()
  .setInputCol("content")
  .setOutputCol("words")
// stopwords
val remover = new StopWordsRemover()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("filtered")
// count vectorizer
val cv = new CountVectorizer()
  .setInputCol(remover.getOutputCol)
  .setOutputCol("features")
  .setVocabSize(100000)
  .setMinDF(10)
// keep only non-zero vectors (MinHash is undefined on all-zero vectors)
val isNoneZeroVector = udf({ v: Vector => v.numNonzeros > 0 }, DataTypes.BooleanType)
// PIPELINE
val cvPipeline = new Pipeline()
  .setStages(Array(tokenizer, remover, cv))
2.- Load data
// LOAD DATA
val corpusData = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("data/articles.csv").drop("id","title","publication","author","date","year","month","url").na.drop()
3.- Build minhash model
// MODELS FIT-TRANSFORM'S
// fit pipelines
val cvModel = cvPipeline.fit(corpusData)
// model transforms
val cvRescaledData = cvModel.transform(corpusData).filter(isNoneZeroVector(col("features"))).select(col("label"), col("features"))
// LSH
//minhash model
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashValues")
// LSH FIT-TRANSFORM'S
// fit
val mhModel = mh.fit(cvRescaledData)
// transform (appends MinHash signatures; can be used for dimensionality reduction before Jaccard-based search)
val mhRescaledData = mhModel.transform(cvRescaledData)
println("mhRescaledData transform:")
mhRescaledData.show(20)
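Note that the transform appends a hashValues column containing one vector per hash table (five here); together they form the document’s MinHash signature, a far smaller representation than the raw count vector.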
4.- Persist model and transforms to disk and distributed DB
//SAVE
//save models and feature transformations to disk or db (this block covers both variants: cv/mh from A, idf/sh from B)
/*
idfModel.write.overwrite().save("/user/mapr/models/idfModel")
cvModel.write.overwrite().save("/user/mapr/models/cvModel")
mhModel.write.overwrite().save("/user/mapr/models/mhModel")
shModel.write.overwrite().save("/user/mapr/models/shModel")
tfidfRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/tfidfRescaledData")
cvRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/cvRescaledData")
mhRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/mhRescaledData")
shRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/shRescaledData")
*/
5.- Query new messages with approximate nearest neighbor search
//CONFIG PARAMS
val n : Int = 25 //number of recommendations
val queryDataPath : String = "data/aricleQuery.csv" //stream data
//model paths
//transformed vectors
//LOAD
//load transforms from disk or db
//load models from disk or db
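// e.g., a hedged sketch (paths assume the commented save block above):
// val cvModel = org.apache.spark.ml.PipelineModel.load("/user/mapr/models/cvModel")
// val mhModel = org.apache.spark.ml.feature.MinHashLSHModel.load("/user/mapr/models/mhModel")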
//load data, clean, drop nulls
val queryData = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load(queryDataPath).drop("id","title","publication","author","date","year","month","url").na.drop()
//PREP QUERY
// featurize the query data with the fitted pipeline, then hash it
val rescaledQueryData = mhModel.transform(cvModel.transform(queryData))
  .drop("label","content","words","filtered")
  .withColumnRenamed("features", "queryVector")
// get the sparse vector representation of the query (this handles a single message; iterate over the DataFrame for batches)
val sparse_key = rescaledQueryData.select(col("queryVector")).first.getAs[org.apache.spark.ml.linalg.SparseVector](0)
//QUERIES
//minhash
val minhashDF = mhModel.approxNearestNeighbors(cvRescaledData, sparse_key, n)
minhashDF.show() // the n nearest neighbours of sparse_key
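And the near-duplicate variant promised in the overview; a minimal sketch where 0.6 is an illustrative Jaccard-distance threshold (pairs at a distance below it are returned):
// approximate similarity join: candidate near-duplicate pairs within Jaccard distance 0.6
val dupCandidates = mhModel.approxSimilarityJoin(cvRescaledData, cvRescaledData, 0.6, "JaccardDistance")
// drop self-pairs and inspect the candidates
dupCandidates.filter(col("datasetA.label") =!= col("datasetB.label"))
  .select(col("datasetA.label"), col("datasetB.label"), col("JaccardDistance"))
  .show()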
6.- Stream messages and show performance
import Streams._
// test load Stream Class, pass in sparkSession as param
val streams = Streams(spark).testFunct("text to streams")
A very good stream implementation can be found here:
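In the meantime, here is a minimal Structured Streaming sketch (Spark 2.4+ for foreachBatch), under the following assumptions: new articles land as JSON files with a content field in a hypothetical /user/mapr/streams/incoming directory, and cvModel, mhModel, cvRescaledData, isNoneZeroVector, and n are in scope from the steps above:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructType, StructField, StringType}
// schema of the incoming messages (content only)
val msgSchema = StructType(Seq(StructField("content", StringType)))
// watch a landing directory for new article files
val incoming = spark.readStream
  .schema(msgSchema)
  .json("/user/mapr/streams/incoming") // hypothetical path
val streamQuery = incoming.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // featurize the micro-batch with the already-fitted pipeline
    val featurized = cvModel.transform(batch).filter(isNoneZeroVector(col("features")))
    // recommend the n nearest articles for each message in the batch
    featurized.select("features").collect().foreach { row =>
      val key = row.getAs[org.apache.spark.ml.linalg.SparseVector](0)
      mhModel.approxNearestNeighbors(cvRescaledData, key, n).show()
    }
  }
  .start()
streamQuery.awaitTermination()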
B) Bucketed Random Projection:
1.- Preprocess pipeline
// IMPORTS (assumed for this snippet)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{Tokenizer, StopWordsRemover, HashingTF, IDF, BucketedRandomProjectionLSH}
import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.sql.functions.{col, udf, desc}
import org.apache.spark.sql.types.DataTypes
import spark.implicits._
// PREPROCESSING
val tokenizer = new Tokenizer()
  .setInputCol("content")
  .setOutputCol("words")
// stopwords
val remover = new StopWordsRemover()
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("filtered")
// hashing trick (note: 20 features is very small and will collide heavily; powers of two such as 1 << 18 are more typical)
val hashingTF = new HashingTF()
.setInputCol(remover.getOutputCol)
.setOutputCol("rawFeatures")
.setNumFeatures(20)
// idf vectorizer
val idf = new IDF()
.setInputCol(hashingTF.getOutputCol)
.setOutputCol("features")
//UDF's
//l2 norm
def calcNorm(vectorA: SparseVector): Double = {
var norm = 0.0
for (i <- vectorA.indices){ norm += vectorA(i)*vectorA(i) }
(math.sqrt(norm))
}
val calcNormDF = udf[Double,SparseVector](calcNorm)
//cosine sim
def cosineSimilarity(vectorA: SparseVector, vectorB:SparseVector) :(Double) = {
var dotProduct = 0.0
for (i <- vectorA.indices){ dotProduct += vectorA(i) * vectorB(i) }
val div = (calcNorm(vectorA) * calcNorm(vectorB))
if( div == 0 ) (0)
else (dotProduct / div)
}
val calcCosine = udf[Double,SparseVector,SparseVector](cosineSimilarity)
//use only nonzero vectors
val isNoneZeroVector = udf({v: Vector => v.numNonzeros > 0}, DataTypes.BooleanType)
// PIPELINES
val idfPipeline = new Pipeline()
.setStages(Array(tokenizer, remover, hashingTF, idf))
2.- Load data
// LOAD DATA
val corpusData = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("data/articles.csv").drop("id","title","publication","author","date","year","month","url").na.drop()
3.- Build bucketed random projection model
// MODELS FIT-TRANSFORM'S
// fit pipelines
val idfModel = idfPipeline.fit(corpusData)
// model transforms
val tfidfRescaledData = idfModel.transform(corpusData).drop("content","filtered","words","rawFeatures")
// LSH
//brp model
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(5.0)
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashValues")
// LSH FIT-TRANSFORM'S
// fit
val shModel = brp.fit(tfidfRescaledData)
// transform (appends bucket hashes; can be used for dimensionality reduction with euclidean or cosine)
val shRescaledData = shModel.transform(tfidfRescaledData)
println("shRescaledData transform:")
shRescaledData.show(20)
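Note that hashValues now holds one bucket id per hash table: BucketedRandomProjectionLSH buckets each point by floor(projection / bucketLength), so a smaller setBucketLength produces finer buckets, with fewer false collisions but a higher chance of missing true neighbors.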
4.- Persist model and transforms to disk and MapR-DB
//SAVE
//save models and feature transformations to disk or db (this block covers both variants: cv/mh from A, idf/sh from B)
/*
idfModel.write.overwrite().save("/user/mapr/models/idfModel")
cvModel.write.overwrite().save("/user/mapr/models/cvModel")
mhModel.write.overwrite().save("/user/mapr/models/mhModel")
shModel.write.overwrite().save("/user/mapr/models/shModel")
tfidfRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/tfidfRescaledData")
cvRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/cvRescaledData")
mhRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/mhRescaledData")
shRescaledData.write.format("org.apache.spark.sql.json").mode(SaveMode.Append).save("/user/mapr/transformed/shRescaledData")
*/
5.- Query new messages with approximate nearest neighbor search or cosine similarity (for cosine you can use the provided UDF)
//CONFIG PARAMS
val n : Int = 25 //number of recommendations
val queryDataPath : String = "data/aricleQuery.csv" //stream data
//model paths
//transformed vectors
//LOAD
//load transforms from disk or db
//load models from disk or db
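// e.g., a hedged sketch (paths assume the commented save block above):
// val idfModel = org.apache.spark.ml.PipelineModel.load("/user/mapr/models/idfModel")
// val shModel = org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel.load("/user/mapr/models/shModel")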
//load data, clean, drop nulls
val queryData = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load(queryDataPath).drop("id","title","publication","author","date","year","month","url").na.drop()
//PREP QUERY
// transform data to be queried
val rescaledQueryData = idfModel.transform(queryData).drop("label","content","words","rawFeatures")
.withColumnRenamed("features", "queryVector")
// get the dense representation of the query vector (this handles a single message; iterate over the DataFrame for batches)
val sparse_key = rescaledQueryData.select(col("queryVector")).first.getAs[org.apache.spark.ml.linalg.SparseVector](0)
val dense_Key = sparse_key.toDense
//QUERIES
// exact cosine similarity via a cross join (uses the l2-norm UDF; O(corpus) per query, fine for a demo but expensive at scale)
val crossjoined = tfidfRescaledData.crossJoin(rescaledQueryData)
val cosine = crossjoined.withColumn("similarity", calcCosine($"features", $"queryVector"))
val cosineDF = cosine.sort(desc("similarity")).select("similarity","label").limit(n)
cosineDF.show()//show most similar objects
//bucketed random projection
println("Approximately searching dfA for 2 nearest neighbors of the sparse_key:")
val brpDF = shModel.approxNearestNeighbors(tfidfRescaledData, dense_Key, n)
brpDF.show()// approximate nearest neighbor search
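The Euclidean analogue of the near-duplicate join from the overview; 2.0 is an illustrative threshold that would need tuning against the actual TF-IDF vectors:
// approximate similarity join: candidate pairs within euclidean distance 2.0
val nearCandidates = shModel.approxSimilarityJoin(tfidfRescaledData, tfidfRescaledData, 2.0, "EuclideanDistance")
// drop self-pairs and inspect the candidates
nearCandidates.filter(col("datasetA.label") =!= col("datasetB.label"))
  .select(col("datasetA.label"), col("datasetB.label"), col("EuclideanDistance"))
  .show()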
6.- Stream messages and show performance
import Streams._
// test load Stream Class, pass in sparkSession as param
val streams = Streams(spark).testFunct("text to streams")
A very good stream implementation can be found here:
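In the meantime, the Structured Streaming sketch at the end of variant A applies here unchanged in shape: swap cvModel and mhModel for idfModel and shModel, and convert each query vector to dense before calling approxNearestNeighbors.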
Thank you !
Darian Harrison: darian.harrison@hpe.com, darianharrison89@gmail.com.
Maikel Pereira: maikel.pereira@hpe.com
MapR PS Team
References:
https://mapr.com/blog/similar-document-search-using-apache-spark-with-tf-idf/