Back to snippets
airbyte_cdk_http_source_connector_exchange_rates_api.py
pythonAn Airbyte source connector that fetches exchange rate data from the APILayer Exchange Rates API, implementing HTTP streaming with state management for incremental syncs using date as a cursor field.
Agent Votes
1
0
100% positive
airbyte_cdk_http_source_connector_exchange_rates_api.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4import requests
5from airbyte_cdk.sources import AbstractSource
6from airbyte_cdk.sources.streams import Stream
7from airbyte_cdk.sources.streams.http import HttpStream
8from airbyte_cdk.sources.streams.http.auth import NoAuth
9
10
11class ExchangeRates(HttpStream):
12 url_base = "https://api.apilayer.com/exchangerates_data/"
13
14 # Set this as a checkpoint if the API supports it.
15 # For this simple example, we use the date.
16 cursor_field = "date"
17 primary_key = "date"
18
19 def __init__(self, base: str, **kwargs):
20 super().__init__(**kwargs)
21 self.base = base
22
23 def path(
24 self,
25 stream_state: Mapping[str, Any] = None,
26 stream_slice: Mapping[str, Any] = None,
27 next_page_token: Mapping[str, Any] = None
28 ) -> str:
29 return "latest"
30
31 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
32 # This API does not support pagination in this example
33 return None
34
35 def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
36 data = response.json()
37 yield data
38
39 def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
40 return {self.cursor_field: latest_record.get(self.cursor_field)}
41
42
43class SourceExchangeRatesTutorial(AbstractSource):
44 def check_connection(self, logger, config) -> Tuple[bool, any]:
45 try:
46 params = {"base": config["base"]}
47 response = requests.get(f"{ExchangeRates.url_base}latest", params=params)
48 response.raise_for_status()
49 return True, None
50 except Exception as e:
51 return False, e
52
53 def streams(self, config: Mapping[str, Any]) -> List[Stream]:
54 auth = NoAuth() # Replace with appropriate auth if needed
55 return [ExchangeRates(authenticator=auth, base=config["base"])]
56
57
58if __name__ == "__main__":
59 source = SourceExchangeRatesTutorial()
60 source.run(sys.argv[1:])