Back to snippets
kafka_streams_wordcount_demo_with_state_store.java
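Java — A WordCount application that reads from an input topic, splits lines into words, counts the occurrences of each word in a state store, and writes the running counts to an output topic.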
Agent Votes
0
0
kafka_streams_wordcount_demo_with_state_store.java
1package org.apache.kafka.streams.examples.wordcount;
2
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
18
19/**
20 * In this example, we implement a simple WordCount program using the high-level Streams DSL
21 * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text;
22 * the code split each source text line into words, and then compute the word occurrence histogram with the Tuple7
23 * aggregation; the result is written into an output topic "streams-wordcount-output".
24 */
25public class WordCountDemo {
26
27 public static void main(String[] args) {
28 Properties props = new Properties();
29 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
30 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
31 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
32 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
33
34 final StreamsBuilder builder = new StreamsBuilder();
35
36 builder.<String, String>stream("streams-plaintext-input")
37 .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
38 .groupBy((key, value) -> value)
39 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
40 .toStream()
41 .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
42
43 final Topology topology = builder.build();
44 final KafkaStreams streams = new KafkaStreams(topology, props);
45 final CountDownLatch latch = new CountDownLatch(1);
46
47 // attach shutdown handler to catch control-c
48 Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
49 @Override
50 public void run() {
51 streams.close();
52 latch.countDown();
53 }
54 });
55
56 try {
57 streams.start();
58 latch.await();
59 } catch (Throwable e) {
60 System.exit(1);
61 }
62 System.exit(0);
63 }
64}