날짜 조회 함수에서 최근 날짜가 앞에 오는 현상이 생기는 경우 순서를 바꿔서 수집하도록 변경.
This commit is contained in:
50
lib/ga4.py
50
lib/ga4.py
@ -4,6 +4,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
import yaml
|
||||
import pprint
|
||||
from datetime import datetime, timedelta
|
||||
from dateutil.parser import parse
|
||||
from google.analytics.data import BetaAnalyticsDataClient
|
||||
from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest
|
||||
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
||||
@ -74,6 +75,7 @@ def detect_max_rows_supported(client, property_id):
|
||||
# GA4 데이터 요청
|
||||
# ------------------------
|
||||
def fetch_report(client, property_id, start_date, end_date, dimensions, metrics, limit=10000):
|
||||
print(f"[INFO] fetch_report 호출 - 기간: {start_date} ~ {end_date}, dims={dimensions}, metrics={metrics}")
|
||||
request = RunReportRequest(
|
||||
property=f"properties/{property_id}",
|
||||
dimensions=[Dimension(name=d) for d in dimensions],
|
||||
@ -82,7 +84,7 @@ def fetch_report(client, property_id, start_date, end_date, dimensions, metrics,
|
||||
limit=limit,
|
||||
)
|
||||
response = client.run_report(request)
|
||||
print(f"[INFO] GA4 리포트 응답 받음: {len(response.rows)} rows - dims={dimensions}, metrics={metrics}")
|
||||
print(f"[INFO] GA4 리포트 응답 받음: {len(response.rows)} rows")
|
||||
return response
|
||||
|
||||
# ------------------------
|
||||
@ -95,21 +97,22 @@ def save_report_to_db(engine, table, response, dimension_names, metric_names, de
|
||||
mets = row.metric_values
|
||||
data = {}
|
||||
for i, dim_name in enumerate(dimension_names):
|
||||
val = dims[i].value
|
||||
if dim_name == "date":
|
||||
if len(val) == 8:
|
||||
val = datetime.strptime(val, "%Y%m%d").date()
|
||||
else:
|
||||
try:
|
||||
val = datetime.strptime(val, "%Y-%m-%d").date()
|
||||
except:
|
||||
pass
|
||||
data[dim_name] = val
|
||||
try:
|
||||
val = dims[i].value
|
||||
if dim_name == "date":
|
||||
if len(val) == 8:
|
||||
val = datetime.strptime(val, "%Y%m%d").date()
|
||||
else:
|
||||
val = parse(val).date()
|
||||
data[dim_name] = val
|
||||
except IndexError:
|
||||
print(f"[WARNING] dimension index {i} 초과: dims={dims}")
|
||||
|
||||
for i, met_name in enumerate(metric_names):
|
||||
try:
|
||||
data[met_name] = int(mets[i].value)
|
||||
except:
|
||||
except (IndexError, ValueError):
|
||||
print(f"[WARNING] metric 처리 오류: {met_name}")
|
||||
data[met_name] = None
|
||||
|
||||
stmt = mysql_insert(table).values(**data)
|
||||
@ -136,6 +139,7 @@ def get_latest_date_from_db(engine, table):
|
||||
with engine.connect() as conn:
|
||||
stmt = select(func.max(table.c.date))
|
||||
result = conn.execute(stmt).scalar()
|
||||
print(f"[INFO] DB 기준 마지막 저장 날짜: {result}")
|
||||
return result
|
||||
|
||||
# ------------------------
|
||||
@ -165,21 +169,25 @@ def determine_date_range(table, config_start, config_end, force_update, engine):
|
||||
else:
|
||||
actual_start = config_start
|
||||
|
||||
# 시작일이 종료일보다 뒤에 있으면 자동 교체
|
||||
if actual_start > actual_end:
|
||||
return None, None
|
||||
print(f"[WARN] 시작일({actual_start})이 종료일({actual_end})보다 뒤에 있습니다. 날짜를 교환하여 수집을 계속합니다.")
|
||||
actual_start, actual_end = actual_end, actual_start
|
||||
|
||||
print(f"[INFO] 수집 날짜 범위 결정: {actual_start} ~ {actual_end}")
|
||||
return actual_start, actual_end
|
||||
|
||||
|
||||
# ------------------------
|
||||
# 단일 테이블 단위 데이터 처리
|
||||
# ------------------------
|
||||
def process_dimension_metric(engine, client, property_id, config, table, dims, mets, max_rows, debug=False, force_update=False):
|
||||
config_start = datetime.strptime(config.get("startDt", "20230101"), "%Y%m%d").date()
|
||||
config_end = datetime.strptime(config.get("endDt", datetime.now().strftime("%Y%m%d")), "%Y%m%d").date()
|
||||
config_start = parse(config.get("startDt", "2023-01-01")).date()
|
||||
config_end = parse(config.get("endDt", datetime.now().strftime("%Y-%m-%d")).strip()).date()
|
||||
actual_start, actual_end = determine_date_range(table, config_start, config_end, force_update, engine)
|
||||
|
||||
if actual_start is None or actual_end is None:
|
||||
print(f"[INFO] 이미 모든 데이터가 수집되어 있습니다 또는 수집 범위가 없습니다.")
|
||||
print(f"[INFO] 모든 데이터가 이미 수집되었거나 수집 범위가 없습니다.")
|
||||
return
|
||||
|
||||
for start_dt, end_dt in date_range_chunks(actual_start, actual_end, max_rows):
|
||||
@ -196,6 +204,7 @@ def process_dimension_metric(engine, client, property_id, config, table, dims, m
|
||||
# 메인 진입점 (병렬 처리 포함)
|
||||
# ------------------------
|
||||
def main():
|
||||
print("[INFO] GA4 수집 프로그램 시작")
|
||||
config = load_config()
|
||||
ga4_cfg = config.get('ga4', {})
|
||||
service_account_file = ga4_cfg.get('service_account_file')
|
||||
@ -215,6 +224,7 @@ def main():
|
||||
if not max_rows:
|
||||
max_rows = detect_max_rows_supported(client, property_id)
|
||||
update_config_file_with_max_rows(max_rows)
|
||||
print(f"[INFO] 설정된 max_rows_per_request = {max_rows}")
|
||||
|
||||
tasks = [
|
||||
(db_schema.ga4_by_date, ["date"], ["activeUsers", "screenPageViews", "sessions"]),
|
||||
@ -231,8 +241,12 @@ def main():
|
||||
table, dims, mets, max_rows, debug, force_update)
|
||||
for table, dims, mets in tasks
|
||||
]
|
||||
for future in as_completed(futures):
|
||||
future.result()
|
||||
for i, future in enumerate(as_completed(futures)):
|
||||
try:
|
||||
future.result()
|
||||
print(f"[INFO] 태스크 {i} 완료")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 태스크 {i} 실패: {e}")
|
||||
|
||||
print("[INFO] GA4 데이터 수집 및 저장 완료")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user