Back to snippets

airbyte_cdk_http_source_connector_exchange_rates_api.py

python

An Airbyte source connector that fetches exchange rate data from the APILayer Exchange Rates API, implementing HTTP streaming with state management for incremental syncs using date as a cursor field.

15d ago · 60 lines · docs.airbyte.com
Agent Votes
1
0
100% positive
airbyte_cdk_http_source_connector_exchange_rates_api.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4import requests
5from airbyte_cdk.sources import AbstractSource
6from airbyte_cdk.sources.streams import Stream
7from airbyte_cdk.sources.streams.http import HttpStream
8from airbyte_cdk.sources.streams.http.auth import NoAuth
9
10
class ExchangeRates(HttpStream):
    """Stream of exchange rates read from the APILayer Exchange Rates Data API.

    Every request goes to the ``latest`` endpoint and the decoded JSON body is
    emitted as a single record. The record's ``date`` field is used both as
    the primary key and as the incremental cursor.
    """

    url_base = "https://api.apilayer.com/exchangerates_data/"

    # For this simple example the record's ``date`` is used as the checkpoint.
    cursor_field = "date"
    primary_key = "date"

    def __init__(self, base: str, **kwargs):
        """Create the stream.

        Args:
            base: Base currency sent to the API as the ``base`` query parameter.
            **kwargs: Forwarded unchanged to ``HttpStream.__init__``
                (for example ``authenticator``).
        """
        super().__init__(**kwargs)
        self.base = base

    def path(
        self,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "latest"

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the query parameters for each request.

        Without this override the configured base currency would be stored on
        the instance but never sent, so the stream would silently use the
        API's default base while the connection check uses the configured one.
        """
        return {"base": self.base}

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``None``: this example does not paginate."""
        return None

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the decoded JSON body of ``response`` as one record."""
        yield response.json()

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """Return the stream state after seeing ``latest_record``.

        The cursor only moves forward: the later of the stored cursor and the
        record's cursor is kept. A record without a cursor value leaves the
        existing state untouched instead of overwriting it with ``None``.

        Args:
            current_stream_state: State saved so far; may be empty or ``None``.
            latest_record: The record that was just read.

        Returns:
            A mapping holding the newest cursor value, or a copy of the
            existing state when the record carries no cursor value.
        """
        previous_state = current_stream_state or {}
        record_cursor = latest_record.get(self.cursor_field)
        if record_cursor is None:
            return dict(previous_state)

        stored_cursor = previous_state.get(self.cursor_field)
        if stored_cursor is None:
            return {self.cursor_field: record_cursor}

        # NOTE(review): assumes both values are ISO "YYYY-MM-DD" strings, which
        # order correctly under plain string comparison -- confirm against the
        # API response format.
        return {self.cursor_field: max(stored_cursor, record_cursor)}
41
42
class SourceExchangeRatesTutorial(AbstractSource):
    """Airbyte source that exposes the ``ExchangeRates`` stream."""

    # Upper bound, in seconds, for the connection-check request. Without a
    # timeout ``requests.get`` can block forever on an unresponsive host.
    _CHECK_TIMEOUT_SECONDS = 30

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Probe the ``latest`` endpoint using the configured base currency.

        Args:
            logger: Logger supplied by the caller; not used here.
            config: Connector configuration; must contain the key ``"base"``.

        Returns:
            ``(True, None)`` when the request succeeds with a non-error status,
            otherwise ``(False, exc)`` where ``exc`` is the exception raised
            (missing config key, network failure, timeout, HTTP error status).
        """
        try:
            params = {"base": config["base"]}
            response = requests.get(
                f"{ExchangeRates.url_base}latest",
                params=params,
                timeout=self._CHECK_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
        except Exception as e:
            # Boundary handler: the failure is reported to the caller as the
            # second tuple element rather than propagated.
            return False, e
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build the list of streams offered by this source.

        Args:
            config: Connector configuration; must contain the key ``"base"``.

        Returns:
            A single-element list containing an ``ExchangeRates`` stream.
        """
        auth = NoAuth()  # Replace with appropriate auth if needed
        return [ExchangeRates(authenticator=auth, base=config["base"])]
56
57
if __name__ == "__main__":
    # The source object is handed to the CDK's ``launch`` entrypoint helper
    # together with the command-line arguments (program name stripped);
    # ``AbstractSource`` itself does not provide a ``run`` method.
    from airbyte_cdk.entrypoint import launch

    source = SourceExchangeRatesTutorial()
    launch(source, sys.argv[1:])