TF-IDF (Term Frequency, Inverse Document Frequency) is a basic technique for computing the relevancy of a document with respect to a particular term. A "term" is a generalized unit of what a document contains: it can be a word, a phrase, or a concept.

Intuitively, the relevancy of a document to a term can be calculated from the proportion of that term within the document (i.e. the count of the term in that document divided by the total number of terms in it). We call this the "term frequency".

On the other hand, if the term is very common and appears in many other documents, its relevancy should be reduced. The fraction of documents containing the term (i.e. the count of documents having this term divided by the total number of documents) is called the "document frequency".

The overall relevancy of a document with respect to a term can be computed from both the term frequency and the document frequency:

relevancy = term frequency * log(1 / document frequency)

This is called tf-idf. A "document" can then be considered as a multi-dimensional vector where each dimension represents a term with its tf-idf as the value.

Compute TF-IDF using Map/Reduce

To extract the terms from a document, the following process is common:

- Extract words by tokenizing the input stream
- Make the words case-insensitive (e.g. transform them to all lower case)
- Apply n-grams to extract phrases (e.g. a statistically frequent n-gram is likely a phrase)
- Filter out stop words
- Apply stemming (e.g. transform cat, cats, kittens to cat)

To keep the terms simple, each word itself is a term in the example below.

We use multiple rounds of Map/Reduce to gradually compute:

1. the word count per word/doc combination
2. the total number of words per doc
3. the total number of docs per word
4. and finally the TF-IDF

Implementation in Apache PIG

There are many ways to implement the Map/Reduce paradigm above. Apache Hadoop is a popular approach using Java or another programming language (e.g. Hadoop Streaming). Apache PIG is another approach based on a higher-level language with parallel processing constructs built in.
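Before going through the PIG script, here is a minimal single-machine sketch in plain Java of the three intermediate quantities the rounds below compute. The toy corpus, class name, and variable names are made up for illustration only; it is meant to make the formula concrete, not to replace the parallel implementation.

```java
import java.util.*;

// Toy illustration of the quantities the three Map/Reduce rounds compute,
// for one (word, doc) pair, over a tiny in-memory corpus.
public class TfIdfToy {
    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("data.txt",  "the cat sat on the mat");
        docs.put("data2.txt", "the dog sat on the log");
        docs.put("data3.txt", "a bird on a mat");

        String word = "cat";
        String docId = "data.txt";

        // Round 1: word count per word/doc combination
        long wordCount = Arrays.stream(docs.get(docId).split("\\s+"))
                               .filter(w -> w.equals(word)).count();

        // Round 2: total number of words in this doc
        long wordCountPerDoc = docs.get(docId).split("\\s+").length;

        // Round 3: total number of docs, and number of docs containing the word
        long totalDocs = docs.size();
        long docCountPerWord = docs.values().stream()
                .filter(text -> Arrays.asList(text.split("\\s+")).contains(word))
                .count();

        // relevancy = term frequency * log(1 / document frequency)
        double tf  = (double) wordCount / wordCountPerDoc;
        double idf = Math.log((double) totalDocs / docCountPerWord);
        System.out.println("tf-idf(" + word + ", " + docId + ") = " + tf * idf);
    }
}
```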
Here are the 3 rounds of map/reduce logic implemented in PIG script:

```
REGISTER rickyudf.jar

/* Build up the input data stream */
A1 = LOAD 'dirdir/data.txt' AS (words:chararray);
DocWordStream1 = FOREACH A1 GENERATE
                     'data.txt' AS docId,
                     FLATTEN(TOKENIZE(words)) AS word;

A2 = LOAD 'dirdir/data2.txt' AS (words:chararray);
DocWordStream2 = FOREACH A2 GENERATE
                     'data2.txt' AS docId,
                     FLATTEN(TOKENIZE(words)) AS word;

A3 = LOAD 'dirdir/data3.txt' AS (words:chararray);
DocWordStream3 = FOREACH A3 GENERATE
                     'data3.txt' AS docId,
                     FLATTEN(TOKENIZE(words)) AS word;

InStream = UNION DocWordStream1, DocWordStream2, DocWordStream3;

/* Round 1: word count per word/doc combination */
B = GROUP InStream BY (word, docId);
Round1 = FOREACH B GENERATE group AS wordDoc, COUNT(InStream) AS wordCount;

/* Round 2: total word count per doc */
C = GROUP Round1 BY wordDoc.docId;
WW = GROUP C ALL;
C2 = FOREACH WW GENERATE FLATTEN(C), COUNT(C) AS totalDocs;
Round2 = FOREACH C2 GENERATE
             FLATTEN(Round1),
             SUM(Round1.wordCount) AS wordCountPerDoc,
             totalDocs;

/* Round 3: Compute the total doc count per word */
D = GROUP Round2 BY wordDoc.word;
D2 = FOREACH D GENERATE FLATTEN(Round2), COUNT(Round2) AS docCountPerWord;
Round3 = FOREACH D2 GENERATE
             $0.word AS word,
             $0.docId AS docId,
             com.ricky.TFIDF(wordCount, wordCountPerDoc, totalDocs, docCountPerWord) AS tfidf;

/* Order the output by relevancy */
ORDERRound3 = ORDER Round3 BY word ASC, tfidf DESC;

DUMP ORDERRound3;
```

Here is the corresponding user-defined function in Java (contained in rickyudf.jar):

```java
package com.ricky;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class TFIDF extends EvalFunc<Double> {

    @Override
    public Double exec(Tuple input) throws IOException {
        long wordCount = (Long) input.get(0);
        long wordCountPerDoc = (Long) input.get(1);
        long totalDocs = (Long) input.get(2);
        long docCountPerWord = (Long) input.get(3);

        // tf  = how often the word shows up within this document
        // idf = log of (total number of docs / number of docs containing this word)
        double tf = (wordCount * 1.0) / wordCountPerDoc;
        double idf = Math.log((totalDocs * 1.0) / docCountPerWord);

        return tf * idf;
    }
}
```
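To sanity check the UDF outside of PIG, it can be invoked directly with a hand-built tuple through Pig's TupleFactory API. The snippet below is an illustrative sketch only; the class name and the input numbers are made up, not real output of the script above.

```java
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.ricky.TFIDF;

// Quick standalone check of the TFIDF UDF, outside of a PIG run.
public class TFIDFCheck {
    public static void main(String[] args) throws Exception {
        Tuple input = TupleFactory.getInstance().newTuple(4);
        input.set(0, 3L);   // wordCount: the word appears 3 times in this doc
        input.set(1, 100L); // wordCountPerDoc: the doc has 100 words in total
        input.set(2, 10L);  // totalDocs: 10 documents in the corpus
        input.set(3, 2L);   // docCountPerWord: the word appears in 2 documents

        Double tfidf = new TFIDF().exec(input);
        System.out.println("tf-idf = " + tfidf); // (3/100) * log(10/2)
    }
}
```

In an actual run, the class would be packaged into rickyudf.jar and the script submitted with the pig command (e.g. pig -x local for a quick local test).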