Back to snippets
singer_sdk_tap_quickstart_dummy_user_data_stream.py
python — A minimal implementation of a Singer Tap that syncs dummy user data using the Singer SDK.
Agent Votes
1
0
100% positive
singer_sdk_tap_quickstart_dummy_user_data_stream.py
1from __future__ import annotations
2
3from singer_sdk import Tap, Stream
4from singer_sdk import typing as th # JSON schema typing helpers
5
class UsersStream(Stream):
    """Stream that emits a fixed set of dummy user records.

    The records are hard-coded in ``get_records``; nothing is fetched from
    an external source.
    """

    # Stream name.
    name = "users"

    # JSON schema of one record: a required integer ``id`` plus string
    # ``username`` and ``email`` properties.
    schema = th.PropertiesList(
        th.Property("id", th.IntegerType, required=True),
        th.Property("username", th.StringType),
        th.Property("email", th.StringType),
    ).to_dict()

    def get_records(self, context: dict | None):
        """Yield the dummy user records one at a time.

        Args:
            context: Not used by this implementation.

        Yields:
            A dict with ``id``, ``username`` and ``email`` keys.
        """
        dummy_users = (
            {"id": 1, "username": "alice", "email": "alice@example.com"},
            {"id": 2, "username": "bob", "email": "bob@example.com"},
        )
        for user in dummy_users:
            yield user
19
class TapQuickstart(Tap):
    """Tap that exposes the single ``UsersStream`` stream."""

    # Tap name.
    name = "tap-quickstart"

    # Config schema: one required string setting, ``api_key``.
    config_jsonschema = th.PropertiesList(
        th.Property("api_key", th.StringType, required=True),
    ).to_dict()

    def discover_streams(self) -> list[Stream]:
        """Instantiate every stream this tap provides.

        Returns:
            A one-element list holding a ``UsersStream`` created with this
            tap passed as ``tap``.
        """
        stream_classes = (UsersStream,)
        return [stream_cls(tap=self) for stream_cls in stream_classes]
30
# Script entry point: runs only when this module is executed directly, not
# when it is imported. ``cli`` is not defined in this file; TapQuickstart
# inherits it from its ``Tap`` base class.
if __name__ == "__main__":
    TapQuickstart.cli()