forked from bobcolner/pandas-polygon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi_alphavantage_api.py
33 lines (24 loc) · 1.28 KB
/
api_alphavantage_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import pandas as pd
from requests import Response
from requests import get

from utils_globals import ALPHAVANTAGE_API_KEY
# Root of the Alpha Vantage HTTP API; every request path below is appended to it.
# API reference: https://www.alphavantage.co/documentation/
BASE_URL = "https://www.alphavantage.co"
def validate_response(response: "Response"):
    """Return the decoded JSON body of a successful HTTP response.

    Args:
        response: A ``requests``-style response object. Only its
            ``raise_for_status()`` and ``json()`` methods are used.

    Returns:
        Whatever ``response.json()`` yields.

    Raises:
        Exception: Anything raised by ``response.raise_for_status()`` (an
            HTTP error for a ``requests`` response) or by ``response.json()``
            when the body is not valid JSON.
    """
    # Raise first, then decode. The earlier ``if status == 200 ... else``
    # shape fell through and returned ``None`` for non-200 statuses that
    # ``raise_for_status()`` does not consider errors, which surfaced later
    # as an unrelated ``TypeError`` in the callers.
    response.raise_for_status()
    return response.json()
def get_symbol_details(symbol: str) -> dict:
    """Fetch the Alpha Vantage ``OVERVIEW`` record for ``symbol``.

    Args:
        symbol: Ticker symbol sent as the ``symbol`` query parameter.

    Returns:
        The JSON payload returned by ``validate_response``.

    Raises:
        Exception: Whatever ``get`` or ``validate_response`` raise
            (connection problems, HTTP error statuses).
    """
    # Hand the query to requests as ``params`` so every value is URL-encoded
    # instead of being interpolated verbatim into the query string.
    params = {
        'function': 'OVERVIEW',
        'symbol': symbol,
        'apikey': ALPHAVANTAGE_API_KEY,
    }
    response = get(BASE_URL + '/query', params=params)
    return validate_response(response)
def get_fx_intaday(from_symbol: str, to_symbol: str, interval: str='15min') -> pd.DataFrame:
    """Fetch the full ``FX_INTRADAY`` series for a currency pair.

    The function name keeps its original spelling so existing callers
    continue to work.

    Args:
        from_symbol: Value sent as the ``from_symbol`` query parameter.
        to_symbol: Value sent as the ``to_symbol`` query parameter.
        interval: Bar interval sent to the API; it also selects the
            ``"Time Series FX (<interval>)"`` key read from the payload.

    Returns:
        A float-typed DataFrame built from the series mapping and then
        transposed, i.e. one row per top-level entry of that mapping
        (presumably one per timestamp -- confirm against the API docs).

    Raises:
        KeyError: If the payload has no series under the expected key. The
            message carries an excerpt of the payload for diagnosis.
        Exception: Whatever ``get`` or ``validate_response`` raise.
    """
    # Let requests URL-encode the query rather than building it by hand.
    params = {
        'function': 'FX_INTRADAY',
        'from_symbol': from_symbol,
        'to_symbol': to_symbol,
        'interval': interval,
        'outputsize': 'full',
        'apikey': ALPHAVANTAGE_API_KEY,
    }
    response = get(BASE_URL + '/query', params=params)
    data = validate_response(response)

    series_key = f"Time Series FX ({interval})"
    try:
        series = data[series_key]
    except KeyError:
        # NOTE(review): the API appears to report some failures (bad symbol,
        # rate limiting) inside a JSON body rather than via the HTTP status;
        # keep the exception type but say what actually came back.
        raise KeyError(
            f"{series_key!r} not found in Alpha Vantage response: {repr(data)[:500]}"
        ) from None
    return pd.DataFrame(series, dtype='float').transpose()
def get_crypto_daily(symbol: str='BTC', market: str='USD') -> pd.DataFrame:
    """Fetch the ``DIGITAL_CURRENCY_DAILY`` series for a crypto symbol.

    Args:
        symbol: Value sent as the ``symbol`` query parameter.
        market: Value sent as the ``market`` query parameter.

    Returns:
        A float-typed DataFrame built from the
        ``"Time Series (Digital Currency Daily)"`` mapping and then
        transposed, i.e. one row per top-level entry of that mapping
        (presumably one per day -- confirm against the API docs).

    Raises:
        KeyError: If the payload has no series under the expected key. The
            message carries an excerpt of the payload for diagnosis.
        Exception: Whatever ``get`` or ``validate_response`` raise.
    """
    # Let requests URL-encode the query rather than building it by hand.
    params = {
        'function': 'DIGITAL_CURRENCY_DAILY',
        'symbol': symbol,
        'market': market,
        'apikey': ALPHAVANTAGE_API_KEY,
    }
    response = get(BASE_URL + '/query', params=params)
    data = validate_response(response)

    series_key = 'Time Series (Digital Currency Daily)'
    try:
        series = data[series_key]
    except KeyError:
        # NOTE(review): the API appears to report some failures (bad symbol,
        # rate limiting) inside a JSON body rather than via the HTTP status;
        # keep the exception type but say what actually came back.
        raise KeyError(
            f"{series_key!r} not found in Alpha Vantage response: {repr(data)[:500]}"
        ) from None
    return pd.DataFrame(series, dtype='float').transpose()