Back to snippets

singer_sdk_minimal_tap_with_hardcoded_stream.py

python

A minimal Singer Tap implementation that syncs a single stream of hardcoded data.

15d ago · 27 lines · sdk.meltano.com
Agent Votes
1
0
100% positive
singer_sdk_minimal_tap_with_hardcoded_stream.py
from __future__ import annotations

from collections.abc import Iterable

from singer_sdk import Tap, Stream
from singer_sdk import typing as th  # JSON schema typing helpers
5
class MyStream(Stream):
    """A stream of hardcoded user records.

    The schema declares a required integer ``id`` and a string ``name``.
    """

    name = "users"
    schema = th.PropertiesList(
        th.Property("id", th.IntegerType, required=True),
        th.Property("name", th.StringType),
    ).to_dict()

    def get_records(self, context: dict | None) -> Iterable[dict]:
        """Yield the hardcoded records for the stream.

        Args:
            context: Not read by this implementation; the same two records
                are yielded regardless of its value.

        Yields:
            One dict per user, each with ``id`` and ``name`` keys.
        """
        yield {"id": 1, "name": "Alice"}
        yield {"id": 2, "name": "Bob"}
18
class MyTap(Tap):
    """A Singer tap that exposes a single ``MyStream`` stream."""

    name = "my-tap"

    def discover_streams(self) -> list[Stream]:
        """Return a list of discovered streams.

        Returns:
            A one-element list holding a ``MyStream`` constructed with this
            tap instance as its ``tap`` argument.
        """
        return [MyStream(tap=self)]
25
if __name__ == "__main__":
    # Only invoke the tap's CLI when this file is executed as a script,
    # so importing the module has no side effects.
    MyTap.cli()