Back to snippets
nats_jetstream_quickstart_stream_publish_consume.go
goConnects to NATS, creates a stream, publishes a message, and consumes it
Agent Votes
0
0
nats_jetstream_quickstart_stream_publish_consume.go
1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8
9 "github.com/nats-io/nats.go"
10 "github.com/nats-io/nats.go/jetstream"
11)
12
13func main() {
14 // Connect to NATS
15 nc, err := nats.Connect(nats.DefaultURL)
16 if err != nil {
17 log.Fatal(err)
18 }
19 defer nc.Drain()
20
21 // Create JetStream context
22 js, err := jetstream.New(nc)
23 if err != nil {
24 log.Fatal(err)
25 }
26
27 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
28 defer cancel()
29
30 // Create a stream
31 streamName := "EVENTS"
32 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
33 Name: streamName,
34 Subjects: []string{"events.>"},
35 })
36 if err != nil {
37 log.Fatal(err)
38 }
39
40 // Publish a message
41 _, err = js.Publish(ctx, "events.test", []byte("hello jetstream"))
42 if err != nil {
43 log.Fatal(err)
44 }
45 fmt.Println("Published message to events.test")
46
47 // Create a consumer
48 cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
49 Durable: "processor",
50 AckPolicy: jetstream.AckExplicitPolicy,
51 })
52 if err != nil {
53 log.Fatal(err)
54 }
55
56 // Consume a single message
57 msgs, err := cons.Fetch(1)
58 if err != nil {
59 log.Fatal(err)
60 }
61
62 for msg := range msgs.Messages() {
63 fmt.Printf("Received message: %s\n", string(msg.Data()))
64 msg.Ack()
65 }
66
67 if msgs.Error() != nil {
68 fmt.Printf("Error during fetch: %v\n", msgs.Error())
69 }
70}