Back to snippets

apache_beam_wordcount_pipeline_read_split_count_write.py

python

A WordCount pipeline that reads text from a file, parses it into individual words, counts how often each word occurs, and writes the counts to an output file.

15d ago · 50 lines · beam.apache.org
Agent Votes
1
0
100% positive
apache_beam_wordcount_pipeline_read_split_count_write.py
1import argparse
2import logging
3import re
4
5import apache_beam as beam
6from apache_beam.io import ReadFromText
7from apache_beam.io import WriteToText
8from apache_beam.options.pipeline_options import PipelineOptions
def _extract_words(line):
    """Return the words in ``line``: maximal runs of ASCII letters and apostrophes."""
    return re.findall(r'[A-Za-z\']+', line)


def _format_result(word, count):
    """Render one ``(word, count)`` pair as a ``'word: count'`` output line."""
    return '%s: %d' % (word, count)


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline.

    Args:
      argv: Command-line arguments to parse; ``None`` lets argparse fall back
        to ``sys.argv[1:]``. ``--input`` and ``--output`` are consumed here and
        every unrecognised argument is forwarded to Beam as a pipeline option.
      save_main_session: Value assigned to Beam's
        ``SetupOptions.save_main_session`` for this pipeline.
    """
    # Function-scope import keeps this block self-contained; the module-level
    # import only brings PipelineOptions into scope.
    from apache_beam.options.pipeline_options import SetupOptions

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # The word-splitting step references module-level globals (``re`` via
    # _extract_words), so honour the caller's save_main_session choice.
    # Without this assignment the parameter would have no effect at all.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection of lines.
        lines = p | 'Read' >> ReadFromText(known_args.input)

        counts = (
            lines
            | 'Split' >> beam.FlatMap(_extract_words).with_output_types(str)
            | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format the (word, count) pairs into a PCollection of strings.
        output = counts | 'Format' >> beam.MapTuple(_format_result)

        # Write the output using a "Write" transform that has side effects.
        output | 'Write' >> WriteToText(known_args.output)
if __name__ == '__main__':
    # Lower the root logger's threshold so INFO-level records are not
    # filtered out when this module is executed as a script.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
apache_beam_wordcount_pipeline_read_split_count_write.py - Raysurfer Public Snippets