Enron Emails Example
This sample can be run from the root directory with ./gradlew enronEmails. This downloads the Enron email corpus and imports it
into MongoDB automatically. You can also download a copy of the data set manually
here.
Abbreviated code snippets are shown below, but you may also check out the full source.
Each document in the data set contains a single e-mail, including headers for sender and recipients. In this example we will build a list of the unique sender/recipient pairs, counting how many times each pair occurs.
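To make the goal concrete before involving Hadoop, the counting can be sketched in plain Java. This is only an illustration, not code from the sample: the class name PairCountSketch, the "from -> to" string key, and the example.com addresses are all assumptions made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the counting logic only; no Hadoop or MongoDB involved.
// PairCountSketch is a hypothetical name, not a class from the sample.
final class PairCountSketch {

    // headersList: one map per e-mail holding its "From" and "To" header values.
    // Returns "from -> to" keys mapped to the number of times each pair occurs.
    static Map<String, Integer> countPairs(List<Map<String, String>> headersList) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, String> headers : headersList) {
            String from = headers.get("From");
            String to = headers.get("To");
            if (from == null || to == null) {
                continue; // skip messages missing either header
            }
            // The To header may list several comma-separated recipients.
            for (String recip : to.split(",")) {
                String trimmed = recip.trim();
                if (!trimmed.isEmpty()) {
                    // The "map" step emits 1 per pair; merge plays the role of the "reduce" sum.
                    counts.merge(from + " -> " + trimmed, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map<String, String>> mails = new ArrayList<>();
        mails.add(Map.of("From", "alice@example.com", "To", "bob@example.com, carol@example.com"));
        mails.add(Map.of("From", "alice@example.com", "To", "bob@example.com"));
        mails.add(Map.of("From", "bob@example.com")); // no To header, ignored
        countPairs(mails).forEach((pair, n) -> System.out.println(pair + ": " + n));
    }
}
```

Running main prints alice@example.com -> bob@example.com: 2 followed by alice@example.com -> carol@example.com: 1. The MapReduce job described next produces the same kind of tally, but spreads the work across mappers and reducers.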
The mapper class will get the headers field from each document, parse out the sender from the From field and the recipients from
the To field, and construct a MailPair object containing each pair which will act as the key. Then we emit the value 1 for each key.
MailPair is just a simple "POJO" that contains Strings for the from and to values, and implements WritableComparable so that it
can be serialized across Hadoop nodes and sorted.
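The MailPair source is not reproduced on this page, so the following is a minimal sketch of what such a key type might look like, not the project's actual class. To keep it runnable without Hadoop on the classpath it implements plain Comparable; the assumption is that the real class declares WritableComparable<MailPair> instead, which asks for the same write, readFields, and compareTo methods. The roundTrip helper is hypothetical and exists only to demonstrate the serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical sketch of the key type; not the project's actual MailPair source.
// On a Hadoop classpath the declaration would instead read
// "implements org.apache.hadoop.io.WritableComparable<MailPair>".
final class MailPair implements Comparable<MailPair> {
    String from;
    String to;

    // No-arg constructor: Hadoop creates key instances first, then calls readFields.
    MailPair() {}

    MailPair(String from, String to) {
        this.from = from;
        this.to = to;
    }

    // Serialize both fields so the key can be shipped between nodes.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(from);
        out.writeUTF(to);
    }

    // Restore the fields in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    // Order by sender, then recipient, so equal pairs sort next to each other.
    @Override
    public int compareTo(MailPair other) {
        int c = from.compareTo(other.from);
        return c != 0 ? c : to.compareTo(other.to);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MailPair)) return false;
        MailPair p = (MailPair) o;
        return from.equals(p.from) && to.equals(p.to);
    }

    @Override
    public int hashCode() {
        return Objects.hash(from, to);
    }

    // Demonstration helper: write the pair to bytes and read it back into a new object.
    static MailPair roundTrip(MailPair pair) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            pair.write(new DataOutputStream(bytes));
            MailPair copy = new MailPair();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Defining equals and hashCode alongside compareTo keeps the three consistent, so equal sender/recipient pairs are treated as the same key wherever they are compared or hashed. With a key type like this in place, the mapper that follows only has to construct one MailPair per recipient.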

    @Override
    public void map(NullWritable key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
        if (val.containsKey("headers")) {
            BSONObject headers = (BSONObject) val.get("headers");
            if (headers.containsKey("From") && headers.containsKey("To")) {
                String from = (String) headers.get("From");
                String to = (String) headers.get("To");
                // The To header may list several comma-separated recipients.
                String[] recips = to.split(",");
                for (int i = 0; i < recips.length; i++) {
                    String recip = recips[i].trim();
                    if (recip.length() > 0) {
                        // Emit one (sender, recipient) key with a count of 1.
                        context.write(new MailPair(from, recip), new IntWritable(1));
                    }
                }
            }
        }
    }

The reduce class takes the collected values for each key, sums them, and writes the result.

    @Override
    public void reduce(final MailPair pKey,
                       final Iterable<IntWritable> pValues,
                       final Context pContext)
            throws IOException, InterruptedException {
        // Add up the 1s emitted by the mapper for this sender/recipient pair.
        int sum = 0;
        for (final IntWritable value : pValues) {
            sum += value.get();
        }
        // start() is a static factory, so call it on the class rather than on a new instance.
        BSONObject outDoc = BasicDBObjectBuilder.start().add("f", pKey.from).add("t", pKey.to).get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);
        pContext.write(pkeyOut, new IntWritable(sum));
    }

To accomplish the same with Pig, but with much less work:

    REGISTER ../mongo-2.10.1.jar;
    REGISTER ../core/target/mongo-hadoop-core_cdh4.3.0-1.1.0.jar;
    REGISTER ../pig/target/mongo-hadoop-pig_cdh4.3.0-1.1.0.jar;
    raw = LOAD 'file:///Users/mike/dump/enron_mail/messages.bson' using com.mongodb.hadoop.pig.BSONLoader('','headers:[]') ;
    send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;
    send_recip_filtered = FILTER send_recip BY to IS NOT NULL;
    send_recip_split = FOREACH send_recip_filtered GENERATE from as from, FLATTEN(TOKENIZE(to)) as to;
    send_recip_split_trimmed = FOREACH send_recip_split GENERATE from as from, TRIM(to) as to;
    send_recip_grouped = GROUP send_recip_split_trimmed BY (from, to);
    send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;
    STORE send_recip_counted INTO 'file:///tmp/enron_emailcounts.bson' using com.mongodb.hadoop.pig.BSONStorage;

The MongoDB Hadoop Connector can also read data from S3 buckets. You can exercise this functionality through this example by doing the following:
- Download the Enron data set and put messages.bson into an S3 bucket.
- Set the environment variables AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID per your AWS account.
- Run ./examples/elastic-mapreduce/update_s3.sh from the root of this project. This places emr-bootstrap.sh and the necessary JAR files into the S3 bucket and sets their permissions so that they are readable. Note: This script requires the s3cp utility, which you can install using gem install s3cp or clone from GitHub. emr-bootstrap.sh will download dependencies like the MongoDB Java Driver and the MongoDB Hadoop Connector core jar, and set the classpath.
- Run ./examples/elastic-mapreduce/run_emr_job.sh to submit the job to Elastic MapReduce. Note: This requires elastic-mapreduce-ruby.