Revision History
- 2014/03/25 0.1 Shao, Saisai
- 2014/03/25 0.1.1 Shane, Huang
StreamSQL is a Spark component based on Catalyst and Spark Streaming that aims to support SQL-style queries on data streams. Our goal is to advance both Catalyst and Spark Streaming by bridging the gap between structured data queries and stream processing.
Our StreamSQL provides:
- Full SQL support on streaming data, with extended time-based aggregation and join.
- Easy interoperation between DStreams and SQL.
- Table and stream interoperation with a simple query.
Creating StreamSQLContext
StreamSQLContext is the entry point for all DStream-related functionality. It is the counterpart of Spark's SQLContext. A StreamSQLContext can be created as below.
val ssc: StreamingContext // an existing StreamingContext
val streamSqlContext = new StreamSQLContext(ssc)
import streamSqlContext._
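For reference, the underlying StreamingContext can be created as in standard Spark Streaming (a minimal sketch; the app name, master setting, and one-second batch interval are arbitrary choices):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A local StreamingContext with two worker threads and a 1-second batch interval
val conf = new SparkConf().setAppName("StreamSQLExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))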
Running SQL on DStreams:
case class Person(name: String, age: Int)

// Create a DStream of Person objects and register it as a stream.
val people: DStream[Person] = ssc.socketTextStream(serverIP, serverPort)
.map(_.split(","))
.map(p => Person(p(0), p(1).toInt))
people.registerAsStream("people")

val teenagers = sql("SELECT name FROM people WHERE age >= 10 AND age <= 19")
// The results of SQL queries are themselves DStreams and support all the normal operations
teenagers.map(t => "Name: " + t(0)).print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
Writing Language Integrated Relational Queries:
val teenagers = people.where('age >= 10).where('age <= 19).select('name).toDstream
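As with the SQL version above, the result is again a DStream and supports the usual operations:

teenagers.map(t => "Name: " + t(0)).print()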
Combining with Hive
val ssc: StreamingContext
val streamHiveContext = new StreamHiveContext(ssc)
import streamHiveContext._
sql("CREATE STREAM IF NOT EXISTS src (key INT, value STRING) LOCATION socket://host:port")
sql("SELECT key, value FROM src").print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
Most of the language definitions are based on HiveQL, with stream QL definitions borrowed from SQLstream. We aim to make StreamSQL a convenient extension of existing HiveQL by reusing the original language features as much as possible and defining new language features in a similar style. Future changes may be made on request.
Note that only the changes/extensions for StreamSQL are listed in this section. Other unmodified syntax (e.g., database-related clauses) is omitted for simplicity. Many streaming QL clauses bear the keyword "STREAM", which is equivalent to "TABLE" in table-oriented HiveQL.
- Create stream

CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name
  [(col_name data_type [COMMENT col_comment], ...)]
  [COMMENT stream_comment]
  [[ROW FORMAT row_format] [STORED AS stream_format]]
  [LOCATION stream_location]   (Note: socket://host:port, zk://host:port, ...)
  [TBLPROPERTIES (property_name=property_value, ...)]
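For example, mirroring the Hive-combined snippet earlier, a delimited socket stream might be declared as follows (a hypothetical sketch; the host, port, and row format are placeholders):

sql("""CREATE STREAM IF NOT EXISTS src (key INT, value STRING)
       ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       LOCATION socket://host:port""")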
- Create stream like: enables creating a stream like an existing table, or vice versa.

CREATE STREAM empty_key_value_store LIKE key_value_store;
- Drop stream

DROP STREAM [IF EXISTS] stream_name
- Alter stream (this part will be added in phase 2)

ALTER STREAM stream_name RENAME TO new_stream_name

ALTER STREAM stream_name SET TBLPROPERTIES table_properties
  table_properties: (property_name = property_value, property_name = property_value, ...)
ALTER STREAM stream_name SET TBLPROPERTIES ('comment' = new_comment);

ALTER STREAM stream_name SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties]
ALTER STREAM stream_name SET SERDEPROPERTIES serde_properties
  serde_properties: (property_name = property_value, property_name = property_value, ...)
ALTER STREAM stream_name SET SERDEPROPERTIES ('field.delim' = ',');
- Show tables/streams (this part will reuse Hive's implementation)

SHOW TABLES [IN database_name] [identifier_with_wildcards];
SHOW TBLPROPERTIES tblname;
SHOW TBLPROPERTIES tblname("foo");
SHOW STREAMS ([db_name.]stream_name|view_name)   --> extended
- Describe stream (the same as HiveQL)

DESCRIBE [EXTENDED|FORMATTED] [db_name.]stream_name[ DOT col_name ( [DOT field_name] | [DOT '$elem$'] | [DOT '$key$'] | [DOT '$value$'] )* ]
- Basic select clause

SELECT STREAM [ALL | DISTINCT] select_expr, select_expr, ...
FROM stream_reference
[WHERE where_condition]
[LIMIT number]

This queries a stream and produces a result in each batchDuration. Results are printed to the terminal by default.
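For instance, against the src stream declared earlier, a basic query might look like this (a sketch; the WHERE condition is illustrative):

sql("SELECT STREAM key, value FROM src WHERE key > 100")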
- Aggregated select clause

SELECT STREAM [ALL | DISTINCT] select_expr, select_expr, Analytic_function(select_expr) [OVER] window_als
FROM stream_reference
[WHERE where_condition]
[GROUP BY col_list]
[LIMIT number]
[WINDOW window_als AS (RANGE)]

If a WINDOW clause is present in the query, aggregation is based on windowDuration; otherwise it is based on the default batchDuration.
- Join-ed select clause
  - Stream-to-stream join
  - Stream-to-table join

The batchDuration or windowDuration must be the same for the two joined streams; this is a constraint of Spark Streaming.

SELECT STREAM select-expr...
FROM {stream|table} AS s [OVER] (RANGE INTERVAL '1' HOUR PRECEDING)
{LEFT|RIGHT|FULL} JOIN {stream|table} AS o [OVER] (RANGE INTERVAL '1' HOUR PRECEDING)
ON join-condition
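A hypothetical stream-to-table join following the skeleton above (the stream, table, and column names are placeholders):

sql("""SELECT STREAM s.key, t.value
       FROM src AS s OVER (RANGE INTERVAL '1' HOUR PRECEDING)
       LEFT JOIN ref_table AS t ON s.key = t.key""")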
- Window clause

Traditionally, for table-oriented queries, windowing functions focus on row-based windows, which start with the keyword ROWS. For stream-oriented queries, time-based windows are more important: aggregation and join require the stream to be sliced by time-based windows. The WINDOW clause is therefore expanded to support time-based windows, which start with the keyword RANGE. The time-based windowing function takes the design of SQLstream as a reference.
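For illustration, a time-based windowed aggregation might be written as below (a sketch against the grammar above; the 30-second range and the column names are assumptions):

sql("""SELECT STREAM key, COUNT(value) OVER w
       FROM src
       GROUP BY key
       WINDOW w AS (RANGE INTERVAL '30' SECOND PRECEDING)""")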
A simple architecture diagram is shown below:
To support StreamSQL on Catalyst, several modules need to be added or extended:
- Extend the current SQL language to support streams.
- Extend the MetaStore to support stream DDL.
- Add a stream reader to connect to stream sources and inject data.
- Modify the physical plan to support generating DStream chains.
Catalyst's original process is shown below:
We will fully follow Catalyst's original process while adding stream support; our process is shown below:
Since Catalyst originally has two entry points, SQLContext and HiveContext, we will build their stream equivalents: StreamSQLContext and StreamHiveContext.
Here we will extend Hive's ANTLR parser (.g files) to support stream-related DDL and DML, including the new STREAM keyword, tokens, and rule descriptions.
The basic idea is to fully leverage the existing Hive metastore with no modification to its schema definition; only some API changes are required for DDL tasks. Thus, StreamSQL can easily be deployed on any existing Hive system, in particular sharing its metastore. The current Hive metastore schema is fully reused, apart from the extensions listed below.
- Make all stream tables external tables by default.
- Add a STREAM='TRUE' property.
- Add several parameters to describe Spark Streaming properties, e.g., BATCH_DURATION, STORAGE_LEVEL, and other stream-source-related parameters.
As a more reasonable solution for items 1 and 2, we would like to add a new STREAM TableType extending the existing Hive TableTypes (i.e., managed, external, view, index), instead of configuring STREAM='TRUE' as a property item.
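For instance, the stream-specific parameters could be carried in TBLPROPERTIES (a sketch; only STREAM, BATCH_DURATION, and STORAGE_LEVEL are named above, and the values are placeholders):

sql("""CREATE STREAM src (key INT, value STRING)
       LOCATION socket://host:port
       TBLPROPERTIES ('STREAM' = 'TRUE', 'BATCH_DURATION' = '1', 'STORAGE_LEVEL' = 'MEMORY_ONLY')""")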
The major difference between the original Catalyst and the stream-enabled Catalyst is the physical plan: where the original physical plan generates RDD dependencies, the new stream physical plan generates DStream dependencies.
To make full use of the existing Spark physical plan, rather than writing nearly identical stream equivalents from the ground up, our stream physical plan wraps the corresponding Spark physical plan: it extracts RDDs from DStreams in leaf nodes and injects RDDs back into DStreams in other nodes. The diagram below roughly shows the process:
This process and transformation generate the DStream dependencies. After that, submitting this DStream graph to the StreamingContext periodically submits jobs to the DAGScheduler and returns results.
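Conceptually, the wrapping resembles Spark Streaming's DStream.transform, which lifts an RDD-level operation to the DStream level (a simplified sketch of the idea, not the actual operator code; wrapPhysicalOp is a hypothetical name):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Lift an RDD-level physical operation (a Spark physical plan over one batch)
// to the DStream level: each batch's RDD is extracted, the RDD-level plan runs
// on it, and the result is re-injected into the output DStream.
def wrapPhysicalOp[T: ClassTag, U: ClassTag](input: DStream[T])(rddOp: RDD[T] => RDD[U]): DStream[U] =
  input.transform(rddOp)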
Spark Streaming supports Kafka, Flume, ZeroMQ, sockets, and other input sources, including customized ones. These sources require different parameters and produce different data formats. Our plan is to define a uniform stream input format to bridge the gaps between the different input sources. This part of the work is still under design; as a phase-one prototype, we may not provide a generic input stream source.
- Hive parser modification to support CREATE STREAM and DROP STREAM is done.
- MetaStore-related work is almost done and under debugging.
- Stream-related Catalyst back-end work, including physical operators and strategies, is done.
- Generic stream input format work is under design. A workable specific input source is almost done and under debugging.
Phase 1 (before the middle of May or earlier): complete a workable end-to-end solution with the following features:
- Support creating/dropping streams via SQL queries.
- Support simple queries over various kinds of streaming inputs.
- Stream-to-table join.
Phase 2:
- Enrich stream-related DDL from the parser to the backend.
- Build a uniform generic stream input format and stream output format.
- Add more features, such as time-based windowing functions and others.