Files
static/app/app.py

132 lines
4.3 KiB
Python

import os, sys
from flask import Flask, render_template, request, jsonify
from sqlalchemy import select, func, between, and_, or_
from datetime import datetime, timedelta
import json
# Add the parent directory of this file to the import path; the project-local
# `conf` import on the next line presumably depends on it.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
# Flask application object on which the route decorators below register views.
app = Flask(__name__)
# Module-level handles taken from the project's `conf` package: `engine` is
# used with `engine.connect()` and `pos_table` is queried through its `.c`
# column collection by the views below.
engine = db.engine
pos_table = db_schema.pos
@app.route('/')
def index():
    """Render the landing page with a default seven-day date range.

    The range ends yesterday and starts six days earlier; both bounds are
    handed to the ``index.html`` template as ``date`` objects.
    """
    yesterday = datetime.today().date() - timedelta(days=1)
    week_start = yesterday - timedelta(days=6)
    return render_template(
        'index.html',
        start_date=week_start,
        end_date=yesterday,
    )
@app.route('/api/ca01_list')
def ca01_list():
    """Return the distinct ``ca01`` values as a JSON array.

    The values are ordered by ``ca01`` and the list is prefixed with the
    '전체' ("all") entry, which the other endpoints in this file treat as
    "no filter".
    """
    distinct_ca01 = (
        select(pos_table.c.ca01)
        .distinct()
        .order_by(pos_table.c.ca01)
    )
    with engine.connect() as conn:
        values = conn.execute(distinct_ca01).scalars().all()
    return jsonify(['전체'] + values)
@app.route('/api/ca03_list')
def ca03_list():
    """Return the distinct ``ca03`` values as a JSON array.

    When the ``ca01`` query parameter is present and is not '전체' ("all"),
    only rows whose ``ca01`` equals it are considered. The result is ordered
    by ``ca03`` and prefixed with the '전체' entry.
    """
    selected_ca01 = request.args.get('ca01', None)

    stmt = select(pos_table.c.ca03).distinct().order_by(pos_table.c.ca03)
    # An absent/empty parameter and the '전체' sentinel both mean "no filter".
    narrow_by_ca01 = bool(selected_ca01) and selected_ca01 != '전체'
    if narrow_by_ca01:
        stmt = stmt.where(pos_table.c.ca01 == selected_ca01)

    with engine.connect() as conn:
        values = conn.execute(stmt).scalars().all()
    return jsonify(['전체'] + values)
@app.route('/search', methods=['GET'])
def search():
    """Return sales totals grouped by barcode as a JSON array.

    Query parameters:
        start_date, end_date: Bounds for a SQL BETWEEN on the ``date``
            column; passed to the query exactly as received.
        ca01, ca03: Optional equality filters. A missing/empty value or
            '전체' ("all") disables the corresponding filter.

    Each element of the response carries ``ca01``, ``ca02``, ``ca03``,
    ``name`` and the summed ``qty``, ``tot_amount``, ``tot_discount`` and
    ``actual_amount``.
    """
    args = request.args
    date_from = args.get('start_date')
    date_to = args.get('end_date')

    filters = [between(pos_table.c.date, date_from, date_to)]
    optional_filters = (
        (pos_table.c.ca01, args.get('ca01')),
        (pos_table.c.ca03, args.get('ca03')),
    )
    for column, chosen in optional_filters:
        if chosen and chosen != '전체':
            filters.append(column == chosen)

    summed_columns = [
        func.sum(pos_table.c[col_name]).label(col_name)
        for col_name in ('qty', 'tot_amount', 'tot_discount', 'actual_amount')
    ]
    # NOTE(review): ca01/ca02/ca03/name are selected without aggregation while
    # grouping only by barcode; this relies on the database accepting such a
    # query (e.g. no ONLY_FULL_GROUP_BY-style restriction) - confirm.
    stmt = (
        select(
            pos_table.c.ca01,
            pos_table.c.ca02,
            pos_table.c.ca03,
            pos_table.c.name,
            *summed_columns,
        )
        .where(*filters)
        .group_by(pos_table.c.barcode)
    )

    with engine.connect() as conn:
        rows = conn.execute(stmt).mappings().all()
    return jsonify(list(map(dict, rows)))
