Skip to content

Commit 2c18513

Browse files
committed
Add examples for reading HBase and Cassandra InputFormats from Python
1 parent b65606f commit 2c18513

File tree

4 files changed

+177
-1
lines changed

4 files changed

+177
-1
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import sys
19+
20+
from pyspark import SparkContext
21+
22+
"""
23+
Create data in Cassandra fist
24+
(following: https://wiki.apache.org/cassandra/GettingStarted)
25+
26+
cqlsh> CREATE KEYSPACE test
27+
... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
28+
cqlsh> use test;
29+
cqlsh:test> CREATE TABLE users (
30+
... user_id int PRIMARY KEY,
31+
... fname text,
32+
... lname text
33+
... );
34+
cqlsh:test> INSERT INTO users (user_id, fname, lname)
35+
... VALUES (1745, 'john', 'smith');
36+
cqlsh:test> INSERT INTO users (user_id, fname, lname)
37+
... VALUES (1744, 'john', 'doe');
38+
cqlsh:test> INSERT INTO users (user_id, fname, lname)
39+
... VALUES (1746, 'john', 'smith');
40+
cqlsh:test> SELECT * FROM users;
41+
42+
user_id | fname | lname
43+
---------+-------+-------
44+
1745 | john | smith
45+
1744 | john | doe
46+
1746 | john | smith
47+
"""
48+
if __name__ == "__main__":
49+
if len(sys.argv) != 4:
50+
print >> sys.stderr, """
51+
Usage: cassandra_inputformat <host> <keyspace> <cf>
52+
53+
Run with example jar:
54+
./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
55+
Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
56+
"""
57+
exit(-1)
58+
59+
host = sys.argv[1]
60+
keyspace = sys.argv[2]
61+
cf = sys.argv[3]
62+
sc = SparkContext(appName="HBaseInputFormat")
63+
64+
conf = {"cassandra.input.thrift.address":host,
65+
"cassandra.input.thrift.port":"9160",
66+
"cassandra.input.keyspace":keyspace,
67+
"cassandra.input.columnfamily":cf,
68+
"cassandra.input.partitioner.class":"Murmur3Partitioner",
69+
"cassandra.input.page.row.size":"3"}
70+
cass_rdd = sc.newAPIHadoopRDD(
71+
"org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat",
72+
"java.util.Map",
73+
"java.util.Map",
74+
keyConverter="org.apache.spark.examples.CassandraCQLKeyConverter",
75+
valueConverter="org.apache.spark.examples.CassandraCQLValueConverter",
76+
conf=conf)
77+
output = cass_rdd.collect()
78+
for (k, v) in output:
79+
print (k, v)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import sys
19+
20+
from pyspark import SparkContext
21+
22+
"""
23+
Create test data in HBase first:
24+
25+
hbase(main):016:0> create 'test', 'f1'
26+
0 row(s) in 1.0430 seconds
27+
28+
hbase(main):017:0> put 'test', 'row1', 'f1', 'value1'
29+
0 row(s) in 0.0130 seconds
30+
31+
hbase(main):018:0> put 'test', 'row2', 'f1', 'value2'
32+
0 row(s) in 0.0030 seconds
33+
34+
hbase(main):019:0> put 'test', 'row3', 'f1', 'value3'
35+
0 row(s) in 0.0050 seconds
36+
37+
hbase(main):020:0> put 'test', 'row4', 'f1', 'value4'
38+
0 row(s) in 0.0110 seconds
39+
40+
hbase(main):021:0> scan 'test'
41+
ROW COLUMN+CELL
42+
row1 column=f1:, timestamp=1401883411986, value=value1
43+
row2 column=f1:, timestamp=1401883415212, value=value2
44+
row3 column=f1:, timestamp=1401883417858, value=value3
45+
row4 column=f1:, timestamp=1401883420805, value=value4
46+
4 row(s) in 0.0240 seconds
47+
"""
48+
if __name__ == "__main__":
49+
if len(sys.argv) != 3:
50+
print >> sys.stderr, """
51+
Usage: hbase_inputformat <host> <table>
52+
53+
Run with example jar:
54+
./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py <host> <table>
55+
Assumes you have some data in HBase already, running on <host>, in <table>
56+
"""
57+
exit(-1)
58+
59+
host = sys.argv[1]
60+
table = sys.argv[2]
61+
sc = SparkContext(appName="HBaseInputFormat")
62+
63+
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
64+
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
65+
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
66+
"org.apache.hadoop.hbase.client.Result",
67+
valueConverter="org.apache.spark.examples.HBaseConverter",
68+
conf=conf)
69+
output = hbase_rdd.collect()
70+
for (k, v) in output:
71+
print (k, v)

examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ import org.apache.hadoop.mapreduce.Job
3232

3333
import org.apache.spark.{SparkConf, SparkContext}
3434
import org.apache.spark.SparkContext._
35+
import org.apache.spark.api.python.Converter
36+
37+
class CassandraCQLKeyConverter extends Converter {
38+
import collection.JavaConversions._
39+
override def convert(obj: Any) = {
40+
val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
41+
mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
42+
}
43+
}
44+
45+
class CassandraCQLValueConverter extends Converter {
46+
import collection.JavaConversions._
47+
override def convert(obj: Any) = {
48+
val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
49+
mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
50+
}
51+
}
3552

3653
/*
3754
Need to create following keyspace and column family in cassandra before running this example

examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,21 @@
1717

1818
package org.apache.spark.examples
1919

20-
import org.apache.hadoop.hbase.client.HBaseAdmin
20+
import org.apache.hadoop.hbase.client.{Result, HBaseAdmin}
2121
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
2222
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
2323

2424
import org.apache.spark._
2525
import org.apache.spark.rdd.NewHadoopRDD
26+
import org.apache.spark.api.python.Converter
27+
import org.apache.hadoop.hbase.util.Bytes
28+
29+
class HBaseConverter extends Converter {
30+
override def convert(obj: Any) = {
31+
val result = obj.asInstanceOf[Result]
32+
Bytes.toStringBinary(result.value())
33+
}
34+
}
2635

2736
object HBaseTest {
2837
def main(args: Array[String]) {

0 commit comments

Comments
 (0)