데이터 수집 파이프라인 구축
졸업 프로젝트로 AI기반 평생교육 커리큘럼 추천 서비스 큐릭(Kuriq)을 개발하고 있다.
간단하게 서비스를 소개하자면, 국내 주요 공공 교육 플랫폼에 흩어져 있는 강좌를 하나의 인터페이스에서 통합 탐색할 수 있게 하고, 사용자가 자연어로 입력한 관심사/학습 목표/현재 수준등을 바탕으로 RAG 기반으로 개인에게 최적화된 주간 학습 로드맵을 자동으로 생성해주는게 주 기능이다.
그렇기에 서비스 구현을 위해서는 강좌 데이터를 벡터 DB에 적재하는 파이프라인이 필요하다!! 그래서 내가 구현한 공공 교육 데이터를 수집하고 전처리한 뒤 DB에 적재하기까지의 과정을 정리해봤다.
1. 환경 설정
의존성 충돌 방지, 환경의 동일성 유지, 뭉서보다 시스템의 깔끔한 유지등을 위해… 가상환경을 세팅하고 그 위에서 작업했다.
- 가상환경이란? 컴퓨터 전체 환경에 라이브러리를 설치하는 게 아니라, 특정 프로젝트만을 위한 독립적인 실행 환경을 구축하는 것이다.
가상환경 설치 및 의존성 설치
1
2
3
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
requirements.txt
1
2
3
4
requests==2.31.0
chromadb>=0.5.0
sentence-transformers==2.6.1
python-dotenv==1.0.0
requirements.txt란, 프로젝트를 실행하기 위해서 필요한 패키지의 리스트이다. (의존성 관리를 위한 문서)
사용 라이브러리는 네 가지로 최소화했다. 임베딩 모델로 현재는 OpenAI API 대신 sentence-transformers의 로컬 모델을 사용하기로 했는데, 후에 제공신청을 한 API에 대해서 제공여부가 결정된 후 여러 모델(OpenAI API, Qwen 등…)을 사용해본 후 가장 성능이 좋은 모델을 사용할 예정이다.
환경변수 (.env)
1
2
3
4
5
6
7
DATA_GO_KR_API_KEY=공공데이터포털_API_키
KOCW_API_KEY=KOCW_API_키
CHROMA_MODE=local # local | server
CHROMA_PATH=./chroma_db # 로컬 모드일 때 저장 경로
CHROMA_HOST=localhost # 서버 모드일 때
CHROMA_PORT=8000
공공데이터 API를 크롤링하여 정보를 가져오므로 공공데이터 포털 API키와 KOCW API키를 넣어야한다.
아직 서버를 배포하지 않아서 로컬에 저장해야하는데, 나중에 서버를 띄운 후 코드의 일부분을 수정해야하는 것을 막기 위해 CHROMA_MODE 변수로 local모드와 server모드를 나눴다. 나중에 서버 배포 후에는 CHROMA_MODE를 server로 바꾸고 서버 주소만 입력하면 DB에 연결할 수 있다.
2. 데이터 모델 설정
우리 서비스는 우선적으로 강좌 정보 API를 제공하는 국가평생교육 K-MOOC, 서울시 평생학습포털, KOCW의 강좌 정보만을 가져온다. (온국민평생배움터 강좌 정보 API 제공신청을 한 상태.) 이후, 서비스를 확장하게 되면 Gseek등의 좀 더 다양한 국가 평생교육 사이트의 강좌를 추가할 예정이다.
따라서 API 크롤링을 진행해야한다. 이에 앞서 모든 수집기가 공통으로 사용하는 Course 데이터클래스를 정의했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@dataclass
class Course:
id: str # 플랫폼 내 고유 ID
title: str # 강의명
institution: str # 운영 기관
platform: str # 플랫폼명 (K-MOOC, KOCW 등)
category: str # 원본 카테고리
std_category: str = "기타" # 큐릭 표준 카테고리 (전처리 후 채워짐)
description: str = "" # 강의 소개
duration: str = "" # 학습 기간 / 시간
url: str = "" # 수강 신청 URL
is_free: bool = True # 무료 여부
level: str = "" # 난이도 (있는 경우)
def embed_text(self) -> str:
"""임베딩용 텍스트 생성: 강의명 + 표준카테고리 + 기관 + 소개 200자"""
snippet = self.description[:200] if self.description else ""
return f"{self.title} [{self.std_category}] {self.institution} {snippet}".strip()
def to_metadata(self) -> dict:
"""ChromaDB metadata 딕셔너리"""
return {
"title": self.title,
"institution": self.institution,
"platform": self.platform,
"category": self.std_category,
"duration": self.duration,
"url": self.url,
"is_free": self.is_free,
"level": self.level,
}
def chroma_id(self) -> str:
"""ChromaDB document ID — 플랫폼_ID"""
return f"{self.platform}_{self.id}"
- 데이터클래스(@dataclass)란, 데이터를 저장하는 클래스를 만들 때 보일러플레이트 코드를 자동으로 생성해주는 데코레이터이다. 데이터 구조를 정의하는 간단한 경우에는 일반 클래스보다 가독성이 높고 유지보수가 편리하다.
embed_text(self)메서드로 벡터 DB에 저장하기 위해 임베딩용 텍스트를 생성한다. (강의명, 카테고리, 기관, 소개글을 합쳐서 하나의 문장으로 만듦) 강의 전체 내용을 전부 넣으면 검색 품질이 저하될 수 있기에 소개 앞 200자만 사용하여 토큰 수를 줄인다.- ChromaDB는 실제 텍스트 외에 메타데이터를 딕셔너리 형태로 저장하기에,
to_metadata(self)메서드로 메타데이터를 가공한다. - 강의를 구별하기 위해 플랫폼 이름과 ID를 조합하여 DB에서 각 강의를 식별할 고유 키를 만든다.
3. 수집기 설계
공통 추상 클래스
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class BaseCollector(ABC):
"""모든 수집기의 공통 추상 클래스"""
PLATFORM: str = ""
REQUEST_DELAY: float = 0.3 # API 호출 간격
def __init__(self, api_key: str = ""):
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({"User-Agent": "KuriqDataPipeline/1.0"})
@abstractmethod
def collect_all(self) -> Generator[Course, None, None]:
"""전체 강좌 수집 — Generator로 yield"""
pass
def fetch(self, url: str, params: dict = None, retries: int = 3, debug: bool = False) -> dict:
"""공통 HTTP GET 요청 — 재시도 로직 포함"""
for attempt in range(retries):
try:
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
if debug:
import json
logger.debug(f"[{self.PLATFORM}] HTTP {response.status_code} — URL: {response.url}")
try:
logger.debug(f"[{self.PLATFORM}] RAW JSON:\n{json.dumps(response.json(), ensure_ascii=False, indent=2)}")
except Exception:
logger.debug(f"[{self.PLATFORM}] RAW TEXT:\n{response.text[:2000]}")
return response.json()
except requests.exceptions.Timeout:
logger.warning(f"[{self.PLATFORM}] 타임아웃 (시도 {attempt + 1}/{retries}): {url}")
except requests.exceptions.HTTPError as e:
logger.error(f"[{self.PLATFORM}] HTTP 오류 {e.response.status_code}: {url}")
logger.error(f"[{self.PLATFORM}] 응답 본문: {e.response.text[:500]}")
break
except Exception as e:
logger.error(f"[{self.PLATFORM}] 요청 실패: {e}")
logger.error(f"[{self.PLATFORM}] RAW TEXT:\n{response.text[:1000]}")
break
time.sleep(1)
return {}
def delay(self):
"""API 호출 간격 준수"""
time.sleep(self.REQUEST_DELAY)
BaseCollector를 상속받는 클래스는 무조건 collect_all() 메서드를 구현해야 한다. 이런 식으로 구현을 하면 다음과같은 장점이 있다.
강제성 부여
- 앞서 말했듯
BaseCollector를 상속받는 클래스는 무조건collect_all()메서드를 구현해야 한다. 이렇게 하면 수집기가 몇 개가 되더라도, 메인 로직에서는 어떤 수집기든collect_all()을 부르면 데이터가 나온다는 것을 보장받을 수 있다. (다형성의 핵심!) @abstractmethod가 붙은 메서드를 구현하지 않으면 파이썬은 아예 객체 생성을 허용하지 않는다.
- 앞서 말했듯
메모리 절약
Generator[Course, None, None]타입을 반환하는 방식을 사용하여 메모리를 절약했다.- 일반적인 방식(
return list)은, 강의 데이터를 다 긁을 때까지 기다렸다가, 한꺼번에 메모리에 올린다. 이때 데이터가 많아지면 서버가 뻗을 수도 있다. 반면 제너레이터 방식(yield)을 사용하면 데이터를 하나 긁어올 때마다 즉시 밖으로 던져주기에 전체 수집량이 아무리 많아도 메모리에는 항상 강의 데이터 1개 만큼의 공간만 사용된다.
중복 코드 제거
requests.Session(),delay, 재시도 로직 등을 부모클래스에서 처리해준다.
4. 전처리
카테고리 표준화
1
2
3
4
5
6
7
8
def normalize_category(raw: str) -> str:
if not raw:
return "기타"
for std_cat, keywords in CATEGORY_MAP.items():
for kw in keywords:
if kw in raw:
return std_cat
return "기타"
플랫폼마다 카테고리 체계가 다르거나 제공해주지 않는 경우가 있었다. K-MOOC은 응답 필드에서 직접 매핑했지만, KOCW나 전국평생학습강좌는 카테고리 정보가 제공되지 않아 설명 텍스트 기반 키워드 매칭으로 카테고리를 추론했다.
LLM을 사용하여 카테고리를 분류하는 방법도 고려했으나, 강좌 수가 많은 탓에 API 비용과 처리 시간이 많이 들 것 같아서 키워드 매칭으로 했으나, 후에 API 비용이 들지 않는 가벼운 로컬모델로 다시 매핑해보려 한다.
5. ChromaDB 적재
ChromaDB를 클라이언트 모드로 전환
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
## 6. ChromaDB 적재
### ChromaDB 클라이언트 모드 전환
```python
def get_chroma_client():
mode = os.getenv("CHROMA_MODE", "local") # 기본값 로컬
if mode == "server":
return chromadb.HttpClient(
host=os.getenv("CHROMA_HOST", "localhost"),
port=int(os.getenv("CHROMA_PORT", "8000")),
)
else:
return chromadb.PersistentClient(
path=os.getenv("CHROMA_PATH", "./chroma_db")
)
로컬 개발 시에는 PersistentClient로 파일에 저장하고, 서버 배포 시에는 HttpClient로 전환하도록 했다. 후에 서버 배포 후 .env파일만 바꿔주면 되어 편리하다.
임베딩 모델
1
2
EMBED_MODEL = "paraphrase-multilingual-MiniLM-L12-v2"
BATCH_SIZE = 64
OpenAI의 text-embedding-ada-002 대신 sentence-transformers의 다국어 모델을 선택했다. 한국어를 포함한 다국어를 지원하며, 로컬에서 실행되어 API 비용이 들지 않는다. 마찬가지로 후에 좀 더 괜찮은 모델이 있다면 검증 후 적용해볼 계획이다.
배치 upsert
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def upsert(self, courses: List[Course]) -> None:
"""배치 단위로 임베딩 생성 후 ChromaDB upsert"""
if not courses:
return
ids = [c.chroma_id() for c in courses]
documents = [c.embed_text() for c in courses]
metadatas = [c.to_metadata() for c in courses]
for i in range(0, len(ids), BATCH_SIZE):
b_ids = ids[i:i + BATCH_SIZE]
b_docs = documents[i:i + BATCH_SIZE]
b_meta = metadatas[i:i + BATCH_SIZE]
embeddings = self.model.encode(b_docs, batch_size=BATCH_SIZE).tolist()
self.collection.upsert(
ids=b_ids,
documents=b_docs,
embeddings=embeddings,
metadatas=b_meta,
)
한 번에 전체를 넣는 대신 64개씩 배치로 처리해 메모리 사용을 줄였다.
- upsert란, Update(수정)와 Insert(삽입)의 합성어로, 데이터베이스(DB)에서 테이블에 데이터를 넣을 때 중복되는 값이 있으면 갱신(Update)하고, 없으면 새로 삽입(Insert)하는 기능을 뜻한다.
6. 메인 파이프라인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def preprocess(course: Course) -> Course | None:
"""수집기 -> 전처리 파이프라인"""
course = clean_course(course)
course.std_category = normalize_category(course.category)
if not is_valid(course):
return None
return course
def run_collector(
collector_gen: Generator[Course, None, None],
embedder: Embedder,
name: str,
) -> int:
buffer = []
total = 0
for raw in collector_gen:
processed = preprocess(raw)
if processed is None:
continue
buffer.append(processed)
if len(buffer) >= BATCH_FLUSH_SIZE:
embedder.upsert(buffer)
total += len(buffer)
logger.info(f"[{name}] 누적 적재: {total}개")
buffer.clear()
if buffer:
embedder.upsert(buffer)
total += len(buffer)
logger.info(f"[{name}] 완료 — 총 {total}개 적재")
return total
500개씩 버퍼에 쌓고 ChromaDB에 배치 적재한다.
run_collector로 효율적인 데이터 적재를 한다.- DB에 1개씩 1000번 요청을 보내는 것보다, 500개씩 2번 보내는 것이 훨씬 빠르다. (I/O 오버헤드 감소)
- 단순히 add만 하는 것이 아니라, 이미 있는 데이터면 업데이트하도 없으면 새로 추가하는 로직(upsert)로 데이터 중복을 방지한다.
7. 실행
1
2
3
4
5
6
# 가상환경 활성화
source venv/bin/activate
# 파이프라인 실행
python pipeline.py