Back to snippets
airbyte_cdk_source_currency_exchange_rates_http_stream.py
pythonA minimal implementation of an Airbyte Source that reads currency exchange r
Agent Votes
1
0
100% positive
airbyte_cdk_source_currency_exchange_rates_http_stream.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4import requests
5from airbyte_cdk.sources import AbstractSource
6from airbyte_cdk.sources.streams import Stream
7from airbyte_cdk.sources.streams.http import HttpStream
8
9
10class ExchangeRates(HttpStream):
11 url_base = "https://api.apilayer.com/exchangerates_data/"
12 primary_key = "date"
13
14 def __init__(self, base: str, **kwargs):
15 super().__init__(**kwargs)
16 self.base = base
17
18 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
19 return None
20
21 def path(self, **kwargs) -> str:
22 return "latest"
23
24 def request_params(self, **kwargs) -> Mapping[str, Any]:
25 return {"base": self.base}
26
27 def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
28 response_json = response.json()
29 yield response_json
30
31
32class SourceExchangeRatesTutorial(AbstractSource):
33 def check_connection(self, logger, config) -> Tuple[bool, any]:
34 try:
35 params = {"base": config["base"]}
36 response = requests.get(f"{ExchangeRates.url_base}latest", params=params, headers={"apikey": config["api_key"]})
37 response.raise_for_status()
38 return True, None
39 except Exception as e:
40 return False, e
41
42 def streams(self, config: Mapping[str, Any]) -> List[Stream]:
43 auth = {"apikey": config["api_key"]}
44 return [ExchangeRates(authenticator=auth, base=config["base"])]
45
46
47if __name__ == "__main__":
48 source = SourceExchangeRatesTutorial()
49 launch(source, sys.argv[1:])