@@ -34,24 +34,25 @@ def __init__(self, window_size):
3434 self .window_size = int (window_size * 60 )
3535
3636 def expand (self , pcoll ):
37- return (pcoll
38- # Assigns window info to each Pub/Sub message based on its
39- # publish timestamp.
40- | 'Window into Fixed Intervals' >> beam .WindowInto (
41- window .FixedWindows (self .window_size ))
42- | 'Add timestamps to messages' >> (beam .ParDo (AddTimestamps ()))
43- # Use a dummy key to group the elements in the same window.
44- # Note that all the elements in one window must fit into memory
45- # for this. If the windowed elements do not fit into memory,
46- # please consider using `beam.util.BatchElements`.
47- # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
48- | 'Add Dummy Key' >> beam .Map (lambda elem : (None , elem ))
49- | 'Groupby' >> beam .GroupByKey ()
50- | 'Abandon Dummy Key' >> beam .MapTuple (lambda _ , val : val ))
37+ return (
38+ pcoll
39+ # Assigns window info to each Pub/Sub message based on its
40+ # publish timestamp.
41+ | "Window into Fixed Intervals"
42+ >> beam .WindowInto (window .FixedWindows (self .window_size ))
43+ | "Add timestamps to messages" >> beam .ParDo (AddTimestamps ())
44+ # Use a dummy key to group the elements in the same window.
45+ # Note that all the elements in one window must fit into memory
46+ # for this. If the windowed elements do not fit into memory,
47+ # please consider using `beam.util.BatchElements`.
48+ # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
49+ | "Add Dummy Key" >> beam .Map (lambda elem : (None , elem ))
50+ | "Groupby" >> beam .GroupByKey ()
51+ | "Abandon Dummy Key" >> beam .MapTuple (lambda _ , val : val )
52+ )
5153
5254
class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the element's publish timestamp, supplied by Beam via
        `DoFn.TimestampParam`.

        Yields:
            dict: `message_body` (UTF-8 decoded payload) and
            `publish_time` (UTC wall-clock string with microseconds).
        """

        yield {
            "message_body": element.decode("utf-8"),
            # datetime.utcfromtimestamp() is deprecated since Python 3.12;
            # fromtimestamp(..., tz=utc) yields the same strftime output
            # here because the format string contains no %z/%Z.
            "publish_time": datetime.datetime.fromtimestamp(
                float(publish_time), tz=datetime.timezone.utc
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }
6769
6870
class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        # GCS path prefix; the window bounds are appended per batch.
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket."""

        # Name each output file after the window it covers, e.g.
        # <prefix>-22:30-22:31.
        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        gcs = beam.io.gcp.gcsio.GcsIO()
        with gcs.open(filename=filename, mode="w") as f:
            # One JSON object per line (newline-delimited JSON).
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
8586
8687
def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    """Build and run the streaming Pub/Sub-to-GCS pipeline.

    Args:
        input_topic: Cloud Pub/Sub topic to read from.
        output_path: GCS path prefix for the output files.
        window_size: Window length in minutes (default 1.0).
        pipeline_args: Extra args forwarded to PipelineOptions.
    """
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=options) as pipeline:
        messages = pipeline | "Read PubSub Messages" >> beam.io.ReadFromPubSub(
            topic=input_topic
        )
        batches = messages | "Window into" >> GroupWindowsIntoBatches(window_size)
        batches | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
98103
99104
if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    # Command-line interface: topic in, GCS prefix out, optional window size.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    # Unknown args are passed through to the Beam pipeline runner.
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )
# [END pubsub_to_gcs]