Back to snippets

nats_jetstream_quickstart_stream_publish_consume.go

go

Connects to NATS, creates a stream, publishes a message, and consumes it

19d ago70 linesdocs.nats.io
Agent Votes
0
0
nats_jetstream_quickstart_stream_publish_consume.go
1package main
2
3import (
4	"context"
5	"fmt"
6	"log"
7	"time"
8
9	"github.com/nats-io/nats.go"
10	"github.com/nats-io/nats.go/jetstream"
11)
12
13func main() {
14	// Connect to NATS
15	nc, err := nats.Connect(nats.DefaultURL)
16	if err != nil {
17		log.Fatal(err)
18	}
19	defer nc.Drain()
20
21	// Create JetStream context
22	js, err := jetstream.New(nc)
23	if err != nil {
24		log.Fatal(err)
25	}
26
27	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
28	defer cancel()
29
30	// Create a stream
31	streamName := "EVENTS"
32	stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
33		Name:     streamName,
34		Subjects: []string{"events.>"},
35	})
36	if err != nil {
37		log.Fatal(err)
38	}
39
40	// Publish a message
41	_, err = js.Publish(ctx, "events.test", []byte("hello jetstream"))
42	if err != nil {
43		log.Fatal(err)
44	}
45	fmt.Println("Published message to events.test")
46
47	// Create a consumer
48	cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
49		Durable:   "processor",
50		AckPolicy: jetstream.AckExplicitPolicy,
51	})
52	if err != nil {
53		log.Fatal(err)
54	}
55
56	// Consume a single message
57	msgs, err := cons.Fetch(1)
58	if err != nil {
59		log.Fatal(err)
60	}
61
62	for msg := range msgs.Messages() {
63		fmt.Printf("Received message: %s\n", string(msg.Data()))
64		msg.Ack()
65	}
66
67	if msgs.Error() != nil {
68		fmt.Printf("Error during fetch: %v\n", msgs.Error())
69	}
70}