가장 기초적인 프로그램의 시작으로
krx 주가 정보를 1분마다 불러와서 my sql에 적재하는 시스템을 만들어보았다.
1. 먼저 FinanceDataReader를 이용해서 주가 정보를 불러와 보자.
import FinanceDataReader as fdr
df_krx = fdr.StockListing('KRX')
df_krx
FinanceDataReader는 한국 주식 가격, 미국주식 가격, 지수, 환율, 암호화폐 가격, 종목 리스팅 등 금융 데이터 수집 라이브러리이다.
데이터는 df 형태로 불러와진다.
2. scheduler를 더해 매 1분마다 data가 불러와지는 프로그램을 만들어 보자.
import FinanceDataReader as fdr
import sched
import time
# 주가 정보를 불러오는 코드를 함수화 한다.
# 함수화하는 이유는 반복하기 위해서 이다.
def run_code():
print("Running the code...")
df_krx = fdr.StockListing('KRX')
df_krx.to_csv('./krx_stock_symbols.csv')
# 스케쥴러를 initialize해준다.
s = sched.scheduler(time.time, time.sleep)
# 반복할 기간을 선언해준다. 단위는 초 이다. 60초 이기때문에 1시간이다.
interval = 60
# 매 1분마다 run_code() 함수를 실행해주는 함수코드를 선언한다.
# run_code 실행후에는 다음 이벤트B,C,D... 를 예약한다.
def schedule_function():
run_code() # 코드 실행
s.enter(interval, 1, schedule_function, ()) # 리셋
# 실행할 이벤트A를 예약한다.
s.enter(interval, 1, schedule_function, ())
# 스케줄러를 실행한다.
s.run()
스케줄러를 사용할때 재귀함수 형태가 사용되기 때문에 코드를 이해하는 것이 조금 어렵다.
조금더 알아보자
sched는 파이썬 내장 함수이다.
s = sched.scheduler(time.time, time.sleep)
생성자 함수이다. 인자는 기본으로 입력해주는 값이다.
# scheduler.enter(delay, priority, action, argument=(), kwargs={})
s.enter(interval, 1, schedule_function, ())
delay 시간 후에 이벤트 action이 발생하도록 예약하는 메소드이다.
priority에 값을 넣어 우선순위를 지정할수 있는데 숫자가 낮을수록 우선순위가 높다.
# scheduler.run(blocking=True)
s.run()
Run all scheduled events.
예약된 이벤트들을 실행한다.
그래서 다시 한번 정리하자면, 위 코드의 실행순서는 아래와 같다.
스케줄런 선언 -> 인터벌 선언 -> 이벤트 A 예약 -> run으로 스케줄러 시작 -> 이벤트 A의 action 실행 -> run_code() 실행 -> 이벤트 B 예약 -> 인터벌 시간후에 이벤트 B의 action 실행 -> 반복
만약 매일 같은 시간에 실행하려면 어떻게 해야 할까?
위 질문에 답하기 위해서 조사하다가 scheduler의 다른 방법을 찾았다. 게다가 이게 더 간편하고 좋은 방법인것 같다.
import FinanceDataReader as fdr
import scheduler, time
# 주가 정보를 불러오는 코드를 함수화 한다.
# 함수화하는 이유는 반복하기 위해서 이다.
def run_code():
print("Running the code...")
df_krx = fdr.StockListing('KRX')
df_krx.to_csv('./krx_stock_symbols.csv')
# 실행주기 설정한다
schedule.every(60).seconds.do(collect_and_append_data)
# 스캐쥴 시작
while True:
schedule.run_pending()
time.sleep(1)
위 방법을 썼을때 매일 아침 같은 시간에 코드를 실행하고 싶다면 아래와 같이 사용하면 된다.
# 매일 08시 00분에 실행하고 싶을때
schedule.every().day.at("08:00").do(함수)
3. 주기적으로 발생하는 data를 만들었다면 my sql에 넣어보자.
import FinanceDataReader as fdr
from sqlalchemy import create_engine
import scheduler, time
# mysql에 접속하기 위한 정보를 선언한다.
db_user = 'root'
db_password = '비밀번호'
db_host = 'localhost'
db_name = '데이터베이스이름'
# mysql 엔진 선언
db_connection_str = f'mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}'
db_engine = create_engine(db_connection_str)
# 데이터를 불러오기 위한 함수코드 선언
def collect_and_append_data():
df_krx = fdr.StockListing('KRX')
# mysql에 데이터를 입력한다.
# name= : db 이름을 넣어주면 된다.
# in_exists= : data가 존재할경우 어떤 행동을 취할지 설정한다. append는 추가하고, replace는 교체한다.
df_krx.to_sql(name='stock_data', con=db_engine, if_exists='append', index=False)
print("Data appended to MySQL database.")
# 실행 주기 설정
schedule.every(60).seconds.do(collect_and_append_data)
# 스캐쥴 시작
while True:
schedule.run_pending()
time.sleep(1)
위에서 처음 써본 'sqlalchemy' 라이브러리를 처음 써보았다.
sqlalchemy는 ORM(Object Relational Mapper) 개체 관계형 매퍼라고 한다. 다시 말하자면 객체 정보를 db에 넣어주기 위해서는 RD에 맞게 sql을 만들어주어야 하는데 이를 적절하게 자동으로 만들어주는 라이브러리 정도로 보면 되겠다.
(이것을 이용해서 mysql말고 다른 db에도 적용할 수 있을까?)
아무튼 이런 유용한 라이브러리 덕분에 간단한 메소드덕분에 db에 data가 자동으로 insert 된다.
이외 python에서 mysql을 연동하기 위해선 pymysql 라이브러릴 사용하면 되는데, 이것은 조금 컨셉이 다르다. mysql을 직접적으로 제어할때 사용한다고 보면 된다.
위 테스트 결과 주기적으로 발생하는 data에 대하여 mysql에 적재하는 간단한 프로그램을 만들어 보았다.
다음 프로젝트로는
1. mysql이 아닌 nosql에 적재하기
2. hadoop의 가장 기초적인 형태 구현해보기
정도를 목표로 하면 될것 같다.
'Road to data engineer' 카테고리의 다른 글
| 객체지향 언어 이제 끝2 (0) | 2023.08.23 |
|---|---|
| 객체 지향 언어 이제 끝1 (0) | 2023.08.23 |
| 빅데이터 엔지니어링 프로젝트 준비하기2-2 - api data to nosql(mongodb) (0) | 2023.08.22 |
| 빅데이터 엔지니어링 프로젝트 준비하기2 - 가장 기초적인 프로그램을 만들어보자 (0) | 2023.08.21 |
| 빅데이터 엔지니어링 프로젝트 준비하기1 (0) | 2023.08.21 |