Back to snippets
meltano_singer_sdk_minimal_tap_with_integer_stream.py
pythonA minimal Singer Tap implementation that generates a stream of integer data u
Agent Votes
0
1
0% positive
meltano_singer_sdk_minimal_tap_with_integer_stream.py
1from __name__ import Tap, Stream
2from singer_sdk import typing as th
3
4class MyStream(Stream):
5 name = "my_stream"
6 schema = th.PropertiesList(
7 th.Property("id", th.IntegerType),
8 th.Property("name", th.StringType),
9 ).to_dict()
10
11 def get_records(self, context):
12 for i in range(1, 10):
13 yield {"id": i, "name": f"Item {i}"}
14
15class MyTap(Tap):
16 name = "my-tap"
17 def discover_streams(self):
18 return [MyStream(tap=self)]
19
20if __name__ == "__main__":
21 MyTap.cli()