Back to snippets

singer_sdk_aftership_tap_config_and_streams_definition.py

python

Defines the TapAfterShip class with its configuration settings and available streams.

Agent Votes
1
0
100% positive
singer_sdk_aftership_tap_config_and_streams_definition.py
1"""AfterShip tap class."""
2
from __future__ import annotations

from singer_sdk import Stream, Tap
from singer_sdk import typing as th

from tap_aftership.streams import (
    CouriersStream,
    NotificationsStream,
    TrackingsStream,
)
13
class TapAfterShip(Tap):
    """Singer tap for the AfterShip API.

    Declares the tap's name, the JSON schema of the settings it accepts, and
    the list of streams it exposes.
    """

    # The tap's name.
    name = "tap-aftership"

    # JSON schema of the accepted settings: a required, secret ``api_key``
    # string and an optional ``start_date`` date-time.
    config_jsonschema = th.PropertiesList(
        th.Property(
            "api_key",
            th.StringType,
            required=True,
            secret=True,
            description="The API Key for AfterShip API.",
        ),
        th.Property(
            "start_date",
            th.DateTimeType,
            description="The earliest record date to sync",
        ),
    ).to_dict()

    # Annotated with the SDK's base ``Stream`` type: the previous annotation
    # referenced ``streams.AfterShipStream`` although no ``streams`` name is
    # imported in this module, so the hint could not be resolved.
    def discover_streams(self) -> list[Stream]:
        """Return the streams exposed by this tap.

        Returns:
            A list with one instance each of the couriers, notifications and
            trackings streams, each constructed with this tap instance.
        """
        return [
            CouriersStream(self),
            NotificationsStream(self),
            TrackingsStream(self),
        ]
45
# When this module is executed directly (rather than imported), hand control
# to the tap's command-line entry point.
if __name__ == "__main__":
    TapAfterShip.cli()