코드로 우주평화
SQLAlchemy를 이용해 csv파일 DB업로드 하는 방법(feat. psql) 본문
CSV
대량의 데이터를 디비에 업로드하기 위해 사용되는 것 중에 하나가 csv파일입니다.
csv파일은 데이터들이 콤마로 구분되어진 파일을 뜻하는데 엑셀이나 스프레드 시트 등에서 쉽게 변환시켜 다운로드할 수 있습니다.
오늘은 실습으로 krx 한국거래소에서 제공하는 종목 데이터를 디비에 업로드해보겠습니다.
먼저 krx 한국거래소에서 받은 종목 데이터 파일은 다음과 같습니다. (csv로 다운받을 수 있습니다)
# stock.csv
종목코드,종목명,시장구분,소속부,종가,대비,등락률,시가,고가,저가,거래량,거래대금,시가총액,상장주식수
60310,3S,KOSDAQ,중견기업부,3445,15,0.44,3450,3485,3430,61292,211021945,159405362285,46271513
95570,AJ네트웍스,KOSPI,,6520,90,1.4,6440,6570,6440,23293,151484170,305281363400,46822295
6840,AK홀딩스,KOSPI,,23050,-400,-1.71,23450,23450,23000,19143,442882750,305356281050,13247561
54620,APS홀딩스,KOSDAQ,중견기업부,13300,350,2.7,12950,13500,12950,67914,899716350,271243139300,20394221
265520,AP시스템,KOSDAQ,우량기업부,23250,100,0.43,23250,23450,23150,59236,1379146250,355293038250,15281421
211270,AP위성,KOSDAQ,벤처기업부,14650,50,0.34,14550,14650,14450,26025,378654550,220955753600,15082304
27410,BGF,KOSPI,,5510,-10,-0.18,5520,5550,5410,219250,1205419400,527399518410,95716791
282330,BGF리테일,KOSPI,,181000,-3500,-1.9,185000,186000,180000,16466,2991162000,3128386986000,17283906
...
이러한 형태를 가진 파일을 csv라고 부릅니다.
실습
자 그럼 이런 데이터들을 어떻게 디비에 옮길 수 있을까요?
먼저 db_uploader.py라는 파일을 만들겠습니다.
그리고 csv를 import합니다.
# db_uploader.py
import csv
csv를 다룰 수 있도록 돕는 패키지입니다.(설치 불필요)
그다음 stock.csv 파일을 불러오는 코드를 작성해보겠습니다.
# db_uploader.py
import csv
from sqlalchemy import create_engine
from ap_toy.config import settings
with open("stock.csv", "r", encoding="utf-8") as data:
conn = create_engine(settings.POSTGRES_DSN).raw_connection()
with open명령을 통해 stock.csv를 읽어 data라고 이름을 붙여줍니다.
그 밑은 디비를 연결하는 과정입니다.
create_engine 안에는 디비 주소를 넣어주세요.
raw_connection()는 conn 이 cursor를 다룰 수 있도록 해줍니다.
이제 cursor()를 통해 cursor를 생성합니다.
# db_uploader.py
import csv
from sqlalchemy import create_engine
from ap_toy.config import settings
with open("stock.csv", "r", encoding="utf-8") as data:
conn = create_engine(settings.POSTGRES_DSN).raw_connection()
cursor = conn.cursor()
data = csv.reader(data)
next(data)
csv.reader() 메서드를 통해 open 한 csv파일을 data라는 변수에 할당합니다.
또한 맨 위줄을 제외하기 위해 next(data)를 합니다.
next(data)을 출력해보면 첫 번째 줄이 호출되는 것을 볼 수 있습니다.
print(next(data))
>>> ['종목코드', '종목명', '시장구분', '소속부', '종가', '대비', '등락률', '시가', '고가', '저가', '거래량', '거래대금', '시가총액', '상장주식수']
이제 data는 첫번째 줄을 제외한 다음 줄부터 반환하게 됩니다.
data를 for문을 통해 하나의 라인씩 불러옵시다.
# db_uploader.py
import csv
from sqlalchemy import create_engine
from ap_toy.config import settings
with open("stock.csv", "r", encoding="utf-8") as data:
conn = create_engine(settings.POSTGRES_DSN).raw_connection()
cursor = conn.cursor()
data = csv.reader(data)
next(data)
for line in data:
if line[2] in ("KOSPI", "KOSDAQ"):
insert_stock(line[:3], cursor)
count += 1
conn.commit()
line의 3번째 인덱스는 시장을 의미합니다.
저는 "KOSPI", "KOSDAQ" 중에서만 디비에 저장할 것이므로 이 둘에 해당되는지 if를 추가합니다.
만약 맞다면 insert_stock라는 직접 만든 함수에 line의 3개의 요소만 넣어줍니다.
여기서 3개의 요소는 ‘종목코드’, ‘종목 이름’, ‘시장’ 을 의미합니다.
그리고 함수에서 사용할 cursor도 함께 넣어줍니다.
# db_uploader.py
def insert_stock(data, cursor):
query = "INSERT INTO stock(code, name, market) VALUES (%s, %s, %s)\\
ON CONFLICT DO NOTHING;"
cursor.execute(query, data)
함수 안에는 디비에 삽입하는 쿼리문을 담았습니다.
현재 종목코드와 종목이름 컬럼은 unique 설정을 하였습니다.
만약 중복되는 데이터가 들어온다면 에러를 반환합니다.
이렇게 해두면 여러번 스크립트를 돌리더라도 디비에 중복 데이터가 들어가지 않습니다.
하지만 하나라도 중복이 되면 에러를 반환하기 때문에 중복되지 않은 데이터도 들어가지 못합니다.
그래서 쿼리문 뒤에 ‘ ON CONFLICT DO NOTHING’을 붙였습니다.
이 명령어는 만약 충돌이 날 경우 아무것도 하지 않는다는 뜻입니다.
즉, 중복된 값은 그대로 두고 다음으로 넘어가 디비에 없는 종목만 추가합니다.
코드
앞서 작성한 전체 코드는 다음과 같습니다.
# db_uploader.py
import csv
from sqlalchemy import create_engine
from ap_toy.config import settings
def insert_stock(data, cursor):
query = "INSERT INTO stock(code, name, market) VALUES (%s, %s, %s)\\
ON CONFLICT DO NOTHING;"
cursor.execute(query, data)
if __name__ == "__main__":
print("종목을 데이터베이스에 업로드 합니다.")
with open("stock.csv", "r", encoding="utf-8") as data:
conn = create_engine(settings.POSTGRES_DSN).raw_connection()
cursor = conn.cursor()
data = csv.reader(data)
next(data)
for line in data:
if line[2] in ("KOSPI", "KOSDAQ"):
insert_stock(line[:3], cursor)
conn.commit()
print("종목 업로드가 완료되었습니다.")
print를 사용해 업로드 시작과 끝을 구분해보았습니다.
또한 해당 스크립트를 직접 실행할 때만 코드가 수행되도록 ‘if name == "main":’ 코드를 추가하였습니다.
마지막으로 디비에 넣은 데이터들을 저장하기 위해 commit 하는 것을 잊지 마세요!
commit을 해주지 않으면 디비에 저장되지 않습니다.
결과
이제 파일을 실행해봅시다!
python db_uploader.py
>>> 종목을 데이터베이스에 업로드 합니다.
>>> 종목 업로드가 완료되었습니다.
print가 정상 출력되었습니다.
디비는 테이블 플러스(GUI)로 확인해보겠습니다.
밑에 rows 수를 보시면 2,494개의 종목이 정상 업로드된 것을 확인할 수 있습니다.
만약 디비에 대량의 데이터를 업로드해야 한다면 csv를 활용해보시기 바랍니다~!
'나는 이렇게 학습한다 > Library' 카테고리의 다른 글
SQLAlchemy 1.x 와 2.0의 Query 스타일 비교 (0) | 2022.04.25 |
---|---|
SQLAlchemy에서의 비동기 쿼리 (feat. 2.0 Style) (0) | 2022.04.24 |
SQLAlchemy, ‘PasswordType’으로 손쉽게 암호화하자 (0) | 2022.04.12 |
Poetry로 프로젝트 패키지를 관리하자 (0) | 2022.04.11 |
pre-commit 에서 ‘flake8’ 과 ‘black’ 커스텀 문제 (0) | 2022.04.07 |