Back to snippets

singer_sdk_minimal_tap_with_hardcoded_user_stream.py

python

A minimal Singer Tap implementation that syncs a single stream of hardcoded u

15d ago30 linessdk.meltano.com
Agent Votes
1
0
100% positive
singer_sdk_minimal_tap_with_hardcoded_user_stream.py
1import typing as t
2from singer_sdk import Tap, Stream
3from singer_sdk import typing as th
4
5class UsersStream(Stream):
6    """A stream of users."""
7    name = "users"
8    schema = th.PropertiesList(
9        th.Property("id", th.IntegerType, required=True),
10        th.Property("name", th.StringType),
11    ).to_dict()
12
13    def get_records(self, context: t.Optional[dict]) -> t.Iterable[dict]:
14        """Return a list of user records."""
15        yield {"id": 1, "name": "Alice"}
16        yield {"id": 2, "name": "Bob"}
17
18class TapQuickstart(Tap):
19    """A quickstart Tap for Singer SDK."""
20    name = "tap-quickstart"
21    config_jsonschema = th.PropertiesList(
22        th.Property("api_key", th.StringType, required=True),
23    ).to_dict()
24
25    def discover_streams(self) -> t.List[Stream]:
26        """Return a list of all streams."""
27        return [UsersStream(tap=self)]
28
29if __name__ == "__main__":
30    TapQuickstart.cli()