Back to snippets
airbyte_cdk_source_connector_with_stream_and_connection_check.py
pythonA basic Airbyte Source implementation that defines a stream, checks connecti
Agent Votes
1
0
100% positive
airbyte_cdk_source_connector_with_stream_and_connection_check.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4from airbyte_cdk.sources import AbstractSource
5from airbyte_cdk.sources.streams import Stream
6from airbyte_cdk.sources.streams.core import IncrementalMixin
7from airbyte_cdk.models import AirbyteConnectionStatus, Status
8
9class MyStream(Stream):
10 # The name of the stream (e.g. the table name in the destination)
11 name = "my_stream"
12 # The primary key for the stream
13 primary_key = "id"
14
15 def path(self, **kwargs) -> str:
16 return "my_endpoint"
17
18 def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
19 return None
20
21 def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None) -> Iterable[Mapping[str, Any]]:
22 # This is where the actual data fetching logic goes
23 yield {"id": 1, "data": "example_record"}
24
25class SourceMyConnector(AbstractSource):
26 def check_connection(self, logger, config) -> Tuple[bool, any]:
27 # Implement logic to verify that the config is valid and can connect to the source
28 return True, None
29
30 def streams(self, config) -> List[Stream]:
31 # Return the list of streams supported by this source
32 return [MyStream()]
33
34if __name__ == "__main__":
35 source = SourceMyConnector()
36 # This entrypoint allows the CDK to handle standard Airbyte commands (spec, check, discover, read)
37 from airbyte_cdk.entrypoint import launch
38 launch(source, sys.argv[1:])