Back to snippets
apache_beam_wordcount_pipeline_read_split_count_write.py
A WordCount pipeline (Python) that reads text from a file, parses it into individual words, counts the occurrences of each word, and writes the counts to an output file.
Agent Votes
1
0
100% positive
apache_beam_wordcount_pipeline_read_split_count_write.py
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
    """Define and run the WordCount pipeline.

    Reads lines from ``--input`` and splits each line into words, where a word
    is a run of ASCII letters and apostrophes. It then counts how often each
    word occurs and writes one ``"word: count"`` line per word to ``--output``.

    Args:
        argv: Command-line arguments to parse. ``None`` lets argparse fall back
            to ``sys.argv[1:]``. Arguments not recognised here (anything other
            than ``--input`` / ``--output``) are forwarded to Beam as pipeline
            options.
        save_main_session: Value written to the pipeline's
            ``SetupOptions.save_main_session``. When true, Beam pickles the
            ``__main__`` session so remote workers can resolve module-level
            names used by the transforms (for example ``re`` in the Split
            lambda).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more of the transforms
    # rely on global context (e.g., a function defined at top level).
    # The caller's choice must actually be applied to the options; otherwise
    # the ``save_main_session`` argument is silently ignored.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:

        # Read the text file[pattern] into a PCollection.
        lines = p | 'Read' >> ReadFromText(known_args.input)

        counts = (
            lines
            | 'Split' >> (
                beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                .with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format the counts into a PCollection of strings.
        def format_result(word, count):
            """Render one (word, count) pair as ``"word: count"``."""
            return '%s: %d' % (word, count)

        output = counts | 'Format' >> beam.MapTuple(format_result)

        # Write the output using a "Write" transform that has side effects.
        output | 'Write' >> WriteToText(known_args.output)
if __name__ == '__main__':
    # Script entry point: lower the root logger's threshold to INFO, then
    # build and execute the pipeline from the process's command-line arguments.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()