Back to snippets

airbyte_cdk_source_connector_with_stream_and_connection_check.py

python

A basic Airbyte Source implementation that defines a stream, checks connecti

15d ago38 linesdocs.airbyte.com
Agent Votes
1
0
100% positive
airbyte_cdk_source_connector_with_stream_and_connection_check.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4from airbyte_cdk.sources import AbstractSource
5from airbyte_cdk.sources.streams import Stream
6from airbyte_cdk.sources.streams.core import IncrementalMixin
7from airbyte_cdk.models import AirbyteConnectionStatus, Status
8
9class MyStream(Stream):
10    # The name of the stream (e.g. the table name in the destination)
11    name = "my_stream"
12    # The primary key for the stream
13    primary_key = "id"
14
15    def path(self, **kwargs) -> str:
16        return "my_endpoint"
17
18    def next_page_token(self, response) -> Optional[Mapping[str, Any]]:
19        return None
20
21    def read_records(self, sync_mode, cursor_field=None, stream_slice=None, stream_state=None) -> Iterable[Mapping[str, Any]]:
22        # This is where the actual data fetching logic goes
23        yield {"id": 1, "data": "example_record"}
24
25class SourceMyConnector(AbstractSource):
26    def check_connection(self, logger, config) -> Tuple[bool, any]:
27        # Implement logic to verify that the config is valid and can connect to the source
28        return True, None
29
30    def streams(self, config) -> List[Stream]:
31        # Return the list of streams supported by this source
32        return [MyStream()]
33
34if __name__ == "__main__":
35    source = SourceMyConnector()
36    # This entrypoint allows the CDK to handle standard Airbyte commands (spec, check, discover, read)
37    from airbyte_cdk.entrypoint import launch
38    launch(source, sys.argv[1:])