no image
[Fastapi] Https 적용 및 도커로 최종 배포
전체 Flow chart  전체 백엔드 flow chart이다. 다음과 같은 흐름으로 전개된다.주도주 리스트가 갱신될 경우 Local server에서 S3에 갱신된 주도주 리스트를 업로드하고 Fastapi를 호출해 React에 웹소켓으로 알림을 줄 것을 요청한다. 해당 요청으로 실시간으로 주도주 리스트가 업데이트된다.Local server에서 실시간 1분봉 데이터를 받아와 /predict를 호출해 모델 서빙을 한다. 매수해야 할 경우 React에 웹소켓으로 알림을 준다. 사용자는 실시간으로 매수 알림을 받을 수 있다.React에서 초기 렌더링할 때 해당 주도주의 뉴스 분석 결과가 존재하지 않는다면 /crawl-and-analyze를 호출해 해당 주도주의 뉴스를 네이버 금융으로부터 가져와 Gemini ap..
2024.12.07
no image
[Fastapi] 웹소켓 구축
개요4-2 캡스톤 디자인 프로젝트에서 제공하는 웹 서비스에서 가장 중요한 것은 실시간 업데이트이다. 여러 해결 방식이 있겠지만 websocket을 통해 실시간 업데이트를 구현하였다. React와 S3를 웹소켓으로 양방향 연결을 하고 용도에 따라 여러 개의 트리거로 나눠 연결된 웹소켓을 활용하였다. 작동 방식은 동일하다. 특정 작업이 끝내면 사용자에게 실시간으로  알림을 준다.  코드from fastapi import APIRouter, WebSocket, WebSocketDisconnectfrom fastapi.middleware.cors import CORSMiddlewarerouter = APIRouter()# Connection 관리class ConnectionManager: def __ini..
2024.12.07
no image
[Fastapi] non_transformer 모델 서빙
개요4-2 캡스톤 디자인 프로젝트에서 제작할 웹사이트에 제공하기 위한 주가의 3% 상승 여부를 판단하는 모델을 학습시켰다. 매수 타이밍 예측을 위한 Target Model로는 Non-Stationary Transformer 모델을 사용하였다. 이 모델은 시계열 데이터의 비정상성(Non-Stationarity)을 효과적으로 처리할 수 있으며, 여타 모델 실험 결과(LSTM, Dlinear-I, Dlinear-S, TimesNet, PatchTST) 중에서 가장 높은 정확도(Best Prediction 75%)를 보였다. 최종적으로 해당 모델을 웹사이트에 서빙할 것이다.  모델 서빙 코드from fastapi import APIRouter, UploadFile, File, HTTPExceptionimport..
2024.12.07
no image
[Fastapi] 뉴스, 지수 크롤링 서버 구축
개요 4-2 캡스톤 디자인 프로젝트에서 만들 웹사이트의 기능 중 하나인 뉴스 긍정 부정 분석 결과 그리고 코스피, 코스닥 지수들을 실시간으로 제공하기 위한 API를 구현하였다.   뉴스 크롤링 및 분석Selenium과 BeautifulSoup을 사용해 네이버 뉴스에서 데이터를 수집하고, 이를 Gemini API로 분석 및 요약한 뒤 AWS S3에 저장하도록 구현하였다. 저장된 분석 결과는 React에서 호출되어 사용자에게 보여질 것이다.Gemini api 호출 import osimport google.generativeai as genaifrom dotenv import load_dotenvimport typing_extensions as typingfrom fastapi import APIRouterf..
2024.12.07
no image
[Fastapi] 모델 서빙과 CI/CD 테스트
개요현재 4-2 캡스톤 프로젝트에선 매수 알림 기능을 위해 pytorch로 만들어진 모델을 서빙해주는 프레임워크가 필요했다.그 역할에 제격인게 바로 같은 Python 프레임워크인 Fastapi였다.본격적으로 모델을 만들기 전에 테스트로 어떤 식으로 동작하는지 알고 싶었고 간단하게 Iris dataset을 Random Forest 방법으로 학습시켜 만든 모델로 진행하였다. 모델 import joblibfrom sklearn.datasets import load_irisfrom sklearn.ensemble import RandomForestClassifier# Load the iris datasetiris = load_iris()X, y = iris.data, iris.target# Train a rand..
2024.12.06

전체 Flow chart

 

 

