Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

해당 feature에 대한 설명 #1

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
airflow
#3.8.10
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<li>upbit</li>
<li>Binance</li>
</ul>

만약 해당 dags를 실행하고자 한다면 my_package파일을 python path에 적용 시켜 주셔야 합니다.
<br>
<br>
<hr>
Expand Down
35 changes: 0 additions & 35 deletions dags/market/extrection/b4_data.py

This file was deleted.

2 changes: 2 additions & 0 deletions dags/market/extrection/coin_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from upbit_def import min_data

local_tz = pendulum.timezone("Asia/Seoul")
execution_date='{{execution_date.add(hours=9).strftime("%Y%m%d")}}/'
data_local_route='$data_route'

####이름 정보를 가져오는 dag
default_args = {
Expand Down
38 changes: 38 additions & 0 deletions dags/market/extrection/data.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dags/market/extrection/minute_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
bash_command="echo 'END'"
)

exe_date = '{{execution_date.add(hours=9).strftime("%Y%m%d%")}}'
exe_date = '{{execution_date.add(hours=9).strftime("%Y%m%d")}}'
exe_hour = '{{execution_date.add(hours=9).strftime("%H")}}'
exe_min = '{{execution_date.add(hours=9).strftime("%M")}}'
local_min = f'$HOME/tmp/data/minute/{exe_date}/{exe_hour}/{exe_min}'
Expand Down Expand Up @@ -66,4 +66,4 @@
bash_command=f'{line_mesg}'
)

start >> dir_make >> calling_name >> upload_to_dfs >> error >> end
start >> dir_make >> calling_name >> upload_to_dfs >> [error,end]
53 changes: 53 additions & 0 deletions dags/market/extrection/old_min_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime
import pendulum
from upbit_def import min_data,api_with_execute


local_tz = pendulum.timezone("Asia/Seoul")

# SECURITY(review): the Line Notify bearer token is hardcoded in source; it
# should be moved to an Airflow Connection/Variable or environment variable.
line_mesg = "curl -X POST -H 'Authorization: Bearer 6Y3IVP0dZD9bREhqMS4pd0sZZg5QAh3N9eAcixrovns' -F 'message=this task has been error' https://notify-api.line.me/api/notify"
s_line_mesg = "curl -X POST -H 'Authorization: Bearer 6Y3IVP0dZD9bREhqMS4pd0sZZg5QAh3N9eAcixrovns' -F 'message=this task has been completed' https://notify-api.line.me/api/notify"

# Fix: `route` and `name_file` were referenced in op_args below but never
# defined, so the DAG failed to import with a NameError. Placeholder paths
# are supplied here -- TODO confirm the real locations with the DAG author.
route = '$HOME/tmp/data'          # base folder of saved candle data -- confirm
name_file = f'{route}/name.json'  # market-name listing read by upbit_def -- confirm

#### DAG: back-fills historical minute candles via upbit_def.api_with_execute
default_args = {
    'owner': 'merlin',
    'depends_on_past': False,
    'start_date': datetime(2023, 7, 5,tzinfo=local_tz),
    'retries': 5,
}

with DAG(
    'b4_data',
    default_args=default_args,
    schedule_interval='* * * * *'  # runs every minute
)as dag:
    # Marker task: beginning of the run.
    start=BashOperator(
        task_id = 'start',
        bash_command="echo 'start'"
    )
    # Marker task: end of the run.
    end = BashOperator(
        task_id = 'END',
        bash_command="echo 'END'"
    )
    # Fetches one batch of historical candles for the next pending market.
    data_saver=PythonOperator(
        task_id='name_roller',
        python_callable=api_with_execute,
        op_args=[route,name_file]
    )
    # Line notification on failure.
    # NOTE(review): without a trigger_rule this also runs on success; confirm
    # whether trigger_rule='one_failed' was intended.
    fail_messager=BashOperator(
        task_id='messager',
        bash_command=f"{line_mesg}"
    )
    # Line notification on success. NOTE(review): the (misspelled) task_id
    # 'sussced_message' is kept so existing task history is preserved.
    sussced_message=BashOperator(
        task_id='sussced_message',
        bash_command=f"{s_line_mesg}"
    )


    start >> data_saver >> [fail_messager,sussced_message] >> end


Empty file removed my_package/test.ipynb
Empty file.
93 changes: 93 additions & 0 deletions my_package/upbit_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#####데이터베이스 테이블 이름은 -가 사용 불가하여 _로 변경
def min_data(savePath):
json_name = f'{savePath}/name.json'

df = pd.read_json(json_name)

df.drop(columns=['korean_name','english_name'],axis=1,inplace=True)
Expand Down Expand Up @@ -65,4 +66,96 @@ def compare_by_df(origin,new):

return print("name data didn't changed")

def status_adder(name_path):
    """Initialise every market in *name_path* with a pending status.

    Reads the market-name JSON, adds a ``status`` column set to
    ``'non_started'`` for every row, and writes the table back to the
    same file.

    Parameters
    ----------
    name_path : str
        Path of the market-name JSON file (read and overwritten in place).
    """
    name_df = pd.read_json(name_path)

    # 'non_started' is the value status_checker() looks for; the previous
    # 'non_start' never matched, so fresh rows were never picked up.
    name_df['status'] = 'non_started'

    # Keep 'status' as a regular column: status_checker() reads
    # row['status'], which would fail if the column were moved into the
    # index (the old set_index('status') call is therefore dropped).

    # Write back to the path itself; the old code wrote to a literal file
    # named 'name_path' (quoted string) instead of the argument.
    name_df.to_json(name_path)

