Implementing meaningful phrase finding with Spark

Phrase finding is an interesting problem to solve: given a large body of text, what are the most “interesting” phrases present in it? We have many gigabytes’ worth of text and want to somehow extract meaningful phrases out of it. Additionally, there is no labeled data to learn from.

For example, given more than 146 million bigrams and unigrams taken from literature of the 1960s, 1970s, 1980s and 1990s, the next figure shows the bigrams that the algorithm considers most interesting for the 1990s. Phrases that differentiate the 1990s from the other decades should get a higher score, for instance Windows-NT, web site, gulf war, etc.

Interesting phrases for the 1990s

This problem has already been tackled by many people. The approach I’m discussing here is the one developed by Tomokiyo and Hurst back in 2003. It is simple to implement and easy to scale.

The first question to answer is: what exactly differentiates a phrase from a generic n-gram? Well, phrases are independently meaningful, for example “red wine” or “fast car”. We do not care about n-grams such as “where are” or “are you”.

We also need some kind of metric to define what makes a good phrase. Intuitively, two basic properties are worth mentioning:

  • The phraseness of the phrase: how often the words of the phrase occur together, compared to how often they occur separately.

  • The informativeness of the phrase: how well the phrase captures the main ideas of a set of documents.

In order to measure the informativeness of a phrase, we need two corpora to compare: a foreground corpus and a background corpus. We say that a phrase is informative if it is much more frequent in the foreground corpus than in the background corpus. For example, if we use current news documents as the foreground corpus and news documents from the 1990s as the background corpus, we might find that “Barack Obama” and “Net neutrality” are informative phrases, but “Middle East” and “United States” are not, since they appear with almost the same frequency now as they did before.

The first approach proposed for extracting these metrics from a corpus of text is the binomial log-likelihood ratio test (BLRT). Given two samples, one with $n_1$ draws and $k_1$ successes and the other with $n_2$ draws and $k_2$ successes, are they from two distinct binomial distributions, or did they come from the same binomial and differ only due to chance? The test statistic is

$$\text{BLRT} = 2\log\frac{B(p_1, k_1, n_1)\,B(p_2, k_2, n_2)}{B(p, k_1, n_1)\,B(p, k_2, n_2)}$$

where $p_i = \frac{k_i}{n_i}$, $p = \frac{k_1+k_2}{n_1+n_2}$ and $B(p, k, n) = p^{k}(1-p)^{n-k}$.

As an example, to measure the informativeness of the bigram $xy$ with BLRT, we would set $k_1$ to the number of times the bigram $xy$ appears in the foreground corpus, $n_1$ to the total number of bigrams in the foreground corpus, $k_2$ to the number of times $xy$ occurs in the background corpus, and $n_2$ to the total number of bigrams in the background corpus.
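
To make the statistic concrete, here is a small Scala sketch of BLRT (it is not used in the Spark job below, and the helper names logB and blrt are mine):

//log of B(p, k, n) = p^k (1-p)^(n-k), computed in log space to avoid underflow on large counts
def logB(p: Double, k: Double, n: Double): Double =
  k * math.log(p) + (n - k) * math.log(1 - p)

//BLRT statistic for two samples; note it misbehaves (NaN) when a sample proportion is exactly 0 or 1
def blrt(k1: Double, n1: Double, k2: Double, n2: Double): Double = {
  val p1 = k1 / n1
  val p2 = k2 / n2
  val p  = (k1 + k2) / (n1 + n2)
  2.0 * (logB(p1, k1, n1) + logB(p2, k2, n2) - logB(p, k1, n1) - logB(p, k2, n2))
}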

BLRT might not perform very well in some cases, though. The problem is that BLRT scores “more frequent in the foreground corpus” and “more frequent in the background corpus” symmetrically. Therefore, a phrase that is very frequent in the background corpus and rare in the foreground corpus would be considered “informative”, when it actually is not.

A better approach is to use the Kullback-Leibler (KL) divergence, denoted $D(P \,\|\, Q)$. The idea is to compare two distributions to see how different they are:

$$D(P \,\|\, Q) = \sum_{w} P(w)\log\frac{P(w)}{Q(w)}$$

KL divergence measures how much information is lost when $Q$ is used to approximate $P$. The authors define the pointwise KL divergence of an n-gram $w$ as

$$\delta_w(P \,\|\, Q) = P(w)\log\frac{P(w)}{Q(w)}$$

that is, the contribution of the n-gram $w$ to the expected information loss of the entire distribution. We will use pointwise KL divergence to estimate the phraseness and informativeness scores of an n-gram $w$.
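
In Scala the pointwise term is a one-liner; the same helper appears later inside the Spark job:

//pointwise KL divergence: contribution of a single n-gram with probability p under
//the model we care about and probability q under the approximating model
def klDivergence(p: Double, q: Double): Double =
  p * (math.log(p) - math.log(q))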

The phraseness $\gamma_p$ of an n-gram $w$ is the amount of information that is lost by assuming independence of each word, i.e. by using a unigram model instead of the n-gram model.
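
For a bigram $w = xy$ with foreground probabilities $p_{fg}$, this is the pointwise KL divergence between the bigram model and the product of unigram probabilities, which is exactly what the code below computes:

$$\gamma_p(xy) = p_{fg}(xy)\log\frac{p_{fg}(xy)}{p_{fg}(x)\,p_{fg}(y)}$$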

The informativeness $\gamma_i$ of an n-gram $w$ is the amount of information that is lost by assuming that the n-gram $w$ is drawn from the background model instead of the foreground model.
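
Again for a bigram $w = xy$, with $p_{fg}$ the foreground probability and $p_{bg}$ the background probability, this corresponds to:

$$\gamma_i(xy) = p_{fg}(xy)\log\frac{p_{fg}(xy)}{p_{bg}(xy)}$$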

Finally, the total score of an n-gram $w$ is simply $\gamma_p + \gamma_i$. Other approaches are valid, such as taking a weighted sum, though that introduces hyperparameters to tune; the weighted form is shown below.
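
In the implementation that follows, the two weights are exposed as the phraseWeight and infoWeight arguments, so the score computed for each bigram is

$$\text{score}(w) = \lambda_p\,\gamma_p(w) + \lambda_i\,\gamma_i(w)$$

where $\lambda_p = \lambda_i = 1$ recovers the plain sum.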

Implementation

Now, how would we implement this? If the dataset is small enough, you’re better off implementing a simple stream-and-sort algorithm. As your data grows large, however, you might need to scale out, and MapReduce or Spark are good fits for this problem.

The solution in Spark is not too complicated. I usually use pyspark, but I wanted to try the Scala version this time (I’m new to Scala :)). You might want to jump to the results section if you’re not interested in the code.

package com.ikaver.phrases

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object MeaningfulPhraseFinding {
  def main(args: Array[String]) {
    //assumed:
    //structure of bigrams  file is one bigram  per line, with format bigram[TAB]year[TAB]count
    //structure of unigrams file is one unigram per line, with format unigram[TAB]year[TAB]count

    //stop words. Cleaner solution would read these from a file :)
    val stopWords = Set("a", "about", "above", "across", "after", "afterwards", "again", "against", "all", "almost", "alone", "along", "already", "also", "although", "always", "am", "among", "amongst", "amoungst", "amount", "an", "and", "another", "any", "anyhow", "anyone", "anything", "anyway", "anywhere", "are", "around", "as", "at", "back", "be", "became", "because", "become", "becomes", "becoming", "been", "before", "beforehand", "behind", "being", "below", "beside", "besides", "between", "beyond", "bill", "both", "bottom", "but", "by", "call", "can", "cannot", "cant", "co", "computer", "con", "could", "couldnt", "cry", "de", "describe", "detail", "do", "done", "down", "due", "during", "each", "eg", "eight", "either", "eleven", "else", "elsewhere", "empty", "enough", "etc", "even", "ever", "every", "everyone", "everything", "everywhere", "except", "few", "fifteen", "fify", "fill", "find", "fire", "first", "five", "for", "former", "formerly", "forty", "found", "four", "from", "front", "full", "further", "get", "give", "go", "had", "has", "hasnt", "have", "he", "hence", "her", "here", "hereafter", "hereby", "herein", "hereupon", "hers", "herself", "him", "himself", "his", "how", "however", "hundred", "i", "ie", "if", "in", "inc", "indeed", "interest", "into", "is", "it", "its", "itself", "keep", "last", "latter", "latterly", "least", "less", "ltd", "made", "many", "may", "me", "meanwhile", "might", "mill", "mine", "more", "moreover", "most", "mostly", "move", "much", "must", "my", "myself", "name", "namely", "neither", "never", "nevertheless", "next", "nine", "no", "nobody", "none", "noone", "nor", "not", "nothing", "now", "nowhere", "of", "off", "often", "on", "once", "one", "only", "onto", "or", "other", "others", "otherwise", "our", "ours", "ourselves", "out", "over", "own", "part", "per", "perhaps", "please", "put", "rather", "re", "same", "see", "seem", "seemed", "seeming", "seems", "serious", "several", "she", "should", "show", "side", "since", "sincere", "six", "sixty", "so", "some", "somehow", "someone", "something", "sometime", "sometimes", "somewhere", "still", "such", "system", "take", "ten", "than", "that", "the", "their", "them", "themselves", "then", "thence", "there", "thereafter", "thereby", "therefore", "therein", "thereupon", "these", "they", "thick", "thin", "third", "this", "those", "though", "three", "through", "throughout", "thru", "thus", "to", "together", "too", "top", "toward", "towards", "twelve", "twenty", "two", "un", "under", "until", "up", "upon", "us", "very", "via", "was", "we", "well", "were", "what", "whatever", "when", "whence", "whenever", "where", "whereafter", "whereas", "whereby", "wherein", "whereupon", "wherever", "whether", "which", "while", "whither", "who", "whoever", "whole", "whom", "whose", "why", "will", "with", "within", "without", "would", "yet", "you", "your", "yours", "yourself", "yourselves")
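    //a cleaner alternative (sketch, with a hypothetical "stopwords.txt" path): read the stop words from a file
    //val stopWords = scala.io.Source.fromFile("stopwords.txt").getLines().map(_.trim).toSet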

    val phraseWeight = args(0).toDouble //how much weight do we give to phraseness in the final score
    val infoWeight = args(1).toDouble   //how much weight do we give to informativeness in the final score
    val numBigrams = args(2).toInt      //how many bigrams to include in the final result (top K bigrams)
    val numMappers = args(3).toInt      //how many mappers to use
    val numReducers = args(4).toInt     //how many reducers to use
    val fgYear = args(5).toInt          //what is the year that we are considering to be the foreground (FG)
    val bigramsPath = args(6)           //the path to the bigrams file
    val unigramsPath = args(7)          //the path to the unigrams file

    //our goal here is to construct an RDD in the form of
    //(bigram, bigramFGCount, bigramBGCount, firstWordFGCount, secondWordFGCount)
    //so we can easily compute the phraseness and informativeness of each bigram

    //initialize the spark context
    //might need to tweak values such as spark.driver.memory to get good performance here
    val sparkConf = new SparkConf().setAppName("Phrases").setMaster(s"local[$numMappers]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    //get RDD of bigrams in form (bigram, fgCount, bgCount). Remove bigrams with stop words.
    val bigramsRaw = sc.textFile(bigramsPath)
    val bigramTokens = bigramsRaw.map{ line =>
      val tokens = line.split("\t")
      val isFgYear = tokens(1).toInt == fgYear
      val count = tokens(2).toInt
      if(isFgYear) (tokens(0), (count, 0))
      else (tokens(0), (0, count))
    }.filter{ x =>
      val bigram = x._1
      val words = bigram.split(" ")
      !(stopWords.contains(words(0)) || stopWords.contains(words(1)))
    }
    //aggregate bigrams with the same key (sum all FG and BG counts of the same bigrams)
    val aggregatedBigrams = bigramTokens.reduceByKey({(x:(Int,Int), y:(Int,Int)) =>
      (x._1+y._1, x._2+y._2)
    }, numReducers)

    //map bigrams to have the first word as the key, to later group with the unigrams RDD
    val processedBigrams = aggregatedBigrams.map{ x =>
      val (bigram, counts) = x
      val words = bigram.split(" ")
      (words(0), (words(1), counts))
    }.persist()

    //get uniqueBigrams, totalBigramsFG and totalBigramsBG counts for smoothing
    val uniqueBigramsCount = processedBigrams.count()
    val (totalBigramsFG, totalBigramsBG)  = processedBigrams.map{ x => (x._2._2._1.toLong, x._2._2._2.toLong) }
      .reduce{(x:(Long,Long), y:(Long,Long)) => (x._1+y._1, x._2+y._2) }

    //get RDD of unigrams in form (unigram, fgCount, bgCount). Remove unigrams with stop words.
    val unigramsRaw = sc.textFile(unigramsPath)
    val unigramTokens = unigramsRaw.map{ line =>
      val tokens = line.split("\t")
      val isFgYear = tokens(1).toInt == fgYear
      val count = tokens(2).toInt
      if(isFgYear) (tokens(0), (count, 0))
      else (tokens(0), (0, count))
    }.filter{x => !stopWords.contains(x._1)}

    //aggregate unigrams with the same key (sum all FG and BG counts of the same unigram)
    val processedUnigrams = unigramTokens.reduceByKey({(x:(Int,Int), y:(Int,Int)) =>
      (x._1+y._1, x._2+y._2)
    }, numReducers).persist()

    //get uniqueUnigrams, totalUnigramsFG counts for smoothing
    val uniqueUnigramsCount = processedUnigrams.count()
    val (totalUnigramsFG, _) = processedUnigrams.map{ x => (x._2._1.toLong, x._2._2.toLong) }
      .reduce{(x:(Long,Long), y:(Long,Long)) => (x._1+y._1, x._2+y._2) }

    //group the processedUnigrams RDD with the processedBigrams RDD to get the counts of the
    //first word of the bigram with the bigram counts. We will later do the same for the second word of the bigram.
    val groupedByFirstWord = processedUnigrams.cogroup(processedBigrams)
    val bigramsWithFirstWordCount : RDD[(String, (String, Int, Int, Int))] = groupedByFirstWord.flatMap{ x =>
      val (firstWord, (unigramIter, bigramIter)) = x
      val (unigramFG, _) = unigramIter.head
      bigramIter.map{ bigram =>
        val (secondWord, (bigramFG, bigramBG)) = bigram
        //use the second word of the bigram as the key so we can later join it with the second word's unigram counts
        (secondWord, (firstWord, bigramFG, bigramBG, unigramFG))
      }
    }

    //group the bigramsWithFirstWordCount with the processedUnigrams RDD to get the counts
    //of the second word of the bigram with the bigram counts.
    //After this step we will finally get the RDD wanted (firstWord, secondWord, bigramFG, bigramBG, firstFG, secondFG)
    val groupedBySecondWord = processedUnigrams.cogroup(bigramsWithFirstWordCount)
    val bigramsWithUnigramData: RDD[(String, String, Int, Int, Int, Int)] = groupedBySecondWord.flatMap{ x =>
      val (secondWord, (unigramIter, bigramIter)) = x
      val (unigramFG, _) = unigramIter.head
      bigramIter.map{ bigram =>
        val (firstWord, bigramFG, bigramBG, firstFG) = bigram
        (firstWord, secondWord, bigramFG, bigramBG, firstFG, unigramFG)
      }
    }

    //scoresOfBigrams indicates the final score of each bigram
    val scoresOfBigrams = bigramsWithUnigramData.map{ x =>
      def klDivergence(p: Double, q: Double) = {
        p * (Math.log(p) - Math.log(q))
      }
      val (firstWord, secondWord, bigramFG, bigramBG, firstFG, secondFG) = x

      //add-one (Laplace) smoothing: (count + 1) / (total count + number of unique n-grams)
      val pBigramFG = (bigramFG + 1).toDouble / (uniqueBigramsCount + totalBigramsFG).toDouble
      val pBigramBG = (bigramBG + 1).toDouble / (uniqueBigramsCount + totalBigramsBG).toDouble
      val pFirstFG  = (firstFG  + 1).toDouble / (uniqueUnigramsCount + totalUnigramsFG).toDouble
      val pSecondFG = (secondFG + 1).toDouble / (uniqueUnigramsCount + totalUnigramsFG).toDouble

      val phraseness = klDivergence(pBigramFG, pFirstFG * pSecondFG)
      val informativeness = klDivergence(pBigramFG, pBigramBG)
      (firstWord + " " + secondWord, phraseWeight * phraseness + infoWeight * informativeness, phraseness, informativeness)
    }

    //take the top K bigrams and print them to stdout
    scoresOfBigrams.takeOrdered(numBigrams)(ScoreOrdering).foreach{ x =>
      val (bigram, totalScore, phrasenessScore, informativenessScore) = x
      println(f"$bigram\t$totalScore%.5f\t$phrasenessScore%.5f\t$informativenessScore%.5f")
    }

    sc.stop()
  }

  //Orders bigram records by total score in descending order, so takeOrdered returns the highest-scoring bigrams
  object ScoreOrdering extends Ordering[(String, Double, Double, Double)] {
    override def compare(x: (String, Double, Double, Double), y: (String, Double, Double, Double)): Int = {
      y._2 compare x._2
    }
  }

}

Right now, the code runs Spark in local mode. To get the best performance on a cluster, it would need to be changed slightly to run Spark in standalone mode. Some configuration values, such as spark.driver.memory, might also need tweaking for the job to run well on your machine.
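
As a rough sketch (the master URL and memory value below are placeholders, not taken from my actual setup), the local-mode SparkConf in the listing above could be swapped for a standalone-cluster configuration along these lines:

//hypothetical standalone-mode configuration; tune the values for your cluster
val sparkConf = new SparkConf()
  .setAppName("Phrases")
  .setMaster("spark://your-master-host:7077")  //placeholder standalone master URL
  .set("spark.executor.memory", "8g")          //example value only
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//note: spark.driver.memory usually has to be set before the driver JVM starts,
//e.g. via spark-submit --driver-memory, rather than here in the SparkConf
val sc = new SparkContext(sparkConf)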

Results

We get pretty cool results with this simple algorithm. I’m using a weighted sum of the phraseness and informativeness as the total score, with the informativeness weighted 100 times as much as the phraseness.

The first dataset contains only bigrams that include the word “war”: about 60,000 bigrams taken from literature of four different decades, the 1960s, 1970s, 1980s and 1990s. We see some very interesting results as we treat each decade in turn as the foreground corpus. In the figures, larger text means a higher score.

War phrases of the 1960s

Interesting phrases for the 1960s

We see that world war and civil war are the top scorers. Most of the score of these two bigrams comes from the phraseness component, so we will see them in the other decades as well.

War phrases of the 1970s

Interesting phrases for the 1970s

We start seeing some variation here, compared to the 1960s. For instance, we now see bigrams representing wars of the 1970s, such as indian war, mexican war and vietnam war.

War phrases of the 1980s

Interesting phrases for the 1980s

More variation here, compared to the 1970s. Suddenly the nuclear war bigram got a really high score. We also see some bigrams representing wars that occurred in the 1970s and 1980s, such as falklands war, kippur war and iraqi war.

War phrases of the 1990s

Interesting phrases for the 1990s

These results are pretty interesting. Now gulf war is the highest-scoring bigram, and the cold war bigram also got a really high score. We also start seeing new bigrams such as drug war, war crimes, war veterans and postcold-war.

Results for the larger dataset

The next results are the output of the algorithm on a much larger dataset containing more than 146 million bigrams and 9 million unigrams, taken from literature of the 1960s, 1970s, 1980s and 1990s.

Phrases of the 1960s

Interesting phrases for the 1960s

Phrases of the 1970s

Interesting phrases for the 1970s

Phrases of the 1980s

Interesting phrases for the 1980s

Phrases of the 1990s

Interesting phrases for the 1990s

Again, lots of interesting stuff here. It might seem strange that bigrams like “et al” get such a high score, since they don’t seem to be “interesting” phrases. The reason “et al” scores so well is that its phraseness is very high; even though its informativeness is very low, the total score still ends up high. The scoring algorithm could be tweaked slightly to avoid this, as sketched below.
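
As one possible tweak (my own sketch, not something from the original algorithm), the scoresOfBigrams RDD in the code above already carries both component scores, so bigrams that are merely “sticky” but not specific to the foreground decade could be dropped before ranking:

//keep only bigrams that are actually more probable in the foreground than in the background
//(positive informativeness), filtering out phrases like "et al" whose score comes almost
//entirely from the phraseness component
val filteredScores = scoresOfBigrams.filter { case (_, _, _, informativeness) => informativeness > 0 }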