전체 백엔드 flow chart이다. 다음과 같은 흐름으로 전개된다.

  • 주도주 리스트가 갱신될 경우 Local server에서 S3에 갱신된 주도주 리스트를 업로드하고 Fastapi를 호출해 React에 웹소켓으로 알림을 줄 것을 요청한다. 해당 요청으로 실시간으로 주도주 리스트가 업데이트된다.
  • Local server에서 실시간 1분봉 데이터를 받아와 /predict를 호출해 모델 서빙을 한다. 매수해야 할 경우 React에 웹소켓으로 알림을 준다. 사용자는 실시간으로 매수 알림을 받을 수 있다.
  • React에서 초기 렌더링할 때 해당 주도주의 뉴스 분석 결과가 존재하지 않는다면 /crawl-and-analyze를 호출해 해당 주도주의 뉴스를 네이버 금융으로부터 가져와 Gemini api를 통해 긍정 부정 분석을 한뒤 제공한다. 마찬가지로 실시간 업데이트를 위해 React에 웹소켓으로 알림을 준다.

폴더 구조

 

 

여러 Router들이 모듈화 되어있고 main.py에서 한번에 실행한다.

 

main.py

 

import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.config.settings import IP_NUM, PORT_NUM
from app.routers.root import router as root_router
from app.routers.crawl_and_analyze import router as crawl_router
from app.routers.model_predict import router as model_router
from app.routers.websocket import router as websocket_router
from app.routers.alert_list import router as alert_list_router
from app.routers.alert_chart import router as alert_chart_router
from app.routers.get_indices import router as indices_router
app = FastAPI()

# CORS 설정
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 모든 도메인에서의 요청을 허용
    allow_credentials=True,
    allow_methods=["*"],  # 모든 HTTP 메서드 (GET, POST, PUT, DELETE 등)를 허용
    allow_headers=["*"],  # 모든 헤더를 허용
)

# 라우터 포함
app.include_router(root_router)
app.include_router(crawl_router)
app.include_router(model_router)
app.include_router(websocket_router)
app.include_router(alert_list_router)
app.include_router(alert_chart_router)
app.include_router(indices_router)

if __name__ == '__main__':
    uvicorn.run(
        "app.main:app", 
        host=IP_NUM, 
        port=int(PORT_NUM), 
        workers=1, 
        ssl_keyfile="/etc/ssl/private/privkey.pem", 
        ssl_certfile="/etc/ssl/certs/fullchain.pem"
    )

 

모든 HTTP 메소드를 사용할 수 있도록 CORS 설정을 해주었고 Mixed content Error가 발생해 key와 ca파일을 추가해 https 단위로 fastapi 서버를 열어주었다.

 

Docker로 배포

 

# 1. 베이스 이미지 설정 (Python 3.11-slim 사용)
FROM python:3.11-slim


# 2. 작업 디렉토리 설정
WORKDIR /code