def status_checker(name_path):
    """Return the first market still pending and mark it as in progress.

    Scans *name_path* top to bottom, claims the first row whose status is
    ``'non_started'`` or ``'progress'`` by setting it to ``'progress'``,
    persists the change, and returns that row's market code.

    Parameters
    ----------
    name_path : str
        Path of the market-name JSON file (updated in place).

    Returns
    -------
    str or None
        The claimed market code, or None when no row is pending.
    """
    name_df = pd.read_json(name_path)

    for index, row in name_df.iterrows():
        if row['status'] in ('non_started', 'progress'):
            name_df.at[index, 'status'] = 'progress'
            # Persist the claim: the original only mutated the in-memory
            # frame, so the 'progress' mark was lost between calls.
            name_df.to_json(name_path)
            return row['market']

    return None  # explicit: nothing left to process

def count_documents_in_folder(folder_path):
    """Return the number of directory entries inside *folder_path*.

    A path that does not exist, or exists but is not a directory,
    counts as empty and yields 0.
    """
    # Guard clause: bail out early for anything that is not a real directory.
    if not (os.path.exists(folder_path) and os.path.isdir(folder_path)):
        return 0

    return len(os.listdir(folder_path))


def execute_date_checker(route_path,now_market):
    """Derive the next Upbit query timestamp from the newest saved file.

    Reads the most recent candle file saved for *now_market*, takes the
    latest ``candle_date_time_kst`` value and re-encodes it in Upbit's
    URL-escaped ``to`` format.

    Parameters
    ----------
    route_path : str
        Base folder containing one sub-folder per market.
    now_market : str
        Market code whose data folder is inspected.

    Returns
    -------
    list
        ``[execute_date, num_file]`` -- the URL-escaped timestamp string and
        the count of files already saved for the market.
    """
    market_dir = f"{route_path}/{now_market}"
    # Files are assumed to be named 1..N, so the count doubles as the name
    # of the newest file. The original counted route_path itself (i.e. the
    # number of market sub-folders), which pointed at the wrong file --
    # TODO confirm the per-market naming scheme with the author.
    num_file=count_documents_in_folder(market_dir)

    data_path=f"{market_dir}/{num_file}"

    data_df=pd.read_json(data_path)

    sort_data_df=data_df.sort_values(by='candle_date_time_kst',ascending=False)

    # iloc[0] is the first row *after* sorting. The original used loc[0],
    # which is label-based and returned whichever row originally carried
    # index 0, silently ignoring the sort.
    b4_slicing_date=sort_data_df.iloc[0]['candle_date_time_kst']
    #"candle_date_time_kst":"2023-08-18T00:59:00"
    #{yyyy}-{MM}-{dd}T{hh}%3A{mm}%3A{ss}
    kst_year=b4_slicing_date[:4]
    kst_month=b4_slicing_date[5:7]
    kst_day=b4_slicing_date[8:10]
    kst_hour=b4_slicing_date[11:13]
    kst_min=b4_slicing_date[14:16]

    # %3A is the URL-escaped ':' required by the Upbit "to" parameter.
    execute_date=f"{kst_year}-{kst_month}-{kst_day}T{kst_hour}%3A{kst_min}%3A00"

    list_for_save=[execute_date,num_file]

    return list_for_save


def api_calling_upbit(market,date_info):
    """Fetch one-minute candles for *market* from the Upbit REST API.

    Parameters
    ----------
    market : str
        Upbit market code (e.g. ``"KRW-BTC"``).
    date_info : str
        URL-escaped KST timestamp (``{yyyy}-{MM}-{dd}T{hh}%3A{mm}%3A{ss}``)
        used as the ``to`` upper bound of the query.

    Returns
    -------
    pandas.DataFrame
        The parsed candle records (the request asks for up to 180 rows).
    """
    # %2B09%3A00 is the URL-escaped "+09:00" KST offset appended to date_info.
    url = f"https://api.upbit.com/v1/candles/minutes/1?market={market}&to={date_info}%2B09%3A00&count=180"

    headers = {"accept": "application/json"}

    response = requests.get(url, headers=headers)

    # NOTE(review): `json_load` is not defined in the visible part of this
    # module -- presumably a helper that decodes the HTTP body; confirm it
    # is not meant to be `response.json()`.
    parsed_data=json_load(response)

    data_for_save=pd.DataFrame(parsed_data)

    return data_for_save

def api_with_execute(route_path,name_path):
    """Fetch the next batch of historical candles for one pending market.

    Claims the first market still marked pending in *name_path*, derives
    the ``to`` timestamp from that market's newest saved file, calls the
    Upbit API and writes the result as JSON under *route_path*.

    Parameters
    ----------
    route_path : str
        Base folder containing per-market candle data.
    name_path : str
        Path of the market-name/status JSON file.
    """
    # Market whose status is pending; NOTE(review): status_checker can
    # return None when nothing is pending, which is not guarded here.
    now_market=status_checker(name_path)

    # [execute_date (URL-escaped KST string), num_file (existing file count)]
    execute_date_info=execute_date_checker(route_path,now_market)

    api_data=api_calling_upbit(now_market,execute_date_info[0])

    # NOTE(review): this writes to f"{route_path}/{num_file}" at the top
    # level -- presumably it should go under the market's own folder with an
    # incremented file name; confirm. The market's status is also never set
    # back to a completed value after the fetch.
    api_data.to_json(f"{route_path}/{execute_date_info[1]}")