
meltano_singer_sdk_minimal_tap_with_integer_stream.py

python

A minimal Singer Tap implementation that generates a stream of integer data u

15d ago · 21 lines · sdk.meltano.com
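from singer_sdk import Tap, Stream
from singer_sdk import typing as th


class MyStream(Stream):
    """A stream that generates its records in memory."""

    name = "my_stream"
    # JSON Schema for the stream, built with the SDK's typing helpers.
    schema = th.PropertiesList(
        th.Property("id", th.IntegerType),
        th.Property("name", th.StringType),
    ).to_dict()

    def get_records(self, context):
        # range(1, 10) yields the integers 1 through 9.
        for i in range(1, 10):
            yield {"id": i, "name": f"Item {i}"}


class MyTap(Tap):
    """A tap that exposes a single stream."""

    name = "my-tap"

    def discover_streams(self):
        # Return one instance of every stream this tap provides.
        return [MyStream(tap=self)]


if __name__ == "__main__":
    # Run the command-line interface that the SDK generates for the tap.
    MyTap.cli()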
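
To make the expected output more concrete, the sketch below approximates, using only the standard library, the kind of Singer messages a tap like this would write to stdout: one SCHEMA message followed by one RECORD message per record. It does not use singer_sdk. The helper names `generate_records` and `singer_messages` are hypothetical, the schema dict is a hand-written approximation of what `th.PropertiesList(...).to_dict()` might produce (assuming non-required properties are nullable), and the real SDK output may include additional fields and message types.

```python
import json

# Hand-written approximation of the stream's JSON Schema (assumption:
# properties not marked as required are emitted as nullable).
SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": ["integer", "null"]},
        "name": {"type": ["string", "null"]},
    },
}


def generate_records():
    """Mirror MyStream.get_records: ids 1 through 9 with matching names."""
    for i in range(1, 10):
        yield {"id": i, "name": f"Item {i}"}


def singer_messages(stream_name="my_stream"):
    """Yield one SCHEMA message, then one RECORD message per record."""
    yield {
        "type": "SCHEMA",
        "stream": stream_name,
        "schema": SCHEMA,
        "key_properties": [],
    }
    for record in generate_records():
        yield {"type": "RECORD", "stream": stream_name, "record": record}


if __name__ == "__main__":
    # Singer messages are written as one JSON object per line.
    for message in singer_messages():
        print(json.dumps(message))
```

Running the sketch prints ten lines: the SCHEMA message first, then nine RECORD messages for ids 1 through 9.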