From 38027bd8b82433042ce77c7754d24ebfbf064cfe Mon Sep 17 00:00:00 2001 From: agarwalpratikkumar Date: Tue, 12 Nov 2019 15:08:36 +0100 Subject: [PATCH 1/4] Clustering algorithm based on HashingTF LSH method --- .../algorithms/LocalitySentiveHashing.scala | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala diff --git a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala new file mode 100644 index 0000000..26fd3dd --- /dev/null +++ b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala @@ -0,0 +1,144 @@ +package net.sansa_stack.template.spark.rdf + +import org.apache.spark.sql.SparkSession +import org.apache.spark.rdd.RDD +import org.apache.jena.graph.Triple +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Dataset +import org.apache.spark.ml.feature._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.DataTypes +import org.apache.spark.mllib.linalg.Vector +import org.graphframes._ +import org.graphframes.GraphFrame +import org.apache.spark.ml.evaluation.ClusteringEvaluator + +/* + * + * Clustering + * @triplesType - List of rdf subjects and their predicates. + * @return - cluster of similar subjects. + */ + +class LocalitySensitiveHashing(spark: SparkSession, nTriplesRDD: RDD[Triple], dir_path: String) extends Serializable { + + def run() = { + val parsedTriples = getParsedTriples() + val extractedEntity = getOnlyPredicates(parsedTriples) + val featuredData_Df: DataFrame = vectoriseText(extractedEntity) + val (model: MinHashLSHModel, transformedData_Df: DataFrame) = minHashLSH(featuredData_Df) + val dataset: Dataset[_] = approxSimilarityJoin(model, transformedData_Df) + clusterFormation(dataset, featuredData_Df) + } + + def getParsedTriples(): RDD[(String, String, Object)] = { + // Extracting last part of Triples + return nTriplesRDD.distinct() + .map(f => { + val s = f.getSubject.getURI.split("/").last + val p = f.getPredicate.getURI.split("/").last + + if (f.getObject.isURI()) { + val o = f.getObject.getURI.split("/").last + (s, p, o) + } else { + val o = f.getObject.getLiteralValue + (s, p, o) + } + }) + } + + def getOnlyPredicates(parsedTriples: RDD[(String, String, Object)]): RDD[(String, String)] = { + return parsedTriples.map(f => { + val key = f._1 + "" // Subject is the key + val value = f._2 + "" // Predicates are the values + (key, value.replace(",", " ").stripSuffix(" ").distinct) + }).reduceByKey(_ + " " + _) + } + + def removeStopwords(tokenizedData_Df: DataFrame): DataFrame = { + val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered_words") + val removed_df = remover.transform(tokenizedData_Df) + return remover.transform(tokenizedData_Df) + } + + def vectoriseText(entities: RDD[(String, String)]): DataFrame = { + val entityProfile_Df = spark.createDataFrame(entities).toDF("entities", "attributes") + val tokenizer = new Tokenizer().setInputCol("attributes").setOutputCol("words") + val tokenizedData_Df = tokenizer.transform(entityProfile_Df) + val cleanData_Df = removeStopwords(tokenizedData_Df).distinct + val cleandfrdd = cleanData_Df.select("filtered_words").distinct.rdd + val vocab_size = calculateVocabsize(cleandfrdd) + val hashingTf 
= new HashingTF().setInputCol("filtered_words"). + setOutputCol("raw_Features").setNumFeatures(Math.round(0.90 * vocab_size).toInt) + val isNonZeroVector = udf({ v: Vector => v.numNonzeros > 0 }, DataTypes.BooleanType) + val featuredData_hashedDf = hashingTf.transform(cleanData_Df).filter(isNonZeroVector(col("raw_Features"))) + val idf = new IDF().setInputCol("raw_Features").setOutputCol("features") + val idfModel = idf.fit(featuredData_hashedDf) + val rescaledHashedData = idfModel.transform(featuredData_hashedDf). + filter(isNonZeroVector(col("features"))) + + return rescaledHashedData + } + + def calculateVocabsize(cleandfrdd: RDD[Row]): Int = { + val vocab = cleandfrdd.map(_.mkString).reduce(_ + ", " + _).split(", ").toSet + return (vocab.size) + + } + + def minHashLSH(featuredData_Df: DataFrame): (MinHashLSHModel, DataFrame) = { + val mh = new MinHashLSH().setNumHashTables(3).setInputCol("features").setOutputCol("HashedValues") + val model = mh.fit(featuredData_Df) + val transformedData_Df = model.transform(featuredData_Df) + return (model, transformedData_Df) + } + + def approxSimilarityJoin(model: MinHashLSHModel, transformedData_Df: DataFrame): Dataset[_] = { + val threshold = 0.40 + return model.approxSimilarityJoin(transformedData_Df, transformedData_Df, threshold) + } + + def clusterFormation(dataset: Dataset[_], featuredData_Df: DataFrame) = { + val featuredData = featuredData_Df.drop("attributes", "words", "filtered_words") + + val refined_entities_dataset = dataset + .filter("datasetA.entities != datasetB.entities") + .select(col("datasetA.entities").alias("src"), col("datasetB.entities").alias("dst")) + + import spark.implicits._ + val c_1 = refined_entities_dataset.select("src") + val c_2 = refined_entities_dataset.select("dst") + val vertexDF = c_1.union(c_2).distinct().toDF("id") + + val g = GraphFrame(vertexDF, refined_entities_dataset) + g.persist() + spark.sparkContext.setCheckpointDir(dir_path) + + // Connected Components are the generated clusters. + val connected_components = g.connectedComponents.run() + val connected_components_ = connected_components.withColumnRenamed("component", "prediction"). + withColumnRenamed("id", "entities") + clusterQuality(connected_components_, featuredData) + } + + /* + * Calculating Silhouette score, which will tell how good clusters are. + * Silhouette values ranges from [-1,1]. + * Values closer to 1 indicates better clusters + */ + + def clusterQuality(connectedComponents: DataFrame , featuredData: DataFrame) = { + var silhouetteInput = connectedComponents.join(featuredData, "entities") + val evaluator = new ClusteringEvaluator().setPredictionCol("prediction"). 
+ setFeaturesCol("features").setMetricName("silhouette") + val silhouette = evaluator.evaluate(silhouetteInput) + println(s"Silhouette for LSH = $silhouette") + } + +} + +object LocalitySensitiveHashing { + def apply(spark: SparkSession, nTriplesRDD: RDD[Triple], dir_path: String): LocalitySensitiveHashing = new LocalitySensitiveHashing(spark, nTriplesRDD, dir_path) +} \ No newline at end of file From 794ba69a74d8a5382e6e081934e8f339261ebb43 Mon Sep 17 00:00:00 2001 From: agarwalpratikkumar Date: Wed, 1 Jan 2020 16:33:45 +0100 Subject: [PATCH 2/4] updated messages --- .../spark/clustering/algorithms/LocalitySentiveHashing.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala index 26fd3dd..0461fb2 100644 --- a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala +++ b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala @@ -133,12 +133,11 @@ class LocalitySensitiveHashing(spark: SparkSession, nTriplesRDD: RDD[Triple], di var silhouetteInput = connectedComponents.join(featuredData, "entities") val evaluator = new ClusteringEvaluator().setPredictionCol("prediction"). setFeaturesCol("features").setMetricName("silhouette") - val silhouette = evaluator.evaluate(silhouetteInput) - println(s"Silhouette for LSH = $silhouette") + val silhouette = evaluator.evaluate(silhouetteInput) } } object LocalitySensitiveHashing { def apply(spark: SparkSession, nTriplesRDD: RDD[Triple], dir_path: String): LocalitySensitiveHashing = new LocalitySensitiveHashing(spark, nTriplesRDD, dir_path) -} \ No newline at end of file +} From f1fb2fa65546f6c1a811d1fde435947433322379 Mon Sep 17 00:00:00 2001 From: agarwalpratikkumar Date: Wed, 12 Feb 2020 11:32:36 +0100 Subject: [PATCH 3/4] Udated triple parsing --- .../clustering/algorithms/LocalitySentiveHashing.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala index 0461fb2..2262342 100644 --- a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala +++ b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala @@ -36,11 +36,11 @@ class LocalitySensitiveHashing(spark: SparkSession, nTriplesRDD: RDD[Triple], di // Extracting last part of Triples return nTriplesRDD.distinct() .map(f => { - val s = f.getSubject.getURI.split("/").last - val p = f.getPredicate.getURI.split("/").last + val s = f.getSubject.getLocalName + val p = f.getPredicate.getLocalName if (f.getObject.isURI()) { - val o = f.getObject.getURI.split("/").last + val o = f.getObject.getLocalName (s, p, o) } else { val o = f.getObject.getLiteralValue From 00218a9a4ed027073d0a80d1afcf7ca0b5f75e3e Mon Sep 17 00:00:00 2001 From: agarwalpratikkumar Date: Thu, 20 Feb 2020 13:31:53 +0100 Subject: [PATCH 4/4] Added directory removal function in LocalitySensitiveHashing. 
Added dependency of graphframes and test file for unit-testing --- sansa-ml-spark/pom.xml | 6 + ...g.scala => LocalitySensitiveHashing.scala} | 35 ++++- .../src/test/resources/Cluster/testSample.nt | 124 ++++++++++++++++++ .../LocalitySensitiveHashingTest.scala | 26 ++++ 4 files changed, 187 insertions(+), 4 deletions(-) rename sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/{LocalitySentiveHashing.scala => LocalitySensitiveHashing.scala} (87%) create mode 100644 sansa-ml-spark/src/test/resources/Cluster/testSample.nt create mode 100644 sansa-ml-spark/src/test/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashingTest.scala diff --git a/sansa-ml-spark/pom.xml b/sansa-ml-spark/pom.xml index 2638ae8..626e892 100644 --- a/sansa-ml-spark/pom.xml +++ b/sansa-ml-spark/pom.xml @@ -61,6 +61,12 @@ spark-mllib_${scala.binary.version} ${spark.version} + + + graphframes + graphframes + 0.7.0-spark2.4-s_2.11 + com.github.fommil.netlib diff --git a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashing.scala similarity index 87% rename from sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala rename to sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashing.scala index 2262342..bdbb7ea 100644 --- a/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySentiveHashing.scala +++ b/sansa-ml-spark/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashing.scala @@ -1,4 +1,4 @@ -package net.sansa_stack.template.spark.rdf +package net.sansa_stack.ml.spark.clustering.algorithms import org.apache.spark.sql.SparkSession import org.apache.spark.rdd.RDD @@ -13,6 +13,9 @@ import org.apache.spark.mllib.linalg.Vector import org.graphframes._ import org.graphframes.GraphFrame import org.apache.spark.ml.evaluation.ClusteringEvaluator +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file._ +import java.io._ /* * @@ -85,7 +88,6 @@ class LocalitySensitiveHashing(spark: SparkSession, nTriplesRDD: RDD[Triple], di def calculateVocabsize(cleandfrdd: RDD[Row]): Int = { val vocab = cleandfrdd.map(_.mkString).reduce(_ + ", " + _).split(", ").toSet return (vocab.size) - } def minHashLSH(featuredData_Df: DataFrame): (MinHashLSHModel, DataFrame) = { @@ -118,17 +120,42 @@ class LocalitySensitiveHashing(spark: SparkSession, nTriplesRDD: RDD[Triple], di // Connected Components are the generated clusters. val connected_components = g.connectedComponents.run() + + //Removing the graphframes checkpoint directory + val file_path = Paths.get(dir_path) + removePathFiles(file_path) + val connected_components_ = connected_components.withColumnRenamed("component", "prediction"). withColumnRenamed("id", "entities") clusterQuality(connected_components_, featuredData) } - + + /* + * Removing the graphframes checkpoint directory. 
+ */ + + def removePathFiles(root: Path): Unit = { + if (Files.exists(root)) { + Files.walkFileTree(root, new SimpleFileVisitor[Path] { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE + } + + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + Files.delete(dir) + FileVisitResult.CONTINUE + } + }) + } + } + /* * Calculating Silhouette score, which will tell how good clusters are. * Silhouette values ranges from [-1,1]. * Values closer to 1 indicates better clusters */ - + def clusterQuality(connectedComponents: DataFrame , featuredData: DataFrame) = { var silhouetteInput = connectedComponents.join(featuredData, "entities") val evaluator = new ClusteringEvaluator().setPredictionCol("prediction"). diff --git a/sansa-ml-spark/src/test/resources/Cluster/testSample.nt b/sansa-ml-spark/src/test/resources/Cluster/testSample.nt new file mode 100644 index 0000000..0c8ed99 --- /dev/null +++ b/sansa-ml-spark/src/test/resources/Cluster/testSample.nt @@ -0,0 +1,124 @@ + . + "FullProfessor0" . + . + . + . + . + . + . + . + "FullProfessor1@Department0.University0.edu" . + "xxx-xxx-xxxx" . + "Research12" . + . + . + "Publication10" . + . + . + . + "Publication11" . + . + . + . + "Publication12" . + . + . + "Thing" . + "The Product Type of all Products" . + . + "2000-07-04"^^ . + . + "amour dupable" . + "lengthened galling outposts emulatively looming deplanes stinters forehanded interdisciplinary manoeuvred frankers pederasty heralds disrupts fishnet falsifiable absorbable appreciators undefinable draftable revindicates flashlamp insertion expurgations tarsier" . + . + . + "2000-06-26"^^ . + . + . + . + "8344.69"^^ . + "2008-04-05T00:00:00"^^ . + "2008-07-18T00:00:00"^^ . + "4"^^ . + . + . + "2008-04-19"^^ . + . + . + . + "3063.54"^^ . + "2008-03-01T00:00:00"^^ . + "2008-05-15T00:00:00"^^ . + "2"^^ . + . + . + "2008-04-03"^^ . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + "Phantomville" . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + . + "HomeMade" . + . + . + . + . + . + "21791" . + . + "larry-j-bird" . + . + . + . + . + . + . + . + . + . + . + "Ramone Moore"@en . 
\ No newline at end of file diff --git a/sansa-ml-spark/src/test/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashingTest.scala b/sansa-ml-spark/src/test/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashingTest.scala new file mode 100644 index 0000000..1510e30 --- /dev/null +++ b/sansa-ml-spark/src/test/scala/net/sansa_stack/ml/spark/clustering/algorithms/LocalitySensitiveHashingTest.scala @@ -0,0 +1,26 @@ +package net.sansa_stack.ml.spark.clustering.algorithms + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import org.apache.jena.riot.Lang +import org.apache.spark.rdd.RDD +import org.scalatest.FunSuite + +import net.sansa_stack.ml.spark.clustering.algorithms._ + +class LocalitySensitiveHashingTest extends FunSuite with DataFrameSuiteBase { + + import net.sansa_stack.rdf.spark.io._ + + val path = getClass.getResource("/Cluster/testSample.nt").getPath + val graphframeCheckpointPath = "/sansa-ml-spark_2.11/src/main/scala/net/sansa_stack/ml/spark/clustering/algorithms/graphframeCheckpoints" + + test("performing clustering using HashingTF method") { + val triples = spark.rdf(Lang.NTRIPLES)(path) + + val cluster_ = new LocalitySensitiveHashing(spark, triples, graphframeCheckpointPath) + cluster_.run() + assert(true) + + } + +} \ No newline at end of file
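For reference, a minimal usage sketch of the class introduced by this patch series. It mirrors the unit test above; the input file and the GraphFrames checkpoint directory are placeholder paths, and it assumes sansa-rdf-spark plus the graphframes artifact added to pom.xml are on the classpath.

import org.apache.jena.riot.Lang
import org.apache.spark.sql.SparkSession

import net.sansa_stack.ml.spark.clustering.algorithms.LocalitySensitiveHashing
import net.sansa_stack.rdf.spark.io._

object LocalitySensitiveHashingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("LSH clustering example")
      .master("local[*]")
      .getOrCreate()

    // Read the N-Triples file into an RDD[Triple] via the SANSA RDF reader,
    // exactly as done in LocalitySensitiveHashingTest (placeholder path).
    val triples = spark.rdf(Lang.NTRIPLES)("/path/to/input.nt")

    // The third argument is the GraphFrames checkpoint directory; the
    // connectedComponents step writes there and removePathFiles() cleans it
    // up afterwards (placeholder path).
    val lsh = LocalitySensitiveHashing(spark, triples, "/tmp/graphframe-checkpoints")

    // Runs the full pipeline: parse triples, vectorise predicates with
    // HashingTF/IDF, MinHash LSH similarity join, connected-component
    // clustering via GraphFrames, and Silhouette evaluation.
    lsh.run()

    spark.stop()
  }
}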