import os
import sys

# Make the project root importable before pulling in local packages (conf).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import pprint
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

import yaml
from google.analytics.data import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
from sqlalchemy import func, select
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError

from conf import db, db_schema

CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml")


# ------------------------
# Load configuration file
# ------------------------
def load_config():
    """Load config.yaml and return it as a dict.

    If ``ga4.service_account_file`` is a relative path, it is resolved
    against the directory containing the config file so the script works
    regardless of the current working directory.
    """
    with open(CONFIG_PATH, encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    sa_file = cfg.get('ga4', {}).get('service_account_file')
    if sa_file and not os.path.isabs(sa_file):
        base_dir = os.path.dirname(CONFIG_PATH)
        cfg['ga4']['service_account_file'] = os.path.abspath(os.path.join(base_dir, sa_file))
    return cfg


# ------------------------
# Initialize GA4 client
# ------------------------
def init_ga_client(service_account_file):
    """Create a GA4 Data API client authenticated via a service-account file.

    The google client library picks up credentials through the
    GOOGLE_APPLICATION_CREDENTIALS environment variable, so we set it here.
    """
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file
    print(f"[INFO] GA4 클라이언트 초기화 - 인증파일: {service_account_file}")
    return BetaAnalyticsDataClient()


# ------------------------
# Persist detected max rows back into config.yaml
# ------------------------
def update_config_file_with_max_rows(max_rows):
    """Write ``ga4.max_rows_per_request`` into config.yaml.

    NOTE: yaml.dump rewrites the whole file, so any comments in
    config.yaml are lost — acceptable here since the file is machine-managed.
    """
    with open(CONFIG_PATH, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    if "ga4" not in config:
        config["ga4"] = {}
    config["ga4"]["max_rows_per_request"] = max_rows

    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        yaml.dump(config, f, allow_unicode=True)
    print(f"[INFO] config.yaml에 max_rows_per_request = {max_rows} 저장 완료")


# ------------------------
# Probe the GA4 API for the max rows it returns per request
# ------------------------
def detect_max_rows_supported(client, property_id):
    """Issue a large probe request and return how many rows GA4 actually sent.

    Falls back to a conservative 10000 if the probe fails for any reason
    (network, auth, quota) — the collector can still run with that limit.
    """
    try:
        request = RunReportRequest(
            property=f"properties/{property_id}",
            dimensions=[Dimension(name="date")],
            metrics=[Metric(name="sessions")],
            date_ranges=[DateRange(start_date="2024-01-01", end_date="2024-12-31")],
            limit=100000,
        )
        response = client.run_report(request)
        print(f"[INFO] 최대 rows 감지: {len(response.rows)} rows 수신됨.")
        return len(response.rows)
    except Exception as e:
        print(f"[WARNING] 최대 rows 감지 실패: {e}")
        return 10000


# ------------------------
# Fetch a GA4 report
# ------------------------
def fetch_report(client, property_id, start_date, end_date, dimensions, metrics, limit=10000):
    """Run a GA4 report for the given date range, dimensions and metrics.

    :param dimensions: iterable of GA4 dimension names (e.g. ["date", "country"]).
    :param metrics: iterable of GA4 metric names (e.g. ["sessions"]).
    :param limit: max rows requested from the API.
    :return: the raw RunReport response.
    """
    request = RunReportRequest(
        property=f"properties/{property_id}",
        dimensions=[Dimension(name=d) for d in dimensions],
        metrics=[Metric(name=m) for m in metrics],
        date_ranges=[DateRange(start_date=start_date, end_date=end_date)],
        limit=limit,
    )
    response = client.run_report(request)
    print(f"[INFO] GA4 리포트 응답 받음: {len(response.rows)} rows - dims={dimensions}, metrics={metrics}")
    return response


# ------------------------
# Save response rows into the DB (upsert)
# ------------------------
def save_report_to_db(engine, table, response, dimension_names, metric_names, debug=False):
    """Upsert each response row into ``table`` via MySQL ON DUPLICATE KEY UPDATE.

    Dimension values are stored as-is, except "date" which is parsed to a
    ``datetime.date`` (GA4 returns YYYYMMDD; ISO format accepted as fallback).
    Metric values are coerced to int, or NULL if unparsable.
    When ``debug`` is true, rows and statements are printed instead of executed.
    Per-row DB errors are logged and skipped so one bad row doesn't abort the batch.
    """
    with engine.begin() as conn:
        for row in response.rows:
            dims = row.dimension_values
            mets = row.metric_values
            data = {}

            for i, dim_name in enumerate(dimension_names):
                val = dims[i].value
                if dim_name == "date":
                    if len(val) == 8:
                        val = datetime.strptime(val, "%Y%m%d").date()
                    else:
                        try:
                            val = datetime.strptime(val, "%Y-%m-%d").date()
                        except ValueError:
                            # Leave the raw string; the DB layer will reject it if invalid.
                            pass
                data[dim_name] = val

            for i, met_name in enumerate(metric_names):
                try:
                    data[met_name] = int(mets[i].value)
                except (ValueError, TypeError):
                    data[met_name] = None

            stmt = mysql_insert(table).values(**data)
            stmt = stmt.on_duplicate_key_update(**data)

            if debug:
                print(f"[DEBUG] 저장할 데이터:")
                pprint.pprint(data, indent=2, width=80)
                print(f"[DEBUG] 실행할 쿼리:")
                print(stmt)
                print("-" * 60)
            else:
                try:
                    conn.execute(stmt)
                except IntegrityError as e:
                    print(f"[DB ERROR] 중복 오류 또는 기타: {e}")
                except Exception as e:
                    # Best-effort: log and continue with the remaining rows.
                    print(f"[DB ERROR] 저장 실패: {e}")


# ------------------------
# Latest collected date in a table
# ------------------------
def get_latest_date_from_db(engine, table):
    """Return MAX(date) from ``table``, or None when the table is empty."""
    with engine.connect() as conn:
        stmt = select(func.max(table.c.date))
        result = conn.execute(stmt).scalar()
        return result


# ------------------------
# Split a date range into chunks sized by the row limit
# ------------------------
def date_range_chunks(start_date, end_date, max_rows_per_request, avg_rows_per_day=500):
    """Yield (chunk_start, chunk_end) date pairs covering [start_date, end_date].

    Chunk length is ``max_rows_per_request // avg_rows_per_day`` days
    (at least 1), so each chunk should fit under the API row limit.
    ``avg_rows_per_day`` is a heuristic estimate, not a guarantee.
    """
    chunk_days = max(1, max_rows_per_request // avg_rows_per_day)
    current_start = start_date
    while current_start <= end_date:
        current_end = min(current_start + timedelta(days=chunk_days - 1), end_date)
        yield current_start, current_end
        current_start = current_end + timedelta(days=1)


# ------------------------
# Decide the effective collection date range
# ------------------------
def determine_date_range(table, config_start, config_end, force_update, engine):
    """Compute (start, end) dates for incremental collection.

    End is capped at yesterday (today's GA4 data is incomplete).
    Start resumes from the day after the latest row already in the DB,
    unless ``force_update`` is set or the table is empty, in which case
    the configured start date is used.
    """
    yesterday = datetime.now().date() - timedelta(days=1)
    actual_end = min(yesterday, config_end)

    latest_db_date = get_latest_date_from_db(engine, table)
    if latest_db_date and not force_update:
        actual_start = latest_db_date + timedelta(days=1)
    else:
        actual_start = config_start
    return actual_start, actual_end


# ------------------------
# Collect one (table, dimensions, metrics) combination
# ------------------------
def process_dimension_metric(engine, client, property_id, config, table, dims, mets,
                             max_rows, debug=False, force_update=False):
    """Fetch GA4 data for one table in date chunks and upsert it into the DB.

    ``config`` supplies optional "startDt"/"endDt" bounds in YYYYMMDD format;
    defaults are 2023-01-01 through today.
    """
    config_start = datetime.strptime(config.get("startDt", "20230101"), "%Y%m%d").date()
    config_end = datetime.strptime(config.get("endDt", datetime.now().strftime("%Y%m%d")), "%Y%m%d").date()

    actual_start, actual_end = determine_date_range(table, config_start, config_end, force_update, engine)
    if actual_start > actual_end:
        print(f"[INFO] 이미 모든 데이터가 수집되어 있습니다: {actual_start} > {actual_end}")
        return

    for start_dt, end_dt in date_range_chunks(actual_start, actual_end, max_rows):
        start_str = start_dt.strftime('%Y-%m-%d')
        end_str = end_dt.strftime('%Y-%m-%d')
        print(f"[INFO] GA4 데이터 조회: {start_str} ~ {end_str}")
        response = fetch_report(client, property_id, start_str, end_str,
                                dimensions=dims, metrics=mets, limit=max_rows)
        if len(response.rows) > 0:
            save_report_to_db(engine, table, response,
                              dimension_names=dims, metric_names=mets, debug=debug)
        else:
            print(f"[INFO] 해당 기간 {start_str} ~ {end_str} 데이터 없음")


# ------------------------
# Main entry point (parallel collection)
# ------------------------
def main():
    """Load config, detect the API row limit, then collect all report tables in parallel."""
    config = load_config()
    ga4_cfg = config.get('ga4', {})
    service_account_file = ga4_cfg.get('service_account_file')
    property_id = ga4_cfg.get('property_id')
    debug = config.get('debug', False)
    force_update = config.get('force_update', False)
    max_workers = config.get('max_workers', 4)

    if not service_account_file or not property_id:
        print("[ERROR] config.yaml에 'ga4.service_account_file'과 'ga4.property_id'를 반드시 설정하세요.")
        return

    engine = db.engine
    client = init_ga_client(service_account_file)

    # Detect (once) and cache the per-request row limit in config.yaml.
    max_rows = ga4_cfg.get("max_rows_per_request")
    if not max_rows:
        max_rows = detect_max_rows_supported(client, property_id)
        update_config_file_with_max_rows(max_rows)

    # (destination table, GA4 dimensions, GA4 metrics) per report.
    tasks = [
        (db_schema.fg_manager_static_ga4_by_date, ["date"], ["activeUsers", "screenPageViews", "sessions"]),
        (db_schema.fg_manager_static_ga4_by_source, ["date", "sessionSource"], ["sessions"]),
        (db_schema.fg_manager_static_ga4_by_medium, ["date", "sessionMedium"], ["sessions"]),
        (db_schema.fg_manager_static_ga4_by_device, ["date", "deviceCategory"], ["activeUsers"]),
        (db_schema.fg_manager_static_ga4_by_country, ["date", "country"], ["activeUsers"]),
        (db_schema.fg_manager_static_ga4_by_city, ["date", "city"], ["activeUsers"]),
    ]

    # BUG FIX: the configured max_workers was read but a hardcoded 4 was used.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_dimension_metric, engine, client, property_id,
                            ga4_cfg, table, dims, mets, max_rows, debug, force_update)
            for table, dims, mets in tasks
        ]
        for future in as_completed(futures):
            # Re-raise any worker exception so failures are not silently dropped.
            future.result()

    print("[INFO] GA4 데이터 수집 및 저장 완료")


if __name__ == '__main__':
    main()