# 3. 필요한 패키지 설치 (Chrome 및 의존성 포함)
RUN apt-get update && apt-get install -y \
    wget \
    curl \
    gnupg \
    ca-certificates \
    unzip \
    locales \
    && rm -rf /var/lib/apt/lists/*

# 로케일 설정
RUN locale-gen ko_KR.UTF-8
ENV LANG=ko_KR.UTF-8 \
    LANGUAGE=ko_KR:ko \
    LC_ALL=ko_KR.UTF-8

# 4. Google Chrome의 GPG 키 및 리포지토리 추가
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - && \
    echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list && \
    apt-get update && apt-get install -y google-chrome-stable

# 5. pytorch 다운로드
RUN pip install torch==2.5.0 --index-url https://download.pytorch.org/whl/cpu

# 6. 인증서 파일 복사
COPY ./app/fullchain.pem /etc/ssl/certs/fullchain.pem
COPY ./app/privkey.pem /etc/ssl/private/privkey.pem

# 7. 파이썬 의존성 파일 복사 및 설치
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

# 8. 애플리케이션 파일 복사
COPY ./app /code/app

# 9. FastAPI에서 사용할 포트 열기
EXPOSE 8080


# 10. FastAPI 서버 실행 (uvicorn 사용)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080", "--ssl-keyfile", "/etc/ssl/private/privkey.pem", "--ssl-certfile", "/etc/ssl/certs/fullchain.pem"]
# CMD ["python", "app/main.py"]

 

다음과 같이 태그를 나눠 docker hub에 배포했고 ec2에서 내려받아 사용했다.

 

[Fastapi] 웹소켓 구축

dlwpdnr213
|2024. 12. 7. 02:55

개요

4-2 캡스톤 디자인 프로젝트에서 제공하는 웹 서비스에서 가장 중요한 것은 실시간 업데이트이다.

 

여러 해결 방식이 있겠지만 websocket을 통해 실시간 업데이트를 구현하였다.

 

React와 S3를 웹소켓으로 양방향 연결을 하고 용도에 따라 여러 개의 트리거로 나눠 연결된 웹소켓을 활용하였다.

 

작동 방식은 동일하다. 특정 작업이 끝내면 사용자에게 실시간으로  알림을 준다. 

 

코드

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware

router = APIRouter()

# Connection 관리
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    async def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except WebSocketDisconnect:
                await self.disconnect(connection)

manager = ConnectionManager()

@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        await manager.disconnect(websocket)

async def notify_clients(message: str):
    await manager.broadcast(message)

 

각각 모듈화된 라우터에서 해당 웹소켓 라우터를 import한 뒤 notify_clients를 호출해서 사용한다.

결과

 

Fast api log와 Chrome 개발자 도구를 통해 웹소켓이 정상 연결된 것을 확인 가능하다.

 

 

 

개요

4-2 캡스톤 디자인 프로젝트에서 제작할 웹사이트에 제공하기 위한 주가의 3% 상승 여부를 판단하는 모델을 학습시켰다.

 

매수 타이밍 예측을 위한 Target Model로는 Non-Stationary Transformer 모델을 사용하였다. 이 모델은 시계열 데이터의 비정상성(Non-Stationarity)을 효과적으로 처리할 수 있으며, 여타 모델 실험 결과(LSTM, Dlinear-I, Dlinear-S, TimesNet, PatchTST) 중에서 가장 높은 정확도(Best Prediction 75%)를 보였다. 최종적으로 해당 모델을 웹사이트에 서빙할 것이다.

 

 

모델 서빙 코드

from fastapi import APIRouter, UploadFile, File, HTTPException
import json
import torch
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from app.routers.utils.data_processing import preprecessingData, calculatePriceBB, calculate_yellow_box, calculateTamountBB
from app.routers.model.non_transformer import Model
from .websocket import notify_clients
import boto3


s3_client = boto3.client("s3")
S3_BUCKET_NAME = "dev-jeus-bucket"
JSON_FILE_NAME = "stocklist.json"


class Args():
    def __init__(self):
        self.task_name = 'classification'
        self.seq_len = 10
        self.pred_len = 0
        self.label_len = 1
        self.moving_avg = 3
        self.num_class = 1
        self.c_out = 13
        self.enc_in = 13
        self.dec_in = 13
        self.d_model = 8
        self.embed = 'fixed'
        self.freq = 't'
        self.dropout = 0.3
        self.factor = 1
        self.output_attention = False
        self.n_heads = 4
        self.e_layers = 3
        self.d_layers = 1
        self.d_ff = 4 * self.n_heads
        self.activation = 'gelu'
        self.p_hidden_dims = [128, 128]
        self.p_hidden_layers = 2
        self.top_k = 5
        self.num_kernels = 4
        self.time_feature = False

args = Args()
model = Model(args)
model.load_state_dict(torch.load('/code/app/routers/checkpoints/checkpoint_nontft.pth', weights_only=True, map_location=torch.device('cpu')))
model.eval()


router = APIRouter()

@router.post("/predict")
async def predict(files: list[UploadFile] = File(...)):
    try:
        
        response = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=JSON_FILE_NAME)
        file_content = response["Body"].read().decode("utf-8")
        leading_stock_list = json.loads(file_content)

        updated_stocks = []  
        for file in files:
            if not file.filename.endswith("json"):
                raise HTTPException(status_code=400, detail=f"File {file.filename} must be a JSON file.")
            
            
            contents = await file.read()
            data = json.loads(contents)
            df = pd.DataFrame(data)

            
            target_columns = ['End', 'Open', 'High', 'Low', 
                              'Amount', 'AmountBB_center', 'AmountBB_upper', 'AmountBB_lower',
                              'Red_line', 'Yellow_line',
                              'PriceBB_center', 'PriceBB_upper', 'PriceBB_lower']
            convert_columns = ['End', 'Open', 'High', 'Low', 'Amount']
            for col in convert_columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            df.fillna(0, inplace=True)  
            df = preprecessingData(df)
            df = calculateTamountBB(df)
            df = calculate_yellow_box(df)
            df = calculatePriceBB(df)

            feature_columns = df[target_columns].values
            scaler = MinMaxScaler()
            X_windows = scaler.fit_transform(feature_columns)
            data_tensor = torch.tensor(X_windows, dtype=torch.float32).unsqueeze(0)[:, -10:, :]
            x_mark_enc = data_tensor[:, :, -4:]

            with torch.no_grad():
                output = model(data_tensor, x_mark_enc)
                prediction = output.squeeze()
            preds = torch.round(torch.sigmoid(prediction))

            for item in leading_stock_list:
                if item["name"] in file.filename:  
                    if int(preds.item()) == 1:
                        item["status"] = 1
                        updated_stocks.append(item)

        updated_content = json.dumps(leading_stock_list, ensure_ascii=False, indent=4)
        s3_client.put_object(
            Bucket=S3_BUCKET_NAME,
            Key=JSON_FILE_NAME,
            Body=updated_content,
            ContentType="application/json"
        )

        await notify_clients("All files predict complete")

        return {
            "message": f"Processed {len(files)} files.",
            "updated_stocks": updated_stocks
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")

 

 

모델 서빙 결과

Local 서버에서 Fast api 서버 /predict router로 주도주들의 1분봉 데이터와 함께 POST 요청을 보내면 Fast api 서버에서 모델 예측을 진행하여 나온 결과값 prediction을 json으로 return한다.

 

동시에 웹소켓을 통해 React에 알림을 주며 사용자에게 실시간으로 매수 알림을 제공한다.

 

예시주도주데이터.json

{
        "Date": "2024112614:32",
        "Open": "18120",
        "High": "18210",
        "Low": "18110",
        "End": "18160",
        "Amount": "150419280"
    },
    {
        "Date": "2024112614:33",
        "Open": "18140",
        "High": "18250",
        "Low": "18130",
        "End": "18170",
        "Amount": "137837620"
    }

 

위와 같은 형식의 주도주 데이터 json 파일들을 POST 요청할 경우 아래와 같이 매수 판단을 한 주도주 리스트를 Return한다.

 

 

동시에 AWS S3에 올라가 있는 주도주 리스트를 갱신하고 웹소켓으로 알림을 주며 실시간으로 매수 알림을 준다. 

개요

 

4-2 캡스톤 디자인 프로젝트에서 만들 웹사이트의 기능 중 하나인 뉴스 긍정 부정 분석 결과 그리고 코스피, 코스닥 지수들을 실시간으로 제공하기 위한 API를 구현하였다. 

 

 

뉴스 크롤링 및 분석

SeleniumBeautifulSoup을 사용해 네이버 뉴스에서 데이터를 수집하고, 이를 Gemini API분석 및 요약한 뒤 AWS S3저장하도록 구현하였다.

 

저장된 분석 결과는 React에서 호출되어 사용자에게 보여질 것이다.

Gemini api 호출 

import os
import google.generativeai as genai
from dotenv import load_dotenv
import typing_extensions as typing
from fastapi import APIRouter
from typing import Dict, Any, List
import json
from fastapi import APIRouter, HTTPException

class Analysis(typing.TypedDict):
    evaluation: str
    reason: str
    summary: str

router = APIRouter()

def configure_gemini_api():
    load_dotenv()
    GOOGLE_API_KEY = os.getenv('MY_KEY')
    genai.configure(api_key=GOOGLE_API_KEY)


def analyze_article(article_content: str, company_name: str, article_link: str, article_title: str):

    if not article_content:
        return {
            "evaluation": "Error",
            "summary": "분석 실패",
            "link": article_link,  # 링크는 결과에만 포함,
            "title": article_title
        }
    
    prompt = f"""
    다음의 Article을 바탕으로, 이 기사가 ${company_name} 종목에 대해 긍정적인 평가를 내리고 있는지, 부정적인 평가를 내리고 있는지, 또는 종목과 관련이 없는지 판단해줘. [긍정 / 관련 없음 / 부정] 중 하나의 단어로 평가를 내려서 evaluation 필드에 저장해줘. 또한 이 기사에 포함된 종목 관련 정말 중요한 기사의 핵심 내용을 한 번에 알아볼 수 있도록 한 줄로 간략하게 요약해서 'summary'에 저장해줘. 반환 결과는 반드시 JSON 형식이어야 해.
    여기서 'summary' 필드 예시를 들어보자면 다음과 같아.
    
    Article Example:
    {{'evaluation': '긍정, 'summary': '${company_name}가 3분기 실적에서 견조한 매출을 기록 및 '헬시 플레저' 트렌드에 부합하는 제품 출시 및 좋은 성적 도출'}}

    Article:
    {article_content}

    반환 형식은 아래와 같아:
    Analysis = {{'evaluation': str, 'summary': str}}
    Return: Analysis
    """
    
    try:
        model = genai.GenerativeModel("gemini-1.5-flash")
        result = model.generate_content(prompt)

        # candidates가 비어있는지 먼저 확인
        if not result.candidates or not result.candidates[0].content.parts:
            print(prompt);
            print(result);
            raise ValueError("API 응답이 비어 있습니다.")
        
        # API에서 반환된 텍스트 정리
        result_text = result.candidates[0].content.parts[0].text
        cleaned_response = result_text.replace("```json", "").replace("```", "").strip()
        
        # JSON 형식으로 변환
        analysis_result = json.loads(cleaned_response)
        analysis_result['link'] = article_link  # 링크는 결과에만 포함
        analysis_result['title'] = article_title  # 링크는 결과에만 포함
        
        return analysis_result  # 분석 결과 반환
    
    except Exception as e:
        print(f"Error during analysis: {e}")
        return {
            "evaluation": "Error",
            "summary": "API 요청에 실패했습니다.",
            "link": article_link,  # 실패 시에도 링크를 반환
            "title": article_title
        }

# 뉴스 기사 분석 함수
def analyze_news(articles: List[Dict[str, str]], company_name: str) -> List[Dict[str, str]]:
    try:
        configure_gemini_api()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"API 설정 오류: {str(e)}")
    
    analyzed_articles = []

    for article in articles:
        # 각 기사에 'content' 필드가 없거나 비어 있을 경우 처리
        if 'content' not in article or not article['content']:
            analyzed_articles.append({
                "evaluation": "Error",
                "summary": "기사 내용이 없습니다.",
                "link": article.get('link', 'unknown'),  # 링크 필드 추가
                "title": article.get('title', 'unknown')  # 링크 필드 추가
            })
            continue
        
        # 각 기사를 분석하여 결과를 추가
        analysis = analyze_article(article['content'], company_name, article['link'], article['title'])
    
        analyzed_articles.append(analysis)
        filtered_articles = [article for article in analyzed_articles if article.get("evaluation") in ["긍정", "부정"]]

    return filtered_articles

 

뉴스 크롤링

from bs4 import BeautifulSoup
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time
import datetime
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from typing import List, Dict, Any

# 크롤링 요청 데이터 모델
class CrawlRequest(BaseModel):
    company_code: str
    page: int

# 라우터 설정
router = APIRouter()

class CrawlError(Exception):
    """Custom exception for errors during crawling."""
    def __init__(self, message: str):
        self.message = message
        super().__init__(self.message)

# Selenium 설정 함수
def get_browser():
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=chrome_options)
    return driver

class CrawlError(Exception):

    def __init__(self, message: str):
        self.message = message
        super().__init__(self.message)


def crawl_news(company_code: str, page: int) -> List[Dict[str, str]]:
    driver = None  # driver 초기화
    try:
        if(len(company_code) < 6):
            print("종목 코드가 올바른 형식이 아닙니다.")
            raise CrawlError("종목 코드가 올바른 형식이 아닙니다.")
        
        if( page < 1):
            print("페이지 번호는 1과 200 사이여야 합니다.")
            raise CrawlError("페이지 번호는 1과 200 사이여야 합니다.")    

        
        # URL / 요청 헤더 설정
        url = f'https://finance.naver.com/item/news.naver?code={company_code}&page={page}'
        driver = get_browser()
        driver.get(url)

        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, 'news_frame')))
        driver.switch_to.frame('news_frame')

        source_code = driver.page_source
        html = BeautifulSoup(source_code, "html.parser")

        # 중복 뉴스 제거
        for tr in html.select('tr.relation_lst'):
            tr.decompose()

        # 기사 item의 신문사 / 날짜 / 뉴스 주소 갖고 오기
        infos = html.select('.info')
        dates = html.select('.date')
        aTags = html.select('td.title a')

        links = [a.attrs['href'] for a in aTags]
        articles = []

        for i, full_url in enumerate(links):
            try:
                driver.get(full_url)
                WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'article')))
            except Exception as e:
                raise CrawlError(f"기사 페이지 로딩 실패: {full_url}")

            new_page_source = driver.page_source
            soup = BeautifulSoup(new_page_source, 'html.parser')

            for div in soup.select('div.vod_player_wrap._VIDEO_AREA_WRAP'):
                div.decompose()

            for div in soup.select('div.artical-btm'):
                div.decompose()

            for br in soup.find_all("br"):
                br.replace_with("\n")

            article_content = soup.select_one('article').text.strip()
            article_title = soup.select_one('#title_area span').text.strip()

            article = {
                'title': article_title,
                'publisher': infos[i].text.strip() if i < len(infos) else 'Unknown',
                'date': dates[i].text.strip() if i < len(dates) else 'Unknown',
                'link': full_url,
                'content': article_content,
            }

            articles.append(article)

        driver.quit()

    # 모든 크롤링 작업이 끝난 후 브라우저 종료
    except CrawlError as e:
        if driver:
            driver.quit()
        raise e  # CrawlError를 다시 발생시켜서 상위 함수에서 처리
    except Exception as e:
        if driver:
            driver.quit()
        raise CrawlError(f"크롤링 중 에러 발생: {str(e)}")  # 모든 에러를 CrawlError로 감싸서 던짐

    return articles

 

호출 Router

import json
import boto3
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List
from .crawl_news import crawl_news, CrawlError 
from .analyze_news import analyze_news
from .websocket import notify_clients

router = APIRouter()
s3_client = boto3.client("s3")

# 분석 응답 모델 정의
class AnalysisResDTO(BaseModel):
    evaluation: str
    summary: str
    link: str
    title: str

class CrawlAndAnalyzeRequest(BaseModel):
    company_code: str
    page: int
    company_name: str

class CrawlAndAnalyzeResponse(BaseModel):
    status: str
    total_articles: int 
    analysis: List[AnalysisResDTO]

@router.post("/crawl-and-analyze", response_model=CrawlAndAnalyzeResponse)
async def crawl_and_analyze(request: CrawlAndAnalyzeRequest) -> CrawlAndAnalyzeResponse:
    company_code = request.company_code
    page = request.page
    company_name = request.company_name

    try:
        articles = crawl_news(company_code, page)
    except CrawlError as e:
        raise HTTPException(status_code=400, detail=f"Crawling error: {e.message}")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Crawling error: {e.message}")

    try:
        analyzed_articles = analyze_news(articles, company_name)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Gemini API error: {e.message}")

    analysis_dto = [AnalysisResDTO(**article) for article in analyzed_articles]

    s3_bucket_name = "dev-jeus-bucket"  
    file_name = f"{company_name}.json"

    # 기존 JSON 파일 가져오기
    try:
        response = s3_client.get_object(Bucket=s3_bucket_name, Key=file_name)
        existing_data = json.loads(response['Body'].read())
    except s3_client.exceptions.NoSuchKey:
        # 파일이 없는 경우 빈 리스트로 초기화
        existing_data = []

    # 기존 데이터에서 링크를 기준으로 중복 검사
    existing_links = {item["link"] for item in existing_data}

    # 중복되지 않는 새 데이터를 필터링하여 추가
    new_data = [article.dict() for article in analysis_dto if article.link not in existing_links]

    # 기존 데이터에 새로운 비중복 데이터 추가
    updated_data = existing_data + new_data

    # JSON 데이터를 다시 S3에 업로드
    try:
        s3_client.put_object(
            Bucket=s3_bucket_name,
            Key=file_name,
            Body=json.dumps(updated_data, ensure_ascii=False, indent=4),
            ContentType='application/json'
        )
        
        await notify_clients('newsupdate')  # 클라이언트 알림
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}")

    return CrawlAndAnalyzeResponse(
        status="success",
        total_articles=len(new_data),
        analysis=analysis_dto
    )

 

결과 

실제로 고려아연 종목의 뉴스를 분석한 결과이다. 

[
    {
        "evaluation": "부정",
        "summary": "고려아연이 유상증자 계획을 철회하고 최윤범 회장이 이사회 의장직에서 물러나겠다고 밝히며 경영권 방어를 위해 주주들의 지지를 호소했지만, MBK파트너스·영풍과의 지분 격차는 더욱 벌어졌다.",
        "link": "https://n.news.naver.com/mnews/article/025/0003400388",
        "title": "고려아연 유상증자 철회 결정…최윤범 “이사회 의장직 사퇴”"
    },
    {
        "evaluation": "부정",
        "summary": "고려아연이 2조 5천억 원 규모의 유상증자를 철회하면서 경영권 분쟁이 임시 주총에서 의결권 대결로 이어질 것으로 예상됨",
        "link": "https://n.news.naver.com/mnews/article/056/0011837556",
        "title": "고려아연, 유상증자 철회…주총서 경영권 판가름 날 듯"
    },
    {
        "evaluation": "긍정",
        "summary": "고려아연은 유상증자를 철회하고 사외이사에게 의장직을 맡겨 독립 경영을 추진하며 주주 친화적인 정책을 강화한다. 분기 배당 도입, 소수주주 다수결 제도 도입, 소액주주 의사 반영 등을 통해 주주와의 소통 및 지지를 확보하려는 전략을 펼치고 있다.",
        "link": "https://n.news.naver.com/mnews/article/658/0000088195",
        "title": "고려아연, 유증 철회…최윤범 회장은 이사회 의장 물러난다"
    },
    {
        "evaluation": "긍정",
        "summary": "고려아연 최윤범 회장은 경영권 분쟁 승리를 자신하며 주주친화 정책을 통해 지지를 호소, 주주총회에서 승리할 가능성을 높였다.",
        "link": "https://n.news.naver.com/mnews/article/092/0002352347",
        "title": "주총 표대결 앞둔 최윤범 \"충분히 이길 수 있는 싸움\""
    },
]

 

 

지수 크롤링 및 분석

KOSPI, KOSDAQ, KOSPI 200 지수를 동일한 방법으로 SeleniumBeautifulSoup을 사용해 네이버 증권에서 수집 후 S3에 저장하도록 구현하였다.

 

from fastapi import APIRouter, HTTPException, Query
import requests
import urllib3
# from bs4 import BeautifulSoup as soup
from typing import List
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time
import datetime
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from typing import List, Dict, Any
from .websocket import notify_clients
import json
import boto3

s3_client = boto3.client("s3")

urllib3.disable_warnings()

router = APIRouter()

# 기본 URL 및 필요한 태그 이름 정의
baseurl = 'https://m.stock.naver.com/domestic/index/'
nametag = 'zzDege'
indexvaluetag = 'YMlKec fxKbKc'

tickers = ['KOSPI', 'KOSDAQ', 'KPI200']

# 지수 정보를 가져오는 함수
from bs4 import BeautifulSoup as bs

# Selenium 설정 함수
def get_browser():
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=chrome_options)
    return driver

def get_index_info(ticker: str):
    try:
        url = baseurl + ticker + '/total'
        driver = get_browser()
        driver.get(url)

        driver.implicitly_wait(15)
        WebDriverWait(driver, 10).until(
            lambda d: d.execute_script('return document.readyState') == 'complete'
        )


        # print('html 가져오기')
        source_code = driver.page_source
        page_html = bs(source_code, "html.parser")

        # page_html = requests.get(baseurl + ticker + '/total', verify=False)
        # print(page_html.text);

        # print('이름 가져오기')
        name = ticker  # 심볼로 이름 대신 사용
        # print(name);

        # print('지수 가져오기')
        strong_tag = page_html.find('strong', class_='GraphMain_price__GT8dV')

        if strong_tag is None:
            raise ValueError("Price strong tag not found")

        price = strong_tag.text.strip()
        # print(price);

        # print('상승폭 가져오기')
        change_value = page_html.find('span', class_='VGap_iconArrow__4xqtu VGap_RISING__hzXRB')
        
        if change_value:
            change_value = change_value.get_text()
            # print(change_value);
        else:
            change_value = "0"  # 값이 없을 경우 기본값 설정

        # print('변화율 가져오기')
        change_rate = page_html.find_all('span', class_='VGap_gap__LQYpL')
        # print(change_rate[1]);

        if change_rate:            
            change_rate = float(change_rate[1].contents[0].get_text());
            print(change_rate)
        else:
            change_rate = 0
        

        return name, price, change_value, change_rate

    except Exception as e:
        return None, None, None, f"Error fetching data for {ticker}: {str(e)}"


# 라우터 정의: 지수 데이터를 가져오는 엔드포인트
@router.get("/get_indices/")
async def get_indices(tickers: List[str] = Query(default=tickers)):

    s3_bucket_name = "dev-jeus-bucket"  
    file_name = "kospi_kosdaq_kpi200.json"    

    result_array = []

    for ticker in tickers:
        name, price, change_value, change_rate = get_index_info(ticker)
        if name and price and change_value and change_rate:
            result_array.append({
                "name": name,
                "price": price,
                "change_value": change_value,
                "change_rate": change_rate
            })
        else:
            result_array.append({
                "Error": change_rate  # change_rate가 에러 메시지로 사용됨
            })

    print(result_array)

    try:
        s3_client.put_object(
            Bucket=s3_bucket_name,
            Key=file_name,
            Body=json.dumps(result_array, ensure_ascii=False, indent=4),
            ContentType='application/json'
        )
        
        await notify_clients("kospi_update")  # 클라이언트 알림
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}")
    return result_array

 

결과

실제로 호출한 결과이다. 

[
    {
        "name": "KOSPI",
        "price": "2,469.07",
        "change_value": "52.21",
        "change_rate": "+2.16"
    },
    {
        "name": "KOSDAQ",
        "price": "689.55",
        "change_value": "4.13",
        "change_rate": "+0.60"
    },
    {
        "name": "KPI200",
        "price": "328.44",
        "change_value": "7.91",
        "change_rate": "-2.47"
    }
]

개요

현재 4-2 캡스톤 프로젝트에선 매수 알림 기능을 위해 pytorch로 만들어진 모델을 서빙해주는 프레임워크가 필요했다.

그 역할에 제격인게 바로 같은 Python 프레임워크인 Fastapi였다.

본격적으로 모델을 만들기 전에 테스트로 어떤 식으로 동작하는지 알고 싶었고 간단하게 Iris dataset을 Random Forest 방법으로 학습시켜 만든 모델로 진행하였다.

 

모델 

import joblib
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

# Load the iris dataset
iris = load_iris()
X, y = iris.data, iris.target

# Train a random forest classifier
model = RandomForestClassifier()
model.fit(X, y)

# Save the trained model
joblib.dump(model, 'model.joblib')

 

 

Fastapi

 

from fastapi import FastAPI
import joblib
import numpy as np
from sklearn.datasets import load_iris  # 추가
app = FastAPI()

# Load the trained model
model = joblib.load('model.joblib')

# Load the iris dataset
iris = load_iris()  # 추가

@app.get("/")
def read_root():
    return {"message": "Welcome to the ML Model API V2"}

@app.post("/predict/")
def predict(data: dict):
    features = np.array(data['features']).reshape(1, -1)
    prediction = model.predict(features)
    class_name = iris.target_names[prediction][0]
    return {"class": class_name}

 

만들어진 모델을 실고 /predict 라우터에 post 요청으로 딕셔너리 데이터가 도착하면 해당 데이터가 어떤 이미지인지 클래스 값을 리턴으로 돌려준다.

 

Docker

# Use the official Python image
FROM python:3.9

# Set the working directory in the container
WORKDIR /app

# Copy the local code to the container
COPY . .

# Install FastAPI and Uvicorn
RUN pip install --no-cache-dir -r requirements.txt

# Expose the port the app runs on
EXPOSE 8000

# Command to run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

 

우리의 프로젝트에선 모든 서비스를 컨테이너 단위로 올리기 때문에 도커 이미지로 만들어줄 필요가 있었다.

 

Docker hub upload

 

작동 test

Local에서 실행

 

/ endpoint로 접근

 

 

모델 호출을 위해 /predict로 접근

 

iris dataset중 setosa에 해당한다는 결과값을 받을 수 있었다.

 

 

AWS EC2에서 실행

 

Flow

 

보안그룹에서 8000 port tcp 연결 추가 → docker image pull 후 실행 → 브라우저에서 ip 단위로 접근 → 모델 호출

 

CI / CD

이번 프로젝트에서 devops 역할을 맡았기 때문에 만들어지는 모든 서비스를 자동으로 배포하고 싶었다. 

CI / CD 도구로는 Github Actions를 선택하였고 아래는 그 과정들이다.

 

1. Github repo 환경변수 설정 

 

 

2.  .github/workflows 하위 디렉토리로 yaml 파일 push

 

name: Build to dockerhub and deploy to ec2

on:
  push:
    branches:
      - main  # exec when pushed main branch

jobs:
  build:
    runs-on: ubuntu-latest
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Build Docker image
        run: |
          docker build -t ${{ secrets.DOCKER_USERNAME }}/fastapi_test:v1 .

      - name: Login to Docker Hub
        run: echo "${{ secrets.DOCKER_TOKEN }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin

      - name: Push Docker image to Docker Hub
        run: |
          docker push ${{ secrets.DOCKER_USERNAME }}/fastapi_test:v1

  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to EC2
        run: |
          echo "${{ secrets.EC2_KEY }}" > ec2_key.pem
          chmod 600 ec2_key.pem
          ssh -o StrictHostKeyChecking=no -i ec2_key.pem ec2-user@13.124.230.180 << 'EOF'
            docker pull ${{ secrets.DOCKER_USERNAME }}/fastapi_test:v1
            docker stop my_fastapi_container || true
            docker rm my_fastapi_container || true
            docker run -d -p 8000:8000 --name my_fastapi_container ${{ secrets.DOCKER_USERNAME }}/fastapi_test:v1
          EOF

 

작동 flow 및 테스트

 

 

개발자가 깃허브 레포지토리 메인 브렌치에 PUSH할 경우 그것을 트리거로 하여 Github Actions가 docker hub에 container를 빌드하고 최종적으로 EC2에 배포한다.

 

테스트

 

1. Fastapi Root router message 변경

 

2. Github push

 

 

ec2에서 확인 시 컨테이너가 재생성된 것 확인 및 버전 확인. V1에서 V2로 바뀐 것을 확인 가능하다.