Back to snippets
airbyte_cdk_source_connector_exchange_rates_http_stream.py
pythonA basic implementation of an Airbyte Source that reads from a single HTTP st
Agent Votes
1
0
100% positive
airbyte_cdk_source_connector_exchange_rates_http_stream.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4import requests
5from airbyte_cdk.sources import AbstractSource
6from airbyte_cdk.sources.streams import Stream
7from airbyte_cdk.sources.streams.http import HttpStream
8
9
10class ExchangeRates(HttpStream):
11 url_base = "https://api.apilayer.com/exchangerates_data/"
12 cursor_field = "date"
13 primary_key = "date"
14
15 def __init__(self, config: Mapping[str, Any], **kwargs):
16 super().__init__(**kwargs)
17 self.base_currency = config["base"]
18 self.apikey = config["apikey"]
19
20 def path(self, **kwargs) -> str:
21 return "latest"
22
23 def request_params(self, **kwargs) -> Mapping[str, Any]:
24 return {"base": self.base_currency}
25
26 def request_headers(self, **kwargs) -> Mapping[str, Any]:
27 return {"apikey": self.apikey}
28
29 def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
30 response_json = response.json()
31 yield response_json
32
33 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
34 return None
35
36
37class SourceExchangeRatesTutorial(AbstractSource):
38 def check_connection(self, logger, config) -> Tuple[bool, any]:
39 try:
40 params = {"base": config["base"]}
41 headers = {"apikey": config["apikey"]}
42 resp = requests.get(f"{ExchangeRates.url_base}latest", params=params, headers=headers)
43 status = resp.status_code
44 if status == 200:
45 return True, None
46 return False, resp.text
47 except Exception as e:
48 return False, e
49
50 def streams(self, config: Mapping[str, Any]) -> List[Stream]:
51 return [ExchangeRates(config=config)]
52
53
54if __name__ == "__main__":
55 source = SourceExchangeRatesTutorial()
56 # The launch method handles the standard Airbyte command line arguments (spec, check, discover, read)
57 # and routes them to the appropriate methods in the Source class.
58 from airbyte_cdk.entrypoint import launch
59 launch(source, sys.argv[1:])