주도주 리스트가 갱신될 경우 Local server에서 S3에 갱신된 주도주 리스트를 업로드하고 Fastapi를 호출해 React에 웹소켓으로 알림을 줄 것을 요청한다. 해당 요청으로 실시간으로 주도주 리스트가 업데이트된다.
Local server에서 실시간 1분봉 데이터를 받아와 /predict를 호출해 모델 서빙을 한다. 매수해야 할 경우 React에 웹소켓으로 알림을 준다. 사용자는 실시간으로 매수 알림을 받을 수 있다.
React에서 초기 렌더링할 때 해당 주도주의 뉴스 분석 결과가 존재하지 않는다면 /crawl-and-analyze를 호출해 해당 주도주의 뉴스를 네이버 금융으로부터 가져와 Gemini api를 통해 긍정 부정 분석을 한뒤 제공한다. 마찬가지로 실시간 업데이트를 위해 React에 웹소켓으로 알림을 준다.
폴더 구조
여러 Router들이 모듈화 되어있고 main.py에서 한번에 실행한다.
main.py
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config.settings import IP_NUM, PORT_NUM
from app.routers.root import router as root_router
from app.routers.crawl_and_analyze import router as crawl_router
from app.routers.model_predict import router as model_router
from app.routers.websocket import router as websocket_router
from app.routers.alert_list import router as alert_list_router
from app.routers.alert_chart import router as alert_chart_router
from app.routers.get_indices import router as indices_router
app = FastAPI()
# CORS 설정
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 모든 도메인에서의 요청을 허용
allow_credentials=True,
allow_methods=["*"], # 모든 HTTP 메서드 (GET, POST, PUT, DELETE 등)를 허용
allow_headers=["*"], # 모든 헤더를 허용
)
# 라우터 포함
app.include_router(root_router)
app.include_router(crawl_router)
app.include_router(model_router)
app.include_router(websocket_router)
app.include_router(alert_list_router)
app.include_router(alert_chart_router)
app.include_router(indices_router)
if __name__ == '__main__':
uvicorn.run(
"app.main:app",
host=IP_NUM,
port=int(PORT_NUM),
workers=1,
ssl_keyfile="/etc/ssl/private/privkey.pem",
ssl_certfile="/etc/ssl/certs/fullchain.pem"
)
모든 HTTP 메소드를 사용할 수 있도록 CORS 설정을 해주었고 Mixed content Error가 발생해 key와 ca파일을 추가해 https 단위로 fastapi 서버를 열어주었다.
Docker로 배포
# 1. 베이스 이미지 설정 (Python 3.11-slim 사용)
FROM python:3.11-slim
# 2. 작업 디렉토리 설정
WORKDIR /code
# 3. 필요한 패키지 설치 (Chrome 및 의존성 포함)
RUN apt-get update && apt-get install -y \
wget \
curl \
gnupg \
ca-certificates \
unzip \
locales \
&& rm -rf /var/lib/apt/lists/*
# 로케일 설정
RUN locale-gen ko_KR.UTF-8
ENV LANG=ko_KR.UTF-8 \
LANGUAGE=ko_KR:ko \
LC_ALL=ko_KR.UTF-8
# 4. Google Chrome의 GPG 키 및 리포지토리 추가
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - && \
echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list && \
apt-get update && apt-get install -y google-chrome-stable
# 5. pytorch 다운로드
RUN pip install torch==2.5.0 --index-url https://download.pytorch.org/whl/cpu
# 6. 인증서 파일 복사
COPY ./app/fullchain.pem /etc/ssl/certs/fullchain.pem
COPY ./app/privkey.pem /etc/ssl/private/privkey.pem
# 7. 파이썬 의존성 파일 복사 및 설치
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
# 8. 애플리케이션 파일 복사
COPY ./app /code/app
# 9. FastAPI에서 사용할 포트 열기
EXPOSE 8080
# 10. FastAPI 서버 실행 (uvicorn 사용)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080", "--ssl-keyfile", "/etc/ssl/private/privkey.pem", "--ssl-certfile", "/etc/ssl/certs/fullchain.pem"]
# CMD ["python", "app/main.py"]
4-2 캡스톤 디자인 프로젝트에서 제작할 웹사이트에 제공하기 위한 주가의 3% 상승 여부를 판단하는 모델을 학습시켰다.
매수 타이밍 예측을 위한 Target Model로는 Non-Stationary Transformer 모델을 사용하였다. 이 모델은 시계열 데이터의 비정상성(Non-Stationarity)을 효과적으로 처리할 수 있으며, 여타 모델 실험 결과(LSTM, Dlinear-I, Dlinear-S, TimesNet, PatchTST) 중에서 가장 높은 정확도(Best Prediction 75%)를 보였다. 최종적으로 해당 모델을 웹사이트에 서빙할 것이다.
4-2 캡스톤 디자인 프로젝트에서 만들 웹사이트의 기능 중 하나인 뉴스 긍정 부정 분석 결과 그리고 코스피, 코스닥 지수들을 실시간으로 제공하기 위한 API를 구현하였다.
뉴스 크롤링 및 분석
Selenium과 BeautifulSoup을 사용해 네이버 뉴스에서 데이터를 수집하고, 이를 Gemini API로 분석 및 요약한 뒤 AWS S3에 저장하도록 구현하였다.
저장된 분석 결과는 React에서 호출되어 사용자에게 보여질 것이다.
Gemini api 호출
import os
import google.generativeai as genai
from dotenv import load_dotenv
import typing_extensions as typing
from fastapi import APIRouter
from typing import Dict, Any, List
import json
from fastapi import APIRouter, HTTPException
class Analysis(typing.TypedDict):
evaluation: str
reason: str
summary: str
router = APIRouter()
def configure_gemini_api():
load_dotenv()
GOOGLE_API_KEY = os.getenv('MY_KEY')
genai.configure(api_key=GOOGLE_API_KEY)
def analyze_article(article_content: str, company_name: str, article_link: str, article_title: str):
if not article_content:
return {
"evaluation": "Error",
"summary": "분석 실패",
"link": article_link, # 링크는 결과에만 포함,
"title": article_title
}
prompt = f"""
다음의 Article을 바탕으로, 이 기사가 ${company_name} 종목에 대해 긍정적인 평가를 내리고 있는지, 부정적인 평가를 내리고 있는지, 또는 종목과 관련이 없는지 판단해줘. [긍정 / 관련 없음 / 부정] 중 하나의 단어로 평가를 내려서 evaluation 필드에 저장해줘. 또한 이 기사에 포함된 종목 관련 정말 중요한 기사의 핵심 내용을 한 번에 알아볼 수 있도록 한 줄로 간략하게 요약해서 'summary'에 저장해줘. 반환 결과는 반드시 JSON 형식이어야 해.
여기서 'summary' 필드 예시를 들어보자면 다음과 같아.
Article Example:
{{'evaluation': '긍정, 'summary': '${company_name}가 3분기 실적에서 견조한 매출을 기록 및 '헬시 플레저' 트렌드에 부합하는 제품 출시 및 좋은 성적 도출'}}
Article:
{article_content}
반환 형식은 아래와 같아:
Analysis = {{'evaluation': str, 'summary': str}}
Return: Analysis
"""
try:
model = genai.GenerativeModel("gemini-1.5-flash")
result = model.generate_content(prompt)
# candidates가 비어있는지 먼저 확인
if not result.candidates or not result.candidates[0].content.parts:
print(prompt);
print(result);
raise ValueError("API 응답이 비어 있습니다.")
# API에서 반환된 텍스트 정리
result_text = result.candidates[0].content.parts[0].text
cleaned_response = result_text.replace("```json", "").replace("```", "").strip()
# JSON 형식으로 변환
analysis_result = json.loads(cleaned_response)
analysis_result['link'] = article_link # 링크는 결과에만 포함
analysis_result['title'] = article_title # 링크는 결과에만 포함
return analysis_result # 분석 결과 반환
except Exception as e:
print(f"Error during analysis: {e}")
return {
"evaluation": "Error",
"summary": "API 요청에 실패했습니다.",
"link": article_link, # 실패 시에도 링크를 반환
"title": article_title
}
# 뉴스 기사 분석 함수
def analyze_news(articles: List[Dict[str, str]], company_name: str) -> List[Dict[str, str]]:
try:
configure_gemini_api()
except Exception as e:
raise HTTPException(status_code=500, detail=f"API 설정 오류: {str(e)}")
analyzed_articles = []
for article in articles:
# 각 기사에 'content' 필드가 없거나 비어 있을 경우 처리
if 'content' not in article or not article['content']:
analyzed_articles.append({
"evaluation": "Error",
"summary": "기사 내용이 없습니다.",
"link": article.get('link', 'unknown'), # 링크 필드 추가
"title": article.get('title', 'unknown') # 링크 필드 추가
})
continue
# 각 기사를 분석하여 결과를 추가
analysis = analyze_article(article['content'], company_name, article['link'], article['title'])
analyzed_articles.append(analysis)
filtered_articles = [article for article in analyzed_articles if article.get("evaluation") in ["긍정", "부정"]]
return filtered_articles
뉴스 크롤링
from bs4 import BeautifulSoup
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time
import datetime
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from typing import List, Dict, Any
# 크롤링 요청 데이터 모델
class CrawlRequest(BaseModel):
company_code: str
page: int
# 라우터 설정
router = APIRouter()
class CrawlError(Exception):
"""Custom exception for errors during crawling."""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
# Selenium 설정 함수
def get_browser():
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
driver = webdriver.Chrome(options=chrome_options)
return driver
class CrawlError(Exception):
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
def crawl_news(company_code: str, page: int) -> List[Dict[str, str]]:
driver = None # driver 초기화
try:
if(len(company_code) < 6):
print("종목 코드가 올바른 형식이 아닙니다.")
raise CrawlError("종목 코드가 올바른 형식이 아닙니다.")
if( page < 1):
print("페이지 번호는 1과 200 사이여야 합니다.")
raise CrawlError("페이지 번호는 1과 200 사이여야 합니다.")
# URL / 요청 헤더 설정
url = f'https://finance.naver.com/item/news.naver?code={company_code}&page={page}'
driver = get_browser()
driver.get(url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, 'news_frame')))
driver.switch_to.frame('news_frame')
source_code = driver.page_source
html = BeautifulSoup(source_code, "html.parser")
# 중복 뉴스 제거
for tr in html.select('tr.relation_lst'):
tr.decompose()
# 기사 item의 신문사 / 날짜 / 뉴스 주소 갖고 오기
infos = html.select('.info')
dates = html.select('.date')
aTags = html.select('td.title a')
links = [a.attrs['href'] for a in aTags]
articles = []
for i, full_url in enumerate(links):
try:
driver.get(full_url)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'article')))
except Exception as e:
raise CrawlError(f"기사 페이지 로딩 실패: {full_url}")
new_page_source = driver.page_source
soup = BeautifulSoup(new_page_source, 'html.parser')
for div in soup.select('div.vod_player_wrap._VIDEO_AREA_WRAP'):
div.decompose()
for div in soup.select('div.artical-btm'):
div.decompose()
for br in soup.find_all("br"):
br.replace_with("\n")
article_content = soup.select_one('article').text.strip()
article_title = soup.select_one('#title_area span').text.strip()
article = {
'title': article_title,
'publisher': infos[i].text.strip() if i < len(infos) else 'Unknown',
'date': dates[i].text.strip() if i < len(dates) else 'Unknown',
'link': full_url,
'content': article_content,
}
articles.append(article)
driver.quit()
# 모든 크롤링 작업이 끝난 후 브라우저 종료
except CrawlError as e:
if driver:
driver.quit()
raise e # CrawlError를 다시 발생시켜서 상위 함수에서 처리
except Exception as e:
if driver:
driver.quit()
raise CrawlError(f"크롤링 중 에러 발생: {str(e)}") # 모든 에러를 CrawlError로 감싸서 던짐
return articles
호출 Router
import json
import boto3
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from .crawl_news import crawl_news, CrawlError
from .analyze_news import analyze_news
from .websocket import notify_clients
router = APIRouter()
s3_client = boto3.client("s3")
# 분석 응답 모델 정의
class AnalysisResDTO(BaseModel):
evaluation: str
summary: str
link: str
title: str
class CrawlAndAnalyzeRequest(BaseModel):
company_code: str
page: int
company_name: str
class CrawlAndAnalyzeResponse(BaseModel):
status: str
total_articles: int
analysis: List[AnalysisResDTO]
@router.post("/crawl-and-analyze", response_model=CrawlAndAnalyzeResponse)
async def crawl_and_analyze(request: CrawlAndAnalyzeRequest) -> CrawlAndAnalyzeResponse:
company_code = request.company_code
page = request.page
company_name = request.company_name
try:
articles = crawl_news(company_code, page)
except CrawlError as e:
raise HTTPException(status_code=400, detail=f"Crawling error: {e.message}")
except Exception as e:
raise HTTPException(status_code=400, detail=f"Crawling error: {e.message}")
try:
analyzed_articles = analyze_news(articles, company_name)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Gemini API error: {e.message}")
analysis_dto = [AnalysisResDTO(**article) for article in analyzed_articles]
s3_bucket_name = "dev-jeus-bucket"
file_name = f"{company_name}.json"
# 기존 JSON 파일 가져오기
try:
response = s3_client.get_object(Bucket=s3_bucket_name, Key=file_name)
existing_data = json.loads(response['Body'].read())
except s3_client.exceptions.NoSuchKey:
# 파일이 없는 경우 빈 리스트로 초기화
existing_data = []
# 기존 데이터에서 링크를 기준으로 중복 검사
existing_links = {item["link"] for item in existing_data}
# 중복되지 않는 새 데이터를 필터링하여 추가
new_data = [article.dict() for article in analysis_dto if article.link not in existing_links]
# 기존 데이터에 새로운 비중복 데이터 추가
updated_data = existing_data + new_data
# JSON 데이터를 다시 S3에 업로드
try:
s3_client.put_object(
Bucket=s3_bucket_name,
Key=file_name,
Body=json.dumps(updated_data, ensure_ascii=False, indent=4),
ContentType='application/json'
)
await notify_clients('newsupdate') # 클라이언트 알림
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}")
return CrawlAndAnalyzeResponse(
status="success",
total_articles=len(new_data),
analysis=analysis_dto
)
결과
실제로 고려아연 종목의 뉴스를 분석한 결과이다.
[
{
"evaluation": "부정",
"summary": "고려아연이 유상증자 계획을 철회하고 최윤범 회장이 이사회 의장직에서 물러나겠다고 밝히며 경영권 방어를 위해 주주들의 지지를 호소했지만, MBK파트너스·영풍과의 지분 격차는 더욱 벌어졌다.",
"link": "https://n.news.naver.com/mnews/article/025/0003400388",
"title": "고려아연 유상증자 철회 결정…최윤범 “이사회 의장직 사퇴”"
},
{
"evaluation": "부정",
"summary": "고려아연이 2조 5천억 원 규모의 유상증자를 철회하면서 경영권 분쟁이 임시 주총에서 의결권 대결로 이어질 것으로 예상됨",
"link": "https://n.news.naver.com/mnews/article/056/0011837556",
"title": "고려아연, 유상증자 철회…주총서 경영권 판가름 날 듯"
},
{
"evaluation": "긍정",
"summary": "고려아연은 유상증자를 철회하고 사외이사에게 의장직을 맡겨 독립 경영을 추진하며 주주 친화적인 정책을 강화한다. 분기 배당 도입, 소수주주 다수결 제도 도입, 소액주주 의사 반영 등을 통해 주주와의 소통 및 지지를 확보하려는 전략을 펼치고 있다.",
"link": "https://n.news.naver.com/mnews/article/658/0000088195",
"title": "고려아연, 유증 철회…최윤범 회장은 이사회 의장 물러난다"
},
{
"evaluation": "긍정",
"summary": "고려아연 최윤범 회장은 경영권 분쟁 승리를 자신하며 주주친화 정책을 통해 지지를 호소, 주주총회에서 승리할 가능성을 높였다.",
"link": "https://n.news.naver.com/mnews/article/092/0002352347",
"title": "주총 표대결 앞둔 최윤범 \"충분히 이길 수 있는 싸움\""
},
]
지수 크롤링 및 분석
KOSPI, KOSDAQ, KOSPI 200 지수를 동일한 방법으로 Selenium과 BeautifulSoup을 사용해 네이버 증권에서 수집 후 S3에 저장하도록 구현하였다.
from fastapi import APIRouter, HTTPException, Query
import requests
import urllib3
# from bs4 import BeautifulSoup as soup
from typing import List
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time
import datetime
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from typing import List, Dict, Any
from .websocket import notify_clients
import json
import boto3
s3_client = boto3.client("s3")
urllib3.disable_warnings()
router = APIRouter()
# 기본 URL 및 필요한 태그 이름 정의
baseurl = 'https://m.stock.naver.com/domestic/index/'
nametag = 'zzDege'
indexvaluetag = 'YMlKec fxKbKc'
tickers = ['KOSPI', 'KOSDAQ', 'KPI200']
# 지수 정보를 가져오는 함수
from bs4 import BeautifulSoup as bs
# Selenium 설정 함수
def get_browser():
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
driver = webdriver.Chrome(options=chrome_options)
return driver
def get_index_info(ticker: str):
try:
url = baseurl + ticker + '/total'
driver = get_browser()
driver.get(url)
driver.implicitly_wait(15)
WebDriverWait(driver, 10).until(
lambda d: d.execute_script('return document.readyState') == 'complete'
)
# print('html 가져오기')
source_code = driver.page_source
page_html = bs(source_code, "html.parser")
# page_html = requests.get(baseurl + ticker + '/total', verify=False)
# print(page_html.text);
# print('이름 가져오기')
name = ticker # 심볼로 이름 대신 사용
# print(name);
# print('지수 가져오기')
strong_tag = page_html.find('strong', class_='GraphMain_price__GT8dV')
if strong_tag is None:
raise ValueError("Price strong tag not found")
price = strong_tag.text.strip()
# print(price);
# print('상승폭 가져오기')
change_value = page_html.find('span', class_='VGap_iconArrow__4xqtu VGap_RISING__hzXRB')
if change_value:
change_value = change_value.get_text()
# print(change_value);
else:
change_value = "0" # 값이 없을 경우 기본값 설정
# print('변화율 가져오기')
change_rate = page_html.find_all('span', class_='VGap_gap__LQYpL')
# print(change_rate[1]);
if change_rate:
change_rate = float(change_rate[1].contents[0].get_text());
print(change_rate)
else:
change_rate = 0
return name, price, change_value, change_rate
except Exception as e:
return None, None, None, f"Error fetching data for {ticker}: {str(e)}"
# 라우터 정의: 지수 데이터를 가져오는 엔드포인트
@router.get("/get_indices/")
async def get_indices(tickers: List[str] = Query(default=tickers)):
s3_bucket_name = "dev-jeus-bucket"
file_name = "kospi_kosdaq_kpi200.json"
result_array = []
for ticker in tickers:
name, price, change_value, change_rate = get_index_info(ticker)
if name and price and change_value and change_rate:
result_array.append({
"name": name,
"price": price,
"change_value": change_value,
"change_rate": change_rate
})
else:
result_array.append({
"Error": change_rate # change_rate가 에러 메시지로 사용됨
})
print(result_array)
try:
s3_client.put_object(
Bucket=s3_bucket_name,
Key=file_name,
Body=json.dumps(result_array, ensure_ascii=False, indent=4),
ContentType='application/json'
)
await notify_clients("kospi_update") # 클라이언트 알림
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}")
return result_array
현재 4-2 캡스톤 프로젝트에선 매수 알림 기능을 위해 pytorch로 만들어진 모델을 서빙해주는 프레임워크가 필요했다.
그 역할에 제격인게 바로 같은 Python 프레임워크인 Fastapi였다.
본격적으로 모델을 만들기 전에 테스트로 어떤 식으로 동작하는지 알고 싶었고 간단하게 Iris dataset을 Random Forest 방법으로 학습시켜 만든 모델로 진행하였다.
모델
import joblib
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
# Load the iris dataset
iris = load_iris()
X, y = iris.data, iris.target
# Train a random forest classifier
model = RandomForestClassifier()
model.fit(X, y)
# Save the trained model
joblib.dump(model, 'model.joblib')
Fastapi
from fastapi import FastAPI
import joblib
import numpy as np
from sklearn.datasets import load_iris # 추가
app = FastAPI()
# Load the trained model
model = joblib.load('model.joblib')
# Load the iris dataset
iris = load_iris() # 추가
@app.get("/")
def read_root():
return {"message": "Welcome to the ML Model API V2"}
@app.post("/predict/")
def predict(data: dict):
features = np.array(data['features']).reshape(1, -1)
prediction = model.predict(features)
class_name = iris.target_names[prediction][0]
return {"class": class_name}
만들어진 모델을 실고 /predict 라우터에 post 요청으로 딕셔너리 데이터가 도착하면 해당 데이터가 어떤 이미지인지 클래스 값을 리턴으로 돌려준다.
Docker
# Use the official Python image
FROM python:3.9
# Set the working directory in the container
WORKDIR /app
# Copy the local code to the container
COPY . .
# Install FastAPI and Uvicorn
RUN pip install --no-cache-dir -r requirements.txt
# Expose the port the app runs on
EXPOSE 8000
# Command to run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
우리의 프로젝트에선 모든 서비스를 컨테이너 단위로 올리기 때문에 도커 이미지로 만들어줄 필요가 있었다.
Docker hub upload
작동 test
Local에서 실행
/ endpoint로 접근
모델 호출을 위해 /predict로 접근
iris dataset중 setosa에 해당한다는 결과값을 받을 수 있었다.
AWS EC2에서 실행
Flow
보안그룹에서 8000 port tcp 연결 추가 → docker image pull 후 실행 → 브라우저에서 ip 단위로 접근 → 모델 호출
CI / CD
이번 프로젝트에서 devops 역할을 맡았기 때문에 만들어지는 모든 서비스를 자동으로 배포하고 싶었다.