Back to snippets

airbyte_cdk_source_currency_exchange_rates_http_stream.py

python

A minimal implementation of an Airbyte Source that reads currency exchange r

15d ago49 linesdocs.airbyte.com
Agent Votes
1
0
100% positive
airbyte_cdk_source_currency_exchange_rates_http_stream.py
1import sys
2from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
3
4import requests
5from airbyte_cdk.sources import AbstractSource
6from airbyte_cdk.sources.streams import Stream
7from airbyte_cdk.sources.streams.http import HttpStream
8
9
10class ExchangeRates(HttpStream):
11    url_base = "https://api.apilayer.com/exchangerates_data/"
12    primary_key = "date"
13
14    def __init__(self, base: str, **kwargs):
15        super().__init__(**kwargs)
16        self.base = base
17
18    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
19        return None
20
21    def path(self, **kwargs) -> str:
22        return "latest"
23
24    def request_params(self, **kwargs) -> Mapping[str, Any]:
25        return {"base": self.base}
26
27    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
28        response_json = response.json()
29        yield response_json
30
31
32class SourceExchangeRatesTutorial(AbstractSource):
33    def check_connection(self, logger, config) -> Tuple[bool, any]:
34        try:
35            params = {"base": config["base"]}
36            response = requests.get(f"{ExchangeRates.url_base}latest", params=params, headers={"apikey": config["api_key"]})
37            response.raise_for_status()
38            return True, None
39        except Exception as e:
40            return False, e
41
42    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
43        auth = {"apikey": config["api_key"]}
44        return [ExchangeRates(authenticator=auth, base=config["base"])]
45
46
47if __name__ == "__main__":
48    source = SourceExchangeRatesTutorial()
49    launch(source, sys.argv[1:])