# Load monthly visitor data.
def get_monthly_visitor_data(ca01_keywords=None, ca03_includes=None):
    """Return summed ``qty`` per month, keyed by year.

    Args:
        ca01_keywords: Substrings matched against ``ca01`` with SQL LIKE
            (``%keyword%``); a row qualifies when any keyword matches.
            Defaults to ``['매표소']`` when omitted or empty.
        ca03_includes: ``ca03`` values a row must be one of. Defaults to
            ``['입장료', '티켓', '기업제휴']`` when omitted or empty.

    Returns:
        A plain ``dict`` ordered by year that maps each year (``int``) to a
        12-element list of monthly totals, index 0 being January. Months
        without rows stay ``0``.
    """
    from collections import defaultdict
    from decimal import Decimal

    ca01_keywords = ca01_keywords or ['매표소']
    ca03_includes = ca03_includes or ['입장료', '티켓', '기업제휴']
    pos = db_schema.pos

    # Filter conditions: any ca01 keyword matches AND ca03 is in the list.
    ca01_conditions = [pos.c.ca01.like(f'%{kw}%') for kw in ca01_keywords]
    conditions = [or_(*ca01_conditions), pos.c.ca03.in_(ca03_includes)]

    session = db.get_session()
    try:
        # Totals per (year, month), ordered chronologically.
        result = (
            session.query(
                func.year(pos.c.date).label('year'),
                func.month(pos.c.date).label('month'),
                func.sum(pos.c.qty).label('qty'),
            )
            .filter(and_(*conditions))
            .group_by(func.year(pos.c.date), func.month(pos.c.date))
            .order_by(func.year(pos.c.date), func.month(pos.c.date))
            .all()
        )
    finally:
        # Always release the session, including when the query raises;
        # previously a failing query left the session open.
        session.close()

    # Reshape the rows into {year: [values for months 1..12]}.
    data = defaultdict(lambda: [0] * 12)
    for row in result:
        year = int(row.year)
        month = int(row.month)
        if isinstance(row.qty, Decimal):
            # Decimal totals are converted to int; other types pass through.
            qty = int(row.qty or 0)
        else:
            qty = row.qty or 0
        data[year][month - 1] = qty

    # Convert the defaultdict into a regular dict sorted by year.
    return dict(sorted(data.items()))
@app.route('/monthly_view.html')
def monthly_view():
    """Render the monthly view page with the visitor totals as a JSON string.

    The data comes from ``get_monthly_visitor_data()`` called with its
    default filters and is passed to the template as ``visitor_data``.
    """
    # Serialize up front so the template receives text rather than a dict.
    payload = json.dumps(get_monthly_visitor_data())
    return render_template('monthly_view.html', visitor_data=payload)
from lib.weekly_visitor_forecast_prophet import get_forecast_dict
from lib.weekly_visitor_forecast import get_recent_dataframe, get_last_year_dataframe
@app.route('/2weeks_view')
def view_2weeks():
    """Render the two-week view from the recent and last-year datasets.

    Both helpers are expected to return objects exposing
    ``to_dict(orient='records')`` (presumably pandas DataFrames - confirm);
    the converted records reach the template as ``recent_data`` and
    ``lastyear_data``.
    """
    recent_frame = get_recent_dataframe()
    last_year_frame = get_last_year_dataframe()
    context = {
        'recent_data': recent_frame.to_dict(orient='records'),
        'lastyear_data': last_year_frame.to_dict(orient='records'),
    }
    return render_template('2weeks_view.html', **context)
if __name__ == '__main__':
    # The server listens on every interface (0.0.0.0). With debug mode on,
    # the Werkzeug interactive debugger would be reachable from the network
    # and allows arbitrary code execution, so debug is opt-in only:
    # set FLASK_DEBUG=1 (or true/yes/on) for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(debug=debug_enabled, host='0.0.0.0')