diff --git a/lib/ga4.py b/lib/ga4.py new file mode 100644 index 0000000..b218d30 --- /dev/null +++ b/lib/ga4.py @@ -0,0 +1,232 @@ +import sys, os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import yaml +import pprint +from datetime import datetime, timedelta +from google.analytics.data import BetaAnalyticsDataClient +from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest +from sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy.exc import IntegrityError +from sqlalchemy import select, func +from concurrent.futures import ThreadPoolExecutor, as_completed + +from conf import db, db_schema + +CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml") + +# ------------------------ +# 설정 파일 로드 +# ------------------------ +def load_config(): + with open(CONFIG_PATH, encoding="utf-8") as f: + cfg = yaml.safe_load(f) + sa_file = cfg.get('ga4', {}).get('service_account_file') + if sa_file and not os.path.isabs(sa_file): + base_dir = os.path.dirname(CONFIG_PATH) + cfg['ga4']['service_account_file'] = os.path.abspath(os.path.join(base_dir, sa_file)) + return cfg + +# ------------------------ +# GA4 클라이언트 초기화 +# ------------------------ +def init_ga_client(service_account_file): + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file + print(f"[INFO] GA4 클라이언트 초기화 - 인증파일: {service_account_file}") + return BetaAnalyticsDataClient() + +# ------------------------ +# config.yaml에 최대 rows 저장 +# ------------------------ +def update_config_file_with_max_rows(max_rows): + with open(CONFIG_PATH, encoding="utf-8") as f: + config = yaml.safe_load(f) + + if "ga4" not in config: + config["ga4"] = {} + config["ga4"]["max_rows_per_request"] = max_rows + + with open(CONFIG_PATH, "w", encoding="utf-8") as f: + yaml.dump(config, f, allow_unicode=True) + + print(f"[INFO] config.yaml에 max_rows_per_request = {max_rows} 저장 완료") + +# ------------------------ +# GA4 API로 최대 rows 감지 +# ------------------------ +def detect_max_rows_supported(client, property_id): + try: + request = RunReportRequest( + property=f"properties/{property_id}", + dimensions=[Dimension(name="date")], + metrics=[Metric(name="sessions")], + date_ranges=[DateRange(start_date="2024-01-01", end_date="2024-12-31")], + limit=100000 + ) + response = client.run_report(request) + print(f"[INFO] 최대 rows 감지: {len(response.rows)} rows 수신됨.") + return len(response.rows) + except Exception as e: + print(f"[WARNING] 최대 rows 감지 실패: {e}") + return 10000 + +# ------------------------ +# GA4 데이터 요청 +# ------------------------ +def fetch_report(client, property_id, start_date, end_date, dimensions, metrics, limit=10000): + request = RunReportRequest( + property=f"properties/{property_id}", + dimensions=[Dimension(name=d) for d in dimensions], + metrics=[Metric(name=m) for m in metrics], + date_ranges=[DateRange(start_date=start_date, end_date=end_date)], + limit=limit, + ) + response = client.run_report(request) + print(f"[INFO] GA4 리포트 응답 받음: {len(response.rows)} rows - dims={dimensions}, metrics={metrics}") + return response + +# ------------------------ +# 응답 데이터를 DB에 저장 +# ------------------------ +def save_report_to_db(engine, table, response, dimension_names, metric_names, debug=False): + with engine.begin() as conn: + for row in response.rows: + dims = row.dimension_values + mets = row.metric_values + data = {} + for i, dim_name in enumerate(dimension_names): + val = dims[i].value + if dim_name == "date": + if len(val) == 8: + val = datetime.strptime(val, "%Y%m%d").date() + else: + try: + val = datetime.strptime(val, "%Y-%m-%d").date() + except: + pass + data[dim_name] = val + + for i, met_name in enumerate(metric_names): + try: + data[met_name] = int(mets[i].value) + except: + data[met_name] = None + + stmt = mysql_insert(table).values(**data) + stmt = stmt.on_duplicate_key_update(**data) + + if debug: + print(f"[DEBUG] 저장할 데이터:") + pprint.pprint(data, indent=2, width=80) + print(f"[DEBUG] 실행할 쿼리:") + print(stmt) + print("-" * 60) + else: + try: + conn.execute(stmt) + except IntegrityError as e: + print(f"[DB ERROR] 중복 오류 또는 기타: {e}") + except Exception as e: + print(f"[DB ERROR] 저장 실패: {e}") + +# ------------------------ +# 테이블에서 마지막 날짜 조회 +# ------------------------ +def get_latest_date_from_db(engine, table): + with engine.connect() as conn: + stmt = select(func.max(table.c.date)) + result = conn.execute(stmt).scalar() + return result + +# ------------------------ +# 요청 범위를 최대 rows 기준으로 나눔 +# ------------------------ +def date_range_chunks(start_date, end_date, max_rows_per_request, avg_rows_per_day=500): + chunk_days = max(1, max_rows_per_request // avg_rows_per_day) + current_start = start_date + while current_start <= end_date: + current_end = min(current_start + timedelta(days=chunk_days - 1), end_date) + yield current_start, current_end + current_start = current_end + timedelta(days=1) + +# ------------------------ +# 날짜 범위 결정 로직 +# ------------------------ +def determine_date_range(table, config_start, config_end, force_update, engine): + yesterday = datetime.now().date() - timedelta(days=1) + actual_end = min(yesterday, config_end) + latest_db_date = get_latest_date_from_db(engine, table) + if latest_db_date and not force_update: + actual_start = latest_db_date + timedelta(days=1) + else: + actual_start = config_start + return actual_start, actual_end + +# ------------------------ +# 단일 테이블 단위 데이터 처리 +# ------------------------ +def process_dimension_metric(engine, client, property_id, config, table, dims, mets, max_rows, debug=False, force_update=False): + config_start = datetime.strptime(config.get("startDt", "20230101"), "%Y%m%d").date() + config_end = datetime.strptime(config.get("endDt", datetime.now().strftime("%Y%m%d")), "%Y%m%d").date() + actual_start, actual_end = determine_date_range(table, config_start, config_end, force_update, engine) + + if actual_start > actual_end: + print(f"[INFO] 이미 모든 데이터가 수집되어 있습니다: {actual_start} > {actual_end}") + return + + for start_dt, end_dt in date_range_chunks(actual_start, actual_end, max_rows): + start_str = start_dt.strftime('%Y-%m-%d') + end_str = end_dt.strftime('%Y-%m-%d') + print(f"[INFO] GA4 데이터 조회: {start_str} ~ {end_str}") + response = fetch_report(client, property_id, start_str, end_str, dimensions=dims, metrics=mets, limit=max_rows) + if len(response.rows) > 0: + save_report_to_db(engine, table, response, dimension_names=dims, metric_names=mets, debug=debug) + else: + print(f"[INFO] 해당 기간 {start_str} ~ {end_str} 데이터 없음") + +# ------------------------ +# 메인 진입점 (병렬 처리 포함) +# ------------------------ +def main(): + config = load_config() + ga4_cfg = config.get('ga4', {}) + service_account_file = ga4_cfg.get('service_account_file') + property_id = ga4_cfg.get('property_id') + debug = config.get('debug', False) + force_update = config.get('force_update', False) + max_workers = config.get('max_workers', 4) + + if not service_account_file or not property_id: + print("[ERROR] config.yaml에 'ga4.service_account_file'과 'ga4.property_id'를 반드시 설정하세요.") + return + + engine = db.engine + client = init_ga_client(service_account_file) + + max_rows = ga4_cfg.get("max_rows_per_request") + if not max_rows: + max_rows = detect_max_rows_supported(client, property_id) + update_config_file_with_max_rows(max_rows) + + tasks = [ + (db_schema.fg_manager_static_ga4_by_date, ["date"], ["activeUsers", "screenPageViews", "sessions"]), + (db_schema.fg_manager_static_ga4_by_source, ["date", "sessionSource"], ["sessions"]), + (db_schema.fg_manager_static_ga4_by_medium, ["date", "sessionMedium"], ["sessions"]), + (db_schema.fg_manager_static_ga4_by_device, ["date", "deviceCategory"], ["activeUsers"]), + (db_schema.fg_manager_static_ga4_by_country, ["date", "country"], ["activeUsers"]), + (db_schema.fg_manager_static_ga4_by_city, ["date", "city"], ["activeUsers"]) + ] + + with ThreadPoolExecutor(max_workers=4) as executor: + futures = [ + executor.submit(process_dimension_metric, engine, client, property_id, ga4_cfg, + table, dims, mets, max_rows, debug, force_update) + for table, dims, mets in tasks + ] + for future in as_completed(futures): + future.result() + + print("[INFO] GA4 데이터 수집 및 저장 완료") + +if __name__ == '__main__': + main() \ No newline at end of file