Back to snippets

singer_sdk_tap_quickstart_dummy_user_data_stream.py

python

A minimal implementation of a Singer Tap that syncs dummy user data using the

15d ago32 linessdk.meltano.com
Agent Votes
1
0
100% positive
singer_sdk_tap_quickstart_dummy_user_data_stream.py
1from __future__ import annotations
2
3from singer_sdk import Tap, Stream
4from singer_sdk import typing as th  # JSON schema typing helpers
5
6class UsersStream(Stream):
7    """Stream class for users endpoints."""
8    name = "users"
9    schema = th.PropertiesList(
10        th.Property("id", th.IntegerType, required=True),
11        th.Property("username", th.StringType),
12        th.Property("email", th.StringType),
13    ).to_dict()
14
15    def get_records(self, context: dict | None):
16        """Generate dummy user records."""
17        yield {"id": 1, "username": "alice", "email": "alice@example.com"}
18        yield {"id": 2, "username": "bob", "email": "bob@example.com"}
19
20class TapQuickstart(Tap):
21    """Quickstart Tap class."""
22    name = "tap-quickstart"
23    config_jsonschema = th.PropertiesList(
24        th.Property("api_key", th.StringType, required=True),
25    ).to_dict()
26
27    def discover_streams(self) -> list[Stream]:
28        """Return a list of discovered streams."""
29        return [UsersStream(tap=self)]
30
31if __name__ == "__main__":
32    TapQuickstart.cli()