402 lines
14 KiB
Python
402 lines
14 KiB
Python
# ===================================================================
|
|
# services/analytics/ga4.py
|
|
# Google Analytics 4 데이터 수집 서비스 모듈
|
|
# ===================================================================
|
|
# GA4 API를 통해 웹사이트 방문자 데이터를 수집하고 DB에 저장합니다.
|
|
# 병렬 처리를 통해 대량 데이터 수집 성능을 최적화합니다.
|
|
# ===================================================================
|
|
"""
|
|
Google Analytics 4 데이터 수집 서비스 모듈
|
|
|
|
GA4 Data API를 사용하여 웹사이트 분석 데이터를 수집합니다.
|
|
일별 세션, 사용자, 이벤트 등 다양한 메트릭을 조회할 수 있습니다.
|
|
|
|
사용 예시:
|
|
from services.analytics.ga4 import GA4Client, GA4DataCollector
|
|
|
|
# 간단한 데이터 조회
|
|
client = GA4Client(property_id, service_account_file)
|
|
data = client.get_daily_sessions('2024-01-01', '2024-01-31')
|
|
|
|
# 자동 데이터 수집 및 저장
|
|
collector = GA4DataCollector(config)
|
|
collector.collect_and_save()
|
|
"""
|
|
|
|
import os
|
|
import traceback
|
|
from datetime import datetime, timedelta, date
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
from dateutil.parser import parse as parse_date
|
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy import select, func, Table
|
|
|
|
from core.logging_utils import get_logger
|
|
from core.config import get_config
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# GA4 라이브러리 임포트 (설치 필요)
|
|
try:
|
|
from google.analytics.data import BetaAnalyticsDataClient
|
|
from google.analytics.data_v1beta.types import (
|
|
DateRange, Dimension, Metric, RunReportRequest
|
|
)
|
|
GA4_AVAILABLE = True
|
|
except ImportError:
|
|
GA4_AVAILABLE = False
|
|
logger.warning("google-analytics-data 패키지가 설치되지 않았습니다.")
|
|
|
|
|
|
class GA4Client:
|
|
"""
|
|
Google Analytics 4 API 클라이언트
|
|
|
|
GA4 Data API를 통해 리포트 데이터를 조회합니다.
|
|
|
|
Attributes:
|
|
property_id: GA4 속성 ID
|
|
client: BetaAnalyticsDataClient 인스턴스
|
|
max_rows: API 요청당 최대 행 수
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
property_id: int,
|
|
service_account_file: Optional[str] = None,
|
|
max_rows: int = 10000
|
|
):
|
|
"""
|
|
Args:
|
|
property_id: GA4 속성 ID
|
|
service_account_file: 서비스 계정 JSON 파일 경로
|
|
max_rows: 요청당 최대 행 수
|
|
|
|
Raises:
|
|
ImportError: google-analytics-data 패키지 미설치 시
|
|
Exception: 인증 실패 시
|
|
"""
|
|
if not GA4_AVAILABLE:
|
|
raise ImportError(
|
|
"GA4 기능을 사용하려면 google-analytics-data 패키지를 설치하세요: "
|
|
"pip install google-analytics-data"
|
|
)
|
|
|
|
self.property_id = property_id
|
|
self.max_rows = max_rows
|
|
|
|
# 서비스 계정 인증 설정
|
|
if service_account_file:
|
|
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file
|
|
logger.info(f"GA4 클라이언트 초기화 - 인증파일: {service_account_file}")
|
|
|
|
try:
|
|
self.client = BetaAnalyticsDataClient()
|
|
logger.info("GA4 클라이언트 초기화 완료")
|
|
except Exception as e:
|
|
logger.error(f"GA4 클라이언트 초기화 실패: {e}")
|
|
traceback.print_exc()
|
|
raise
|
|
|
|
def run_report(
|
|
self,
|
|
start_date: str,
|
|
end_date: str,
|
|
dimensions: List[str],
|
|
metrics: List[str],
|
|
limit: Optional[int] = None
|
|
) -> Optional[Any]:
|
|
"""
|
|
GA4 리포트 실행
|
|
|
|
Args:
|
|
start_date: 시작 날짜 (YYYY-MM-DD)
|
|
end_date: 종료 날짜 (YYYY-MM-DD)
|
|
dimensions: 차원 목록 (예: ['date', 'city'])
|
|
metrics: 메트릭 목록 (예: ['sessions', 'activeUsers'])
|
|
limit: 결과 제한 (None이면 max_rows 사용)
|
|
|
|
Returns:
|
|
RunReportResponse 또는 None (실패 시)
|
|
"""
|
|
if limit is None:
|
|
limit = self.max_rows
|
|
|
|
logger.debug(f"GA4 리포트 요청: {start_date} ~ {end_date}, dims={dimensions}, metrics={metrics}")
|
|
|
|
try:
|
|
request = RunReportRequest(
|
|
property=f"properties/{self.property_id}",
|
|
dimensions=[Dimension(name=d) for d in dimensions],
|
|
metrics=[Metric(name=m) for m in metrics],
|
|
date_ranges=[DateRange(start_date=start_date, end_date=end_date)],
|
|
limit=limit,
|
|
)
|
|
response = self.client.run_report(request)
|
|
logger.info(f"GA4 리포트 응답: {len(response.rows)}건")
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"GA4 리포트 요청 실패: {e}")
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def get_daily_sessions(
|
|
self,
|
|
start_date: str,
|
|
end_date: str
|
|
) -> List[Dict]:
|
|
"""
|
|
일별 세션 데이터 조회
|
|
|
|
Args:
|
|
start_date: 시작 날짜 (YYYY-MM-DD)
|
|
end_date: 종료 날짜 (YYYY-MM-DD)
|
|
|
|
Returns:
|
|
일별 세션 데이터 리스트
|
|
[{'date': date, 'sessions': int, 'activeUsers': int}, ...]
|
|
"""
|
|
response = self.run_report(
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
dimensions=['date'],
|
|
metrics=['sessions', 'activeUsers']
|
|
)
|
|
|
|
if response is None:
|
|
return []
|
|
|
|
result = []
|
|
for row in response.rows:
|
|
date_str = row.dimension_values[0].value
|
|
result.append({
|
|
'date': datetime.strptime(date_str, "%Y%m%d").date(),
|
|
'sessions': int(row.metric_values[0].value),
|
|
'activeUsers': int(row.metric_values[1].value)
|
|
})
|
|
|
|
return result
|
|
|
|
def detect_max_rows(self) -> int:
|
|
"""
|
|
API에서 지원하는 최대 행 수 감지
|
|
|
|
Returns:
|
|
최대 행 수 (감지 실패 시 기본값 10000)
|
|
"""
|
|
try:
|
|
request = RunReportRequest(
|
|
property=f"properties/{self.property_id}",
|
|
dimensions=[Dimension(name="date")],
|
|
metrics=[Metric(name="sessions")],
|
|
date_ranges=[DateRange(start_date="2024-01-01", end_date="2024-12-31")],
|
|
limit=100000
|
|
)
|
|
response = self.client.run_report(request)
|
|
n_rows = len(response.rows)
|
|
logger.info(f"최대 행 수 감지: {n_rows}")
|
|
return n_rows
|
|
except Exception as e:
|
|
logger.warning(f"최대 행 수 감지 실패: {e}")
|
|
return 10000
|
|
|
|
|
|
class GA4DataCollector:
|
|
"""
|
|
GA4 데이터 자동 수집기
|
|
|
|
설정에 따라 GA4 데이터를 자동으로 수집하고 DB에 저장합니다.
|
|
|
|
Attributes:
|
|
client: GA4Client 인스턴스
|
|
engine: SQLAlchemy 엔진
|
|
table: 대상 테이블
|
|
force_update: 강제 업데이트 여부
|
|
debug: 디버그 모드
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
engine,
|
|
table: Table,
|
|
property_id: Optional[int] = None,
|
|
service_account_file: Optional[str] = None,
|
|
force_update: bool = False,
|
|
debug: bool = False
|
|
):
|
|
"""
|
|
Args:
|
|
engine: SQLAlchemy 엔진
|
|
table: 대상 테이블
|
|
property_id: GA4 속성 ID (None이면 설정에서 로드)
|
|
service_account_file: 서비스 계정 파일 (None이면 설정에서 로드)
|
|
force_update: 기존 데이터 덮어쓰기 여부
|
|
debug: 디버그 모드
|
|
"""
|
|
config = get_config()
|
|
|
|
if property_id is None:
|
|
property_id = config.ga4.get('property_id')
|
|
if service_account_file is None:
|
|
service_account_file = config.ga4.get('service_account_file')
|
|
|
|
self.client = GA4Client(property_id, service_account_file)
|
|
self.engine = engine
|
|
self.table = table
|
|
self.force_update = force_update
|
|
self.debug = debug
|
|
|
|
# 설정에서 날짜 범위 로드
|
|
self.config_start_date = datetime.strptime(
|
|
config.ga4.get('start_date', '20170101'), '%Y%m%d'
|
|
).date()
|
|
self.config_end_date = datetime.strptime(
|
|
config.ga4.get('end_date', '20991231'), '%Y%m%d'
|
|
).date()
|
|
|
|
def get_latest_date_from_db(self) -> Optional[date]:
|
|
"""DB에서 가장 최근 저장된 날짜 조회"""
|
|
with self.engine.connect() as conn:
|
|
stmt = select(func.max(self.table.c.date))
|
|
result = conn.execute(stmt).scalar()
|
|
logger.info(f"DB 기준 마지막 저장 날짜: {result}")
|
|
return result
|
|
|
|
def determine_date_range(self) -> Tuple[date, date]:
|
|
"""
|
|
수집할 날짜 범위 결정
|
|
|
|
Returns:
|
|
(시작일, 종료일) 튜플
|
|
"""
|
|
yesterday = datetime.now().date() - timedelta(days=1)
|
|
actual_end = min(yesterday, self.config_end_date)
|
|
|
|
if self.force_update:
|
|
actual_start = self.config_start_date
|
|
else:
|
|
latest_db_date = self.get_latest_date_from_db()
|
|
if latest_db_date is not None:
|
|
actual_start = latest_db_date + timedelta(days=1)
|
|
else:
|
|
actual_start = self.config_start_date
|
|
|
|
return actual_start, actual_end
|
|
|
|
def save_response_to_db(
|
|
self,
|
|
response,
|
|
dimension_names: List[str],
|
|
metric_names: List[str]
|
|
) -> int:
|
|
"""
|
|
GA4 응답 데이터를 DB에 저장
|
|
|
|
Args:
|
|
response: GA4 RunReportResponse
|
|
dimension_names: 차원 이름 목록
|
|
metric_names: 메트릭 이름 목록
|
|
|
|
Returns:
|
|
저장된 레코드 수
|
|
"""
|
|
if response is None:
|
|
return 0
|
|
|
|
saved_count = 0
|
|
|
|
with self.engine.begin() as conn:
|
|
for row in response.rows:
|
|
data = {}
|
|
|
|
# 차원 처리
|
|
for i, dim_name in enumerate(dimension_names):
|
|
try:
|
|
val = row.dimension_values[i].value
|
|
if dim_name == "date":
|
|
if len(val) == 8:
|
|
val = datetime.strptime(val, "%Y%m%d").date()
|
|
else:
|
|
val = parse_date(val).date()
|
|
data[dim_name] = val
|
|
except (IndexError, ValueError) as e:
|
|
logger.warning(f"차원 처리 오류 ({dim_name}): {e}")
|
|
|
|
# 메트릭 처리
|
|
for i, met_name in enumerate(metric_names):
|
|
try:
|
|
data[met_name] = int(row.metric_values[i].value)
|
|
except (IndexError, ValueError):
|
|
data[met_name] = None
|
|
|
|
# DB 저장
|
|
if self.debug:
|
|
logger.debug(f"[DEBUG] 저장할 데이터: {data}")
|
|
continue
|
|
|
|
try:
|
|
stmt = mysql_insert(self.table).values(**data)
|
|
stmt = stmt.on_duplicate_key_update(**data)
|
|
conn.execute(stmt)
|
|
saved_count += 1
|
|
except IntegrityError as e:
|
|
logger.error(f"중복 오류: {e}")
|
|
except Exception as e:
|
|
logger.error(f"저장 실패: {e}")
|
|
traceback.print_exc()
|
|
|
|
return saved_count
|
|
|
|
def collect_and_save(
|
|
self,
|
|
dimensions: List[str] = ['date'],
|
|
metrics: List[str] = ['sessions', 'activeUsers'],
|
|
chunk_days: int = 30
|
|
) -> int:
|
|
"""
|
|
데이터 수집 및 저장 실행
|
|
|
|
Args:
|
|
dimensions: 수집할 차원 목록
|
|
metrics: 수집할 메트릭 목록
|
|
chunk_days: 청크 크기 (일)
|
|
|
|
Returns:
|
|
총 저장된 레코드 수
|
|
"""
|
|
start_date, end_date = self.determine_date_range()
|
|
|
|
if start_date > end_date:
|
|
logger.info("최신 데이터가 이미 존재합니다.")
|
|
return 0
|
|
|
|
logger.info(f"GA4 데이터 수집 시작: {start_date} ~ {end_date}")
|
|
|
|
total_saved = 0
|
|
current_start = start_date
|
|
|
|
while current_start <= end_date:
|
|
current_end = min(current_start + timedelta(days=chunk_days - 1), end_date)
|
|
|
|
logger.info(f"청크 처리: {current_start} ~ {current_end}")
|
|
|
|
response = self.client.run_report(
|
|
start_date=current_start.strftime("%Y-%m-%d"),
|
|
end_date=current_end.strftime("%Y-%m-%d"),
|
|
dimensions=dimensions,
|
|
metrics=metrics
|
|
)
|
|
|
|
if response:
|
|
saved = self.save_response_to_db(response, dimensions, metrics)
|
|
total_saved += saved
|
|
|
|
current_start = current_end + timedelta(days=1)
|
|
|
|
logger.info(f"GA4 데이터 수집 완료: 총 {total_saved}건 저장")
|
|
return total_saved
|