28 Commits

Author SHA1 Message Date
ea70cbcf82 크롤링은 사용하기 어려움. 제외함. 2025-07-29 15:56:10 +09:00
4e22744adf 크롤링 시 브라우저 헤더를 선언해줌 2025-07-29 15:55:15 +09:00
c41bf82e58 DB 형테 메모 2025-07-29 15:55:01 +09:00
cb3b152217 데이터 삽입 방식을 배치로 변경하여 처리 속도 향상 2025-07-29 15:54:53 +09:00
ac54673983 데이터가 정상적으로 들어가지 않는 부분 수정, 영수증번호와 품명, 수량이 동일한 경우 중복값으로 인식시키고 덮어씌우도록 수정함(일부 데이터가 중복 데이터가 존재) 2025-07-29 15:49:08 +09:00
bf44f13a51 처리 과정에 대한 로그를 콘솔에 출력하기 위한 부분 추가 2025-07-29 14:29:54 +09:00
9abc760d7b 포스의 영수증데이터를 기반으로 mariadb에 데이터를 넣는 작업 2025-07-29 14:24:17 +09:00
3a15b938f2 db 스키마 업데이트 2025-07-29 14:23:59 +09:00
39046f20a5 upsolution pos 데이터 크롤링 - 클라우드플레어 캡차로 인해 실패 2025-07-29 11:33:16 +09:00
d539ffa626 정상 작동하지 않아 복구함 2025-07-29 10:27:25 +09:00
45610c9ea0 파일 감시 기능 강화 2025-07-28 17:12:28 +09:00
e607a9fdf2 컨테이너 구동용 파일 추가 2025-07-28 17:12:19 +09:00
7d2b155aa9 Merge branch 'master' into Dockerfile 2025-07-28 16:27:08 +09:00
1927ad91e6 GUI 기능은 사용하지 않으므로 제거함 2025-07-28 16:26:41 +09:00
2bbbe12abd pos_update_gui.py > pos_update_daily_product.py 로 입력할 파일에 맞춰 파일명 변경 2025-07-28 16:23:55 +09:00
6f2b9bc53e 중복되는 삭제 로직 제거 2025-07-28 16:20:41 +09:00
b362edeca0 pos_update_gui 파일도 동일하게 모니터링하도록 변경 2025-07-28 16:20:31 +09:00
9e52e74ded 오류 수정 2025-07-28 16:17:31 +09:00
29319cb12c ./data 폴더를 모니터랑 하고, 새 파일이 생기면 일치하는 파일 형식인지 찾은 후 데이터를 파싱해서 DB에 저장 2025-07-28 16:17:24 +09:00
1e275d2ac7 gui 실행부분 제거 2025-07-28 16:16:45 +09:00
77459587a7 db 엔진 사용 처리 2025-07-28 13:40:44 +09:00
f0362cbbd2 접두어 일부가 포함된 부분 제거 2025-07-28 13:40:34 +09:00
05e3d142cb 로그 출력 기능 2025-07-28 13:40:22 +09:00
fc2b579ce7 영수증데이터 업데이트를 위한 파일 추가. DB세팅 등 2025-07-28 13:15:33 +09:00
ed1e6f98d3 평일만/주말만 데이터 조회 기능 추가 2025-07-28 13:15:15 +09:00
2fdd2b38f7 평일/휴일 구분 업데이트 2025-07-28 13:14:59 +09:00
594bcd0897 Dockerfile update 2025-07-21 17:41:38 +09:00
3e9271517e readme update 2025-07-21 17:41:15 +09:00
18 changed files with 851 additions and 591 deletions

View File

@ -20,31 +20,34 @@
## 폴더 구조
```bash
project-root/
├── app/ # 🔹 웹 프론트엔드 및 Flask 서버
│ ├── templates/ # HTML 템플릿 (Jinja2)
├── app/ # 🔹 웹 프론트엔드 및 Flask 서버
│ ├── templates/ # HTML 템플릿 (Jinja2)
│ │ └── index.html
│ ├── static/ # (선택) JS, CSS 파일
│ └── app.py # Flask 애플리케이션 진입점
│ ├── static/ # (선택) JS, CSS 파일
│ └── app.py # Flask 애플리케이션 진입점
├── build/ # 🔹 Docker 빌드 전용 디렉토리
│ ├── Dockerfile # Ubuntu 22.04 기반 Dockerfile
│ ├── requirements.txt # Python 의존성
├── build/ # 🔹 Docker 빌드 전용 디렉토리
│ ├── Dockerfile # Ubuntu 22.04 기반 Dockerfile
│ ├── requirements.txt # Python 의존성
│ └── (선택) run.sh / build.sh 등 실행 스크립트
├── conf/ # 🔹 설정 및 DB 정의
│ ├── config.yaml # 설정 파일 (DB 접속 등)
│ ├── db.py # SQLAlchemy 연결 설정
│ └── db_schema.py # 테이블 정의 (SQLAlchemy metadata)
├── conf/ # 🔹 설정 및 DB 정의
│ ├── config.yaml # 설정 파일 (DB 접속 등)
│ ├── db.py # SQLAlchemy 연결 설정
│ └── db_schema.py # 테이블 정의 (SQLAlchemy metadata)
├── lib/ # 🔹 데이터 처리 및 백엔드 로직
│ ├── pos_view_gui.py # 기존 Tkinter GUI (조회용)
│ ├── pos_update_gui.py # 기존 Tkinter GUI (업데이트용)
│ ├── air_quality.py # 대기환경 API 수집
│ ├── ga4.py # GA4 수집 스크립트
── weather_asos.py # 기상청 ASOS 수집
├── data/ # 🔹 데이터 저장 및 엑셀 업로드 디렉토리
├── lib/ # 🔹 데이터 처리 및 백엔드 로직
│ ├── common.py # 중복 함수들을 처리
│ ├── pos_view_gui.py # 기존 Tkinter GUI (조회용)
│ ├── pos_update_gui.py # 기존 Tkinter GUI (업데이트용)
│ ├── air_quality.py # 대기환경 API 수집
── ga4.py # GA4 수집 스크립트
│ ├── weather_asos.py # 기상청 ASOS 수집
│ ├── weekly_visitor_forecast.py # GA4 수집 스크립트
│ ├── weekly_visitor_forecast_prophet.py # GA4 수집 스크립트
│ └──
├── data/ # 🔹 데이터 저장 및 엑셀 업로드 디렉토리
│ └── (엑셀 파일들, 일자별 상품별 파일 등)
└── .gitignore (선택)
├── .gitignore
└── README.md
```

View File

@ -1,131 +0,0 @@
import os, sys
from flask import Flask, render_template, request, jsonify
from sqlalchemy import select, func, between, and_, or_
from datetime import datetime, timedelta
import json
# 경로 추가
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
app = Flask(__name__)
engine = db.engine
pos_table = db_schema.pos
@app.route('/')
def index():
today = datetime.today().date()
end_date = today - timedelta(days=1)
start_date = end_date - timedelta(days=6)
return render_template('index.html', start_date=start_date, end_date=end_date)
@app.route('/api/ca01_list')
def ca01_list():
with engine.connect() as conn:
result = conn.execute(
select(pos_table.c.ca01).distinct().order_by(pos_table.c.ca01)
).scalars().all()
return jsonify(['전체'] + result)
@app.route('/api/ca03_list')
def ca03_list():
ca01 = request.args.get('ca01', None)
with engine.connect() as conn:
query = select(pos_table.c.ca03).distinct().order_by(pos_table.c.ca03)
if ca01 and ca01 != '전체':
query = query.where(pos_table.c.ca01 == ca01)
result = conn.execute(query).scalars().all()
return jsonify(['전체'] + result)
@app.route('/search', methods=['GET'])
def search():
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
ca01 = request.args.get('ca01')
ca03 = request.args.get('ca03')
conditions = [between(pos_table.c.date, start_date, end_date)]
if ca01 and ca01 != '전체':
conditions.append(pos_table.c.ca01 == ca01)
if ca03 and ca03 != '전체':
conditions.append(pos_table.c.ca03 == ca03)
with engine.connect() as conn:
stmt = select(
pos_table.c.ca01,
pos_table.c.ca02,
pos_table.c.ca03,
pos_table.c.name,
func.sum(pos_table.c.qty).label("qty"),
func.sum(pos_table.c.tot_amount).label("tot_amount"),
func.sum(pos_table.c.tot_discount).label("tot_discount"),
func.sum(pos_table.c.actual_amount).label("actual_amount")
).where(*conditions).group_by(pos_table.c.barcode)
result = conn.execute(stmt).mappings().all()
return jsonify([dict(row) for row in result])
# 월별 데이터 불러오기
def get_monthly_visitor_data(ca01_keywords=None, ca03_includes=None):
from collections import defaultdict
from decimal import Decimal
ca01_keywords = ca01_keywords or ['매표소']
ca03_includes = ca03_includes or ['입장료', '티켓', '기업제휴']
pos = db_schema.pos
session = db.get_session()
# 필터 조건
ca01_conditions = [pos.c.ca01.like(f'%{kw}%') for kw in ca01_keywords]
conditions = [or_(*ca01_conditions), pos.c.ca03.in_(ca03_includes)]
# 연도별 월별 합계 쿼리
query = (
session.query(
func.year(pos.c.date).label('year'),
func.month(pos.c.date).label('month'),
func.sum(pos.c.qty).label('qty')
)
.filter(and_(*conditions))
.group_by(func.year(pos.c.date), func.month(pos.c.date))
.order_by(func.year(pos.c.date), func.month(pos.c.date))
)
result = query.all()
session.close()
# 결과 가공: {년도: [1~12월 값]} 형태
data = defaultdict(lambda: [0]*12)
for row in result:
year = int(row.year)
month = int(row.month)
qty = int(row.qty or 0) if isinstance(row.qty, Decimal) else row.qty or 0
data[year][month - 1] = qty
# Dict → 일반 dict 정렬
return dict(sorted(data.items()))
@app.route('/monthly_view.html')
def monthly_view():
visitor_data = get_monthly_visitor_data()
visitor_data_json = json.dumps(visitor_data) # JSON 문자열로 변환
return render_template('monthly_view.html', visitor_data=visitor_data_json)
from lib.weekly_visitor_forecast_prophet import get_forecast_dict
from lib.weekly_visitor_forecast import get_recent_dataframe, get_last_year_dataframe
@app.route('/2weeks_view')
def view_2weeks():
df_recent = get_recent_dataframe()
df_prev = get_last_year_dataframe()
return render_template(
'2weeks_view.html',
recent_data=df_recent.to_dict(orient='records'),
lastyear_data=df_prev.to_dict(orient='records')
)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0')

View File

@ -1,61 +0,0 @@
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8" />
<title>월별 입장객 현황</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet" />
</head>
<body class="p-4">
<section class="2weeks_visitor_detail">
<h2>직전 2주간 방문객 현황 상세 내역</h2>
<table class="table table-bordered text-center align-middle">
<thead>
<tr>
<th colspan="2">구분</th>
<th colspan="{{ dates|length }}">방문현황</th>
<th rowspan="2">합계/평균</th>
<th colspan="3">예상</th>
</tr>
<tr>
<th>년도</th>
<th>항목</th>
{% for d in dates %}
<th>{{ d.strftime('%-m/%-d') }}</th>
{% endfor %}
<th>1일</th>
<th>2일</th>
<th>3일</th>
</tr>
</thead>
<tbody>
{% for year_label, data_by_item in data.items() %}
{% set rowspan_val = data_by_item|length %}
{% for item_name, row in data_by_item.items() %}
<tr>
{% if loop.first %}
<th rowspan="{{ rowspan_val }}">{{ year_label }}</th>
{% endif %}
<th>{{ item_name }}</th>
{% for val in row.values_list %}
<td>{{ val }}</td>
{% endfor %}
<td>{{ row.total or '' }}</td>
{% if row.expected %}
{% for ex_val in row.expected %}
<td>{{ ex_val }}</td>
{% endfor %}
{% else %}
<td colspan="3"></td>
{% endif %}
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
</section>
</body>
</html>

View File

@ -1,145 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>POS 데이터 조회</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
</head>
<body class="p-4">
<h2>POS 데이터 조회</h2>
<form id="filterForm" class="row g-3">
<div class="col-md-3">
<label>시작일</label>
<input type="date" name="start_date" value="{{ start_date }}" class="form-control">
</div>
<div class="col-md-3">
<label>종료일</label>
<input type="date" name="end_date" value="{{ end_date }}" class="form-control">
</div>
<div class="col-md-2">
<label>대분류</label>
<select name="ca01" class="form-select">
<option value="전체">전체</option>
</select>
</div>
<div class="col-md-2">
<label>소분류</label>
<select name="ca03" class="form-select">
<option value="전체">전체</option>
</select>
</div>
<div class="col-md-2 align-self-end">
<button type="submit" class="btn btn-primary w-100">조회</button>
</div>
</form>
<hr>
<table class="table table-bordered mt-3" id="resultTable">
<thead>
<tr>
<th>대분류</th><th>중분류</th><th>소분류</th><th>상품명</th>
<th>수량</th><th>총매출액</th><th>총할인액</th><th>실매출액</th>
</tr>
</thead>
<tbody></tbody>
<tfoot>
<tr>
<th colspan="4" class="text-end">합계</th>
<th id="sum_qty">0</th>
<th id="sum_tot_amount">0</th>
<th id="sum_tot_discount">0</th>
<th id="sum_actual_amount">0</th>
</tr>
</tfoot>
</table>
<script>
async function fetchAndFillSelect(url, selectElem) {
const res = await fetch(url);
const list = await res.json();
selectElem.innerHTML = '';
list.forEach(val => {
const opt = document.createElement('option');
opt.value = val;
opt.textContent = val;
selectElem.appendChild(opt);
});
}
document.addEventListener('DOMContentLoaded', async () => {
const ca01Select = document.querySelector('select[name="ca01"]');
const ca03Select = document.querySelector('select[name="ca03"]');
// 대분류 목록 불러오기
await fetchAndFillSelect('/api/ca01_list', ca01Select);
// 대분류 변경 시 소분류 목록 갱신
ca01Select.addEventListener('change', async () => {
const selectedCa01 = ca01Select.value;
await fetchAndFillSelect(`/api/ca03_list?ca01=${encodeURIComponent(selectedCa01)}`, ca03Select);
});
// 초기 소분류 목록 불러오기
await fetchAndFillSelect('/api/ca03_list', ca03Select);
// 폼 제출 이벤트 처리
document.getElementById('filterForm').addEventListener('submit', async function(e) {
e.preventDefault();
const formData = new FormData(this);
const params = new URLSearchParams(formData);
const res = await fetch('/search?' + params.toString());
if (!res.ok) {
alert('조회 중 오류가 발생했습니다.');
return;
}
const data = await res.json();
const tbody = document.querySelector('#resultTable tbody');
tbody.innerHTML = '';
// 합계 초기화
let sum_qty = 0;
let sum_tot_amount = 0;
let sum_tot_discount = 0;
let sum_actual_amount = 0;
if (data.length === 0) {
tbody.innerHTML = `<tr><td colspan="8" class="text-center">조회 결과가 없습니다.</td></tr>`;
} else {
data.forEach(row => {
const tr = document.createElement('tr');
tr.innerHTML = `
<td>${row.ca01}</td>
<td>${row.ca02}</td>
<td>${row.ca03}</td>
<td>${row.name}</td>
<td>${row.qty}</td>
<td>${row.tot_amount}</td>
<td>${row.tot_discount}</td>
<td>${row.actual_amount}</td>
`;
tbody.appendChild(tr);
// 합계 계산
sum_qty += Number(row.qty) || 0;
sum_tot_amount += Number(row.tot_amount) || 0;
sum_tot_discount += Number(row.tot_discount) || 0;
sum_actual_amount += Number(row.actual_amount) || 0;
});
}
// 합계 출력
document.getElementById('sum_qty').textContent = sum_qty.toLocaleString();
document.getElementById('sum_tot_amount').textContent = sum_tot_amount.toLocaleString();
document.getElementById('sum_tot_discount').textContent = sum_tot_discount.toLocaleString();
document.getElementById('sum_actual_amount').textContent = sum_actual_amount.toLocaleString();
});
});
</script>
</body>
</html>

View File

@ -1,144 +0,0 @@
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8" />
<title>월별 입장객 현황</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet" />
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
.diff-positive { color: blue; }
.diff-negative { color: red; }
.diff-zero { color: gray; }
table th, table td { vertical-align: middle; }
table th { text-align: center; }
table td { text-align: right; } /* 숫자 우측 정렬 */
#visitorChart {
width: 90% ;
max-width: 1200px;
min-height: 600px;
margin: 30px auto;
}
</style>
</head>
<body class="p-4">
<h2>월별 입장객 현황 (2017년 ~ 현재)</h2>
<table class="table table-bordered table-sm">
<thead class="table-secondary">
<tr>
<th>연도</th>
<th>1월</th><th>2월</th><th>3월</th><th>4월</th><th>5월</th><th>6월</th>
<th>7월</th><th>8월</th><th>9월</th><th>10월</th><th>11월</th><th>12월</th>
<th>합계</th>
</tr>
</thead>
<tbody id="dataBody">
<!-- 데이터가 여기에 삽입됩니다 -->
</tbody>
</table>
<canvas id="visitorChart"></canvas>
<script>
const visitorData = JSON.parse('{{ visitor_data | safe }}');
function formatNumber(num) {
return num.toLocaleString('ko-KR');
}
function formatDiff(diff) {
if (diff > 0) return `<span class="diff-positive">(+${formatNumber(diff)})</span>`;
else if (diff < 0) return `<span class="diff-negative">(${formatNumber(diff)})</span>`;
else return `<span class="diff-zero">(0)</span>`;
}
function renderTable(data) {
const years = Object.keys(data).sort();
const tbody = document.getElementById('dataBody');
tbody.innerHTML = '';
years.forEach((year, idx) => {
const currYearData = data[year];
const prevYearData = idx > 0 ? data[years[idx-1]] : null;
let row = `<tr><th style="text-align:center">${year}</th>`;
let annualSum = 0;
for(let m=0; m<12; m++) {
const curr = currYearData[m] || 0;
annualSum += curr;
let diff = prevYearData ? (curr - (prevYearData[m] || 0)) : 0;
row += `<td>${formatNumber(curr)}<br>${formatDiff(diff)}</td>`;
}
row += `<th style="text-align:right">${formatNumber(annualSum)}</th></tr>`;
tbody.insertAdjacentHTML('beforeend', row);
});
}
function renderChart(data) {
const ctx = document.getElementById('visitorChart').getContext('2d');
const labels = ['1월','2월','3월','4월','5월','6월','7월','8월','9월','10월','11월','12월'];
// 연도를 숫자형으로 정렬
const years = Object.keys(data).map(Number).sort((a, b) => a - b);
// 각 연도별 데이터셋 생성
const datasets = years.map((year, i) => ({
label: year.toString(),
data: data[year],
backgroundColor: `hsla(${(i * 40) % 360}, 70%, 50%, 0.7)`,
borderColor: `hsla(${(i * 40) % 360}, 70%, 50%, 1)`,
borderWidth: 1
}));
const chartData = {
labels: labels,
datasets: datasets
};
new Chart(ctx, {
type: 'bar',
data: chartData,
options: {
responsive: true,
maintainAspectRatio: true,
scales: {
y: {
beginAtZero: true,
title: {
display: true,
text: '입장객 수'
},
ticks: {
callback: value => value.toLocaleString('ko-KR')
}
}
},
plugins: {
legend: {
position: 'top'
},
title: {
display: true,
text: '월별 입장객 수 (연도별)'
},
tooltip: {
callbacks: {
label: context => `입장객 수: ${context.parsed.y.toLocaleString('ko-KR')}`
}
}
}
}
});
}
document.addEventListener('DOMContentLoaded', () => {
renderTable(visitorData);
renderChart(visitorData);
});
</script>
</body>
</html>

47
build/Dockerfile Normal file
View File

@ -0,0 +1,47 @@
FROM python:3.10-slim
# 작업 디렉토리 설정
WORKDIR /app
# 시스템 패키지 설치
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
libsqlite3-dev \
libssl-dev \
libffi-dev \
libbz2-dev \
libreadline-dev \
libncurses5-dev \
libgdbm-dev \
liblzma-dev \
libtk8.6 \
tk8.6-dev \
tcl8.6-dev \
wget \
curl \
unzip \
git \
cron \
&& rm -rf /var/lib/apt/lists/*
# requirements 설치
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# 앱 전체 복사
COPY . .
# 환경 변수 설정
ENV PYTHONUNBUFFERED=1
# 크론 작업 등록: 매일 11시에 daily_run.py 실행
RUN echo "0 11 * * * python /app/daily_run.py >> /var/log/cron.log 2>&1" > /etc/cron.d/daily-cron \
&& chmod 0644 /etc/cron.d/daily-cron \
&& crontab /etc/cron.d/daily-cron
# 로그 파일 생성
RUN touch /var/log/cron.log
# 컨테이너 시작 시 cron 실행 + file_watch.py 실행 + 로그 출력 유지
CMD cron && python lib/file_watch.py & tail -f /var/log/cron.log

View File

@ -18,8 +18,16 @@ db_cfg = config['database']
db_url = f"mysql+pymysql://{db_cfg['user']}:{db_cfg['password']}@{db_cfg['host']}/{db_cfg['name']}?charset=utf8mb4"
# MySQL 연결이 끊겼을 때 자동 재시도 옵션 포함
engine = create_engine(db_url, pool_pre_ping=True)
engine = create_engine(
db_url,
pool_pre_ping=True,
pool_recycle=3600, # 3600초 = 1시간
)
Session = sessionmaker(bind=engine)
def get_engine():
return engine
def get_session():
return Session()

View File

@ -1,7 +1,7 @@
# db_schema.py
import os
import yaml
from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime
from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint, Index
from sqlalchemy.sql import func
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
@ -164,6 +164,15 @@ ga4 = Table(
mysql_charset='utf8mb4'
)
holiday = Table(
get_full_table_name('holiday'), metadata,
Column('date', String(8), primary_key=True, comment='날짜 (YYYYMMDD)'),
Column('name', String(50), nullable=False, comment='휴일명'),
Column('created_at', DateTime, server_default=func.now(), comment='등록일시'),
Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'),
comment='한국천문연구원 특일정보'
)
pos = Table(
get_full_table_name('pos'), metadata,
Column('idx', Integer, primary_key=True, autoincrement=True),
@ -180,11 +189,71 @@ pos = Table(
UniqueConstraint('date', 'ca01', 'ca02', 'ca03', 'name', 'barcode', name='uniq_pos_composite')
)
holiday = Table(
get_full_table_name('holiday'), metadata,
Column('date', String(8), primary_key=True, comment='날짜 (YYYYMMDD)'),
Column('name', String(50), nullable=False, comment='휴일명'),
Column('created_at', DateTime, server_default=func.now(), comment='등록일시'),
Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'),
comment='한국천문연구원 특일정보'
pos_billdata = Table(
get_full_table_name('pos_billdata'), metadata,
Column('sale_date', Date, nullable=False),
Column('shop_cd', String(20), nullable=False),
Column('pos_no', Integer, nullable=False),
Column('bill_no', Integer, nullable=False),
Column('product_cd', String(20), nullable=False),
Column('division', String(10)),
Column('table_no', String(20)),
Column('order_time', Time),
Column('pay_time', Time),
Column('barcode', String(20)),
Column('product_name', String(100)),
Column('qty', Integer),
Column('tot_sale_amt', Integer),
Column('erp_cd', String(50)),
Column('remark', Text),
Column('dc_amt', Integer),
Column('dc_type', String(50)),
Column('dcm_sale_amt', Integer),
Column('net_amt', Integer),
Column('vat_amt', Integer),
PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd')
)
pos_ups_billdata = Table(
get_full_table_name('pos_ups_billdata'), metadata,
Column('sale_date', DateTime, nullable=False),
Column('shop_name', String(100), nullable=False),
Column('pos_no', String(20), nullable=False),
Column('bill_no', String(20), nullable=False),
Column('product_cd', String(20), nullable=False),
Column('ca01', String(50)),
Column('ca02', String(50)),
Column('ca03', String(50)),
Column('product_name', String(100)),
Column('barcode', String(20)),
Column('amt', Integer),
Column('qty', Integer),
Column('tot_sale_amt', Integer),
Column('dc_amt', Integer),
Column('dcm_sale_amt', Integer),
Column('net_amt', Integer),
Column('vat_amt', Integer),
Column('cash_receipt', Integer),
Column('card', Integer),
# PrimaryKeyConstraint 생략
mysql_engine='InnoDB',
mysql_charset='utf8mb4'
)
# 인덱스 추가
Index('idx_sale_shop_pos_product', pos_ups_billdata.c.sale_date, pos_ups_billdata.c.shop_name, pos_ups_billdata.c.pos_no, pos_ups_billdata.c.product_cd)
Index('idx_category', pos_ups_billdata.c.ca01, pos_ups_billdata.c.ca02, pos_ups_billdata.c.ca03)
Index('idx_product_barcode', pos_ups_billdata.c.product_name, pos_ups_billdata.c.barcode)
pos_shop_name = Table(
get_full_table_name('pos_shop_name'), metadata,
Column('shop_cd', String(20), primary_key=True, nullable=False),
Column('shop_name', String(100), nullable=False),
Column('used', Integer, nullable=False, default=1, comment='사용여부 (1=사용, 0=미사용)'),
Column('created_at', DateTime, server_default=func.current_timestamp(), comment='등록일시'),
Column('updated_at', DateTime, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment='수정일시'),
mysql_engine='InnoDB',
mysql_charset='utf8mb4',
)

25
conf/install.sql Normal file
View File

@ -0,0 +1,25 @@
CREATE TABLE `fg_manager_static_pos_ups_billdata` (
`sale_date` DATETIME NOT NULL,
`shop_name` VARCHAR(100) NOT NULL,
`pos_no` VARCHAR(20) NOT NULL,
`bill_no` VARCHAR(20) NOT NULL,
`product_cd` VARCHAR(20) NOT NULL,
`ca01` VARCHAR(50),
`ca02` VARCHAR(50),
`ca03` VARCHAR(50),
`product_name` VARCHAR(100),
`barcode` VARCHAR(20),
`amt` INT,
`qty` INT,
`tot_sale_amt` INT,
`dc_amt` INT,
`dcm_sale_amt` INT,
`net_amt` INT,
`vat_amt` INT,
`cash_receipt` INT,
`card` INT,
PRIMARY KEY (`sale_date`, `shop_name`, `pos_no`, `bill_no`, `product_cd`, `qty`), -- 옵션: 복합 PK (원하지 않으면 제거)
KEY `idx_sale_shop_pos_product` (`sale_date`, `shop_name`, `pos_no`, `product_cd`),
KEY `idx_category` (`ca01`, `ca02`, `ca03`),
KEY `idx_product_barcode` (`product_name`, `barcode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

13
docker-compose.yml Normal file
View File

@ -0,0 +1,13 @@
services:
fg-static:
container_name: fg-static
build:
context: .
dockerfile: build/Dockerfile
image: reg.firstgarden.co.kr/fg-static:latest
volumes:
- ./data:/app/data
- ./conf:/app/conf
environment:
- TZ=Asia/Seoul
restart: unless-stopped

View File

@ -1,5 +1,8 @@
# common.py
import os, yaml
import logging
import time
import glob
def load_config():
"""
@ -8,3 +11,21 @@ def load_config():
path = os.path.join(os.path.dirname(__file__), '..', 'conf', 'config.yaml')
with open(path, encoding='utf-8') as f:
return yaml.safe_load(f)
def get_logger(name):
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def wait_download_complete(download_dir, ext, timeout=60):
for _ in range(timeout):
files = glob.glob(os.path.join(download_dir, f"*.{ext.strip('.')}"))
if files:
return files[0]
time.sleep(1)
raise TimeoutError("다운로드 대기 시간 초과")

81
lib/file_watch.py Normal file
View File

@ -0,0 +1,81 @@
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
import pos_update_bill
import pos_update_daily_product
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../data'))
FILE_EXTENSIONS = ('.xls', '.xlsx')
BILL_PREFIX = "영수증별매출상세현황"
DAILY_PRODUCT_PREFIX = "일자별 (상품별)"
class NewFileHandler(FileSystemEventHandler):
def __init__(self):
super().__init__()
self._lock = threading.Lock()
self._processing_files = set()
def on_created(self, event):
if event.is_directory:
return
filepath = event.src_path
filename = os.path.basename(filepath)
if not filename.endswith(FILE_EXTENSIONS):
return
# 처리 대상 여부 확인
if filename.startswith(BILL_PREFIX) or filename.startswith(DAILY_PRODUCT_PREFIX):
print(f"[WATCHER] 신규 파일 감지: {filename}")
threading.Thread(target=self.process_file, args=(filepath, filename), daemon=True).start()
def process_file(self, filepath, filename):
with self._lock:
if filename in self._processing_files:
print(f"[WATCHER] {filename} 이미 처리 중")
return
self._processing_files.add(filename)
try:
time.sleep(3) # 파일 쓰기 완료 대기
print(f"[WATCHER] 파일 처리 시작: {filename}")
if filename.startswith(BILL_PREFIX):
pos_update_bill.main()
elif filename.startswith(DAILY_PRODUCT_PREFIX):
pos_update_daily_product.main()
else:
print(f"[WATCHER] 처리 대상이 아님: {filename}")
return
except Exception as e:
print(f"[WATCHER] 처리 중 오류 발생: {filename} / {e}")
else:
try:
os.remove(filepath)
print(f"[WATCHER] 파일 처리 완료 및 삭제: {filename}")
except Exception as e:
print(f"[WATCHER] 파일 삭제 실패: {filename} / {e}")
finally:
with self._lock:
self._processing_files.discard(filename)
def start_watching():
print(f"[WATCHER] '{DATA_DIR}' 폴더 감시 시작")
event_handler = NewFileHandler()
observer = Observer()
observer.schedule(event_handler, DATA_DIR, recursive=False)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("[WATCHER] 감시 종료 요청 수신, 종료 중...")
observer.stop()
observer.join()
if __name__ == "__main__":
start_watching()

View File

@ -4,7 +4,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import yaml
import requests
import xml.etree.ElementTree as ET
from datetime import datetime, date
from datetime import date, datetime, timedelta
from sqlalchemy import select, insert, delete
# config.yaml 경로 및 로딩
@ -134,8 +134,40 @@ def is_korean_holiday(dt: date) -> bool:
finally:
session.close()
def get_holiday_dates(start_date: date, end_date: date) -> set[date]:
"""특정 기간 내의 휴일 목록 반환"""
session = db.get_session()
try:
stmt = select(holiday_table.c.date).where(
holiday_table.c.date.between(start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d"))
)
results = session.execute(stmt).scalars().all()
return set(datetime.strptime(d, "%Y%m%d").date() for d in results)
finally:
session.close()
def get_weekday_dates(start_date: date, end_date: date) -> set[date]:
"""특정 기간 중 평일(월~금 & 비휴일) 목록 반환"""
holiday_dates = get_holiday_dates(start_date, end_date)
result = set()
curr = start_date
while curr <= end_date:
if curr.weekday() < 5 and curr not in holiday_dates: # 월(0)~금(4)
result.add(curr)
curr += timedelta(days=1)
return result
if __name__ == "__main__":
print("📌 특일정보 초기화 시작")
print("📌 휴일 테스트 시작")
init_holidays()
print("✅ 특일정보 초기화 완료")
from datetime import date
start = date(2025, 1, 1)
end = date(2025, 12, 31)
holidays = get_holiday_dates(start, end)
print(f"🔍 {start} ~ {end} 사이 휴일 {len(holidays)}")
for d in sorted(holidays):
print(" -", d)

263
lib/pos_update_bill.py Normal file
View File

@ -0,0 +1,263 @@
"""
영수증별매출상세현황 엑셀파일을 기반으로 MariaDB에 데이터 업데이트
1. 파일은 ./data 폴더에 위치 (파일명: '영수증별매출상세현황*.xls[x]')
2. 중복된 데이터는 update 처리됨 (on duplicate key update)
3. 처리 후 파일 자동 삭제 (파일 삭제 로직은 필요시 추가 가능)
"""
import os
import sys
import re
import pandas as pd
from datetime import datetime
from sqlalchemy.dialects.mysql import insert
from sqlalchemy import select
# 상위 경로를 sys.path에 추가해 프로젝트 내 모듈 임포트 가능하게 설정
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.common import load_config
# 설정 파일 로드 및 데이터 폴더 경로 설정
CONFIG = load_config()
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')
# 처리 대상 파일명 패턴: '영수증별매출상세현황'으로 시작하고 .xls 또는 .xlsx 확장자
FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$")
# 엑셀 상단 A3셀 형식 예: "조회일자 : 2025-07-27 매장선택 : [V83728] 퍼스트(삐아또"
HEADER_PATTERN = re.compile(r"조회일자\s*:\s*(\d{4}-\d{2}-\d{2})\s+매장선택\s*:\s*\[(\w+)]\s*(.+)")
def extract_file_info(filepath: str):
"""
엑셀 파일 상단에서 조회일자, 매장코드, 매장명을 추출한다.
A3 셀 (2행 0열, 0부터 시작 기준) 데이터를 정규식으로 파싱.
Args:
filepath (str): 엑셀파일 경로
Returns:
tuple: (sale_date: date, shop_cd: str, shop_name: str)
Raises:
ValueError: 정규식 매칭 실패 시
"""
print(f"[INFO] {filepath} 상단 조회일자 및 매장 정보 추출 시작")
df_head = pd.read_excel(filepath, header=None, nrows=5)
first_row = df_head.iloc[2, 0] # 3행 A열 (0-based index)
match = HEADER_PATTERN.search(str(first_row))
if not match:
raise ValueError(f"[ERROR] 조회일자 및 매장 정보 추출 실패: {filepath}")
sale_date = datetime.strptime(match.group(1), "%Y-%m-%d").date()
shop_cd = match.group(2)
shop_name = match.group(3).strip()
print(f"[INFO] 추출된 조회일자: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
return sale_date, shop_cd, shop_name
def load_excel_data(filepath: str):
"""
지정한 컬럼만 읽고, 헤더는 6번째 행(0-based index 5)으로 지정.
'합계'라는 단어가 '포스번호' 컬럼에 있으면 그 행부터 제거한다.
Args:
filepath (str): 엑셀파일 경로
Returns:
pd.DataFrame: 전처리된 데이터프레임
Raises:
ValueError: 필수 컬럼 누락 시
"""
print(f"[INFO] {filepath} 데이터 영역 로드 시작")
usecols = [
"포스번호", "영수증번호", "구분", "테이블명", "최초주문", "결제시각",
"상품코드", "바코드", "상품명", "수량", "총매출액", "ERP 매핑코드",
"비고", "할인액", "할인구분", "실매출액", "가액", "부가세"
]
# header=5 => 6번째 행이 컬럼명
df = pd.read_excel(filepath, header=5, dtype=str)
# 컬럼명 좌우 공백 제거
df.columns = df.columns.str.strip()
# '합계'인 행의 인덱스 찾기 및 제거
if '합계' in df['포스번호'].values:
idx = df[df['포스번호'] == '합계'].index[0]
df = df.loc[:idx-1]
print(f"[INFO] '합계' 행 이후 데이터 제거: {idx}번째 행부터 제외")
# 필수 컬럼 존재 여부 체크
if not set(usecols).issubset(df.columns):
raise ValueError(f"[ERROR] 필수 컬럼 누락: 현재 컬럼 {df.columns.tolist()}")
df = df[usecols]
print(f"[INFO] {filepath} 데이터 영역 로드 완료, 데이터 건수: {len(df)}")
return df
def normalize_data(df: pd.DataFrame, sale_date, shop_cd):
"""
컬럼명을 내부 규칙에 맞게 변경하고, 숫자 필드를 정수형으로 변환한다.
조회일자와 매장코드를 데이터프레임에 추가.
Args:
df (pd.DataFrame): 원본 데이터프레임
sale_date (date): 조회일자
shop_cd (str): 매장코드
Returns:
pd.DataFrame: 정규화된 데이터프레임
"""
print(f"[INFO] 데이터 정규화 시작")
def to_int(x):
try:
return int(str(x).replace(",", "").strip())
except:
return 0
df.rename(columns={
"포스번호": "pos_no",
"영수증번호": "bill_no",
"구분": "division",
"테이블명": "table_no",
"최초주문": "order_time",
"결제시각": "pay_time",
"상품코드": "product_cd",
"바코드": "barcode",
"상품명": "product_name",
"수량": "qty",
"총매출액": "tot_sale_amt",
"ERP 매핑코드": "erp_cd",
"비고": "remark",
"할인액": "dc_amt",
"할인구분": "dc_type",
"실매출액": "dcm_sale_amt",
"가액": "net_amt",
"부가세": "vat_amt"
}, inplace=True)
df["sale_date"] = sale_date
df["shop_cd"] = shop_cd
# 숫자형 컬럼 정수 변환
int_fields = ["qty", "tot_sale_amt", "dc_amt", "dcm_sale_amt", "net_amt", "vat_amt"]
for field in int_fields:
df[field] = df[field].apply(to_int)
# pos_no, bill_no는 반드시 int로 변환
df["pos_no"] = df["pos_no"].astype(int)
df["bill_no"] = df["bill_no"].astype(int)
print(f"[INFO] 데이터 정규화 완료")
return df
def upsert_data(df: pd.DataFrame, batch_size: int = 500) -> int:
"""
SQLAlchemy insert 구문을 사용하여
중복 PK 발생 시 update 처리 (on duplicate key update)
대량 데이터는 batch_size 단위로 나누어 처리
Args:
df (pd.DataFrame): DB에 삽입할 데이터
batch_size (int): 한번에 처리할 데이터 건수 (기본 500)
Returns:
int: 영향 받은 총 행 수
"""
print(f"[INFO] DB 저장 시작")
df = df.where(pd.notnull(df), None) # NaN → None 변환
engine = db.get_engine()
metadata = db_schema.metadata
table = db_schema.pos_billdata
total_affected = 0
with engine.connect() as conn:
for start in range(0, len(df), batch_size):
batch_df = df.iloc[start:start+batch_size]
records = batch_df.to_dict(orient="records")
insert_stmt = insert(table).values(records)
update_fields = {
col.name: insert_stmt.inserted[col.name]
for col in table.columns
if col.name not in table.primary_key.columns
}
upsert_stmt = insert_stmt.on_duplicate_key_update(update_fields)
try:
result = conn.execute(upsert_stmt)
conn.commit()
total_affected += result.rowcount
print(f"[INFO] 배치 처리 완료: {start} ~ {start+len(records)-1} / 영향 행 수: {result.rowcount}")
except Exception as e:
print(f"[ERROR] 배치 처리 실패: {start} ~ {start+len(records)-1} / 오류: {e}")
# 필요 시 raise 하거나 continue로 다음 배치 진행 가능
raise
print(f"[INFO] DB 저장 전체 완료, 총 영향 행 수: {total_affected}")
return total_affected
def ensure_shop_exists(shop_cd, shop_name):
"""
매장 정보 테이블에 매장코드가 없으면 신규 등록한다.
Args:
shop_cd (str): 매장 코드
shop_name (str): 매장 명
"""
print(f"[INFO] 매장 존재 여부 확인: {shop_cd}")
engine = db.get_engine()
conn = engine.connect()
shop_table = db_schema.pos_shop_name
try:
query = shop_table.select().where(shop_table.c.shop_cd == shop_cd)
result = conn.execute(query).fetchone()
if result is None:
print(f"[INFO] 신규 매장 등록: {shop_cd} / {shop_name}")
ins = shop_table.insert().values(shop_cd=shop_cd, shop_name=shop_name)
conn.execute(ins)
conn.commit()
else:
print(f"[INFO] 기존 매장 존재: {shop_cd}")
except Exception as e:
print(f"[ERROR] 매장 확인/등록 실패: {e}")
raise
finally:
conn.close()
def main():
"""
대상 데이터 파일 목록을 찾고, 파일별로 처리 진행한다.
처리 성공 시 저장 건수를 출력하고, 실패 시 오류 메시지 출력.
"""
files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)]
print(f"[INFO] 발견된 파일 {len(files)}")
for file in files:
filepath = os.path.join(DATA_DIR, file)
print(f"[INFO] 파일: {file} 처리 시작")
try:
sale_date, shop_cd, shop_name = extract_file_info(filepath)
ensure_shop_exists(shop_cd, shop_name)
raw_df = load_excel_data(filepath)
df = normalize_data(raw_df, sale_date, shop_cd)
affected = upsert_data(df)
print(f"[DONE] 처리 완료: {file} / 저장 건수: {affected}")
# 처리 완료 후 파일 삭제 (필요 시 활성화)
# os.remove(filepath)
# print(f"[INFO] 처리 완료 후 파일 삭제: {file}")
except Exception as e:
print(f"[ERROR] {file} 처리 실패: {e}")
if __name__ == "__main__":
main()

View File

@ -1,22 +1,16 @@
# POS Update
'''
포스 데이터를 추출한 엑셀파일을 업데이트
OK포스 > 매출관리 > 일자별 > 상품별 > 날짜 지정 > 조회줄수 5000으로 변경 > 엑셀
추출파일을 ./data에 복사
파일 실행하면 자동으로 mariadb의 DB에 삽입함.
'''
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import tkinter as tk
import pandas as pd
from tkinter import filedialog, messagebox
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.common import load_config
@ -29,14 +23,12 @@ def update_pos_table(engine, table, df):
data = row.to_dict()
stmt = mysql_insert(table).values(**data)
# insert ... on duplicate key update (복합 unique key 기준)
update_data = {
'qty': data['qty'],
'tot_amount': data['tot_amount'],
'tot_discount': data['tot_discount'],
'actual_amount': data['actual_amount']
}
stmt = stmt.on_duplicate_key_update(**update_data)
try:
@ -50,7 +42,6 @@ def process_file(filepath, table, engine):
print(f"[INFO] 처리 시작: {filepath}")
try:
ext = os.path.splitext(filepath)[-1].lower()
if ext == ".xls":
df = pd.read_excel(filepath, header=5, engine="xlrd")
elif ext == ".xlsx":
@ -73,8 +64,7 @@ def process_file(filepath, table, engine):
'실매출액': 'actual_amount'
}, inplace=True)
if 'idx' in df.columns:
df = df.drop(columns=['idx'])
df.drop(columns=[col for col in ['idx'] if col in df.columns], inplace=True)
df['date'] = pd.to_datetime(df['date']).dt.date
df['barcode'] = df['barcode'].astype(int)
@ -98,7 +88,6 @@ def process_file(filepath, table, engine):
def batch_process_files(table, engine):
files = [f for f in os.listdir(DATA_DIR) if f.startswith("일자별 (상품별)") and f.endswith(('.xlsx', '.xls'))]
if not files:
print("[INFO] 처리할 파일이 없습니다.")
return False
@ -114,36 +103,15 @@ def batch_process_files(table, engine):
total_rows += count
try:
os.remove(full_path)
print(f"[INFO] 처리 완료 후 파일 삭제: {fname}")
print(f"[INFO] 파일 삭제 완료: {fname}")
deleted_files += 1
except Exception as e:
print(f"[WARN] 파일 삭제 실패: {fname}, {e}")
print(f"[WARN] 파일 삭제 실패: {fname} / {e}")
print(f"[INFO] 처리된 전체 데이터 건수: {total_rows}")
print(f"[INFO] 처리 데이터 건수: {total_rows}")
print(f"[INFO] 삭제된 파일 수: {deleted_files}")
return True
def run_pos_update():
filepath = filedialog.askopenfilename(
filetypes=[("Excel Files", "*.xlsx *.xls")],
title="파일을 선택하세요"
)
if not filepath:
return
engine = db.engine
try:
table = db_schema.pos
except AttributeError:
messagebox.showerror("DB 오류", "'pos' 테이블이 db_schema에 정의되어 있지 않습니다.")
return
if messagebox.askyesno("확인", f"'{os.path.basename(filepath)}' 파일을 'pos' 테이블에 업로드 하시겠습니까?"):
success, count = process_file(filepath, table, engine)
if success:
print(f"[INFO] 수동 선택된 파일 처리 완료: {count}")
messagebox.showinfo("완료", f"DB 업데이트가 완료되었습니다.\n{count}건 처리됨.")
def main():
engine = db.engine
try:
@ -154,18 +122,7 @@ def main():
batch_done = batch_process_files(table, engine)
if not batch_done:
# GUI 시작
root = tk.Tk()
root.title("POS 데이터 업데이트")
root.geometry("300x150")
lbl = tk.Label(root, text="POS 데이터 업데이트")
lbl.pack(pady=20)
btn = tk.Button(root, text="데이터 선택 및 업데이트", command=run_pos_update)
btn.pack()
root.mainloop()
print("[INFO] 처리할 데이터가 없습니다.")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,147 @@
import os
import sys
import pandas as pd
import shutil
from datetime import datetime
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import get_logger
from conf import db, db_schema # get_engine, get_session 포함
logger = get_logger("POS_UPS")
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
FINISH_DIR = os.path.join(DATA_DIR, "finish")
os.makedirs(FINISH_DIR, exist_ok=True)
def nan_to_none(value):
import pandas as pd
if pd.isna(value):
return None
return value
def load_excel_data(filepath: str):
df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터
# 컬럼명 공백 제거 등 정리
df.columns = [col.strip() for col in df.columns]
# 필수 컬럼 체크
required_cols = ['영수증 번호', '품목명']
for col in required_cols:
if col not in df.columns:
raise ValueError(f"필수 컬럼 누락: {col}")
df = df.dropna(subset=required_cols)
return df
def process_file(filepath: str, engine, session, table, batch_size=500):
try:
df = load_excel_data(filepath)
logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}")
inserted, updated, errors = 0, 0, 0
batch_data = []
for idx, row in df.iterrows():
data = None
try:
data = {
"sale_date": pd.to_datetime(row["매출일시"]),
"shop_name": str(row["매장명"]).strip(),
"pos_no": str(row["포스"]).strip(),
"bill_no": str(row["영수증 번호"]).strip(),
"product_cd": str(row["품목"]).strip(),
"product_name": str(row["품목명"]).strip(),
"qty": int(row["수량"]),
"ca01": nan_to_none(row.get("대분류", None)),
"ca02": nan_to_none(row.get("중분류", None)),
"ca03": nan_to_none(row.get("소분류", None)),
"barcode": nan_to_none(row.get("바코드", None)),
"amt": int(row.get("단가", 0)),
"tot_sale_amt": int(row.get("주문 금액", 0)),
"dc_amt": int(row.get("할인 금액", 0)),
"dcm_sale_amt": int(row.get("공급가액", 0)),
"vat_amt": int(row.get("세금", 0)),
"net_amt": int(row.get("결제 금액", 0)),
"cash_receipt": int(row.get("현금영수증", 0)),
"card": int(row.get("카드", 0)),
}
batch_data.append(data)
except Exception as e:
if data is not None:
logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
else:
logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
errors += 1
# 배치 크기 도달시 DB에 한번에 처리
if len(batch_data) >= batch_size:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
# rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
inserted += len(batch_data)
logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
batch_data = []
# 남은 잔여 데이터 처리
if batch_data:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
inserted += len(batch_data)
logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
except Exception as e:
logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
session.rollback()
def main():
engine = db.get_engine()
session = db.get_session()
metadata = MetaData()
table = Table(
db_schema.get_full_table_name("pos_ups_billdata"),
metadata,
autoload_with=engine
)
files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR)
if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")]
logger.info(f"[INFO] 처리할 파일 {len(files)}")
for file in sorted(files):
logger.info(f"[START] {os.path.basename(file)}")
process_file(file, engine, session, table)
if __name__ == "__main__":
main()

View File

@ -9,12 +9,13 @@ from tkcalendar import DateEntry
from datetime import datetime, timedelta
from sqlalchemy import select, func, between
from conf import db_schema, db
from lib import holiday # 휴일 기능
# Windows DPI Awareness 설정 (윈도우 전용)
# Windows DPI Awareness 설정
if sys.platform == "win32":
import ctypes
try:
ctypes.windll.shcore.SetProcessDpiAwareness(1) # SYSTEM_AWARE = 1
ctypes.windll.shcore.SetProcessDpiAwareness(1)
except Exception:
pass
@ -26,30 +27,23 @@ class PosViewGUI(ctk.CTk):
super().__init__()
self.title("POS 데이터 조회")
self.geometry("900x500")
self.configure(fg_color="#f0f0f0") # 배경색 맞춤
self.geometry("1100x700")
self.configure(fg_color="#f0f0f0")
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
# 폰트 세팅 - NanumGothic이 없으면 Arial 대체
try:
self.label_font = ("NanumGothic", 13)
except Exception:
self.label_font = ("Arial", 13)
# Treeview 스타일 설정 (ttk 스타일)
style = ttk.Style(self)
style.theme_use('default')
style.configure("Treeview",
font=("NanumGothic", 12),
rowheight=30) # 높이 조절로 글씨 깨짐 방지
style.configure("Treeview.Heading",
font=("NanumGothic", 13, "bold"))
style.configure("Treeview", font=("NanumGothic", 12), rowheight=30)
style.configure("Treeview.Heading", font=("NanumGothic", 13, "bold"))
# --- 위젯 배치 ---
# 날짜 범위
# 날짜 필터
ctk.CTkLabel(self, text="시작일:", anchor="w", font=self.label_font, fg_color="#f0f0f0")\
.grid(row=0, column=0, padx=10, pady=5, sticky="e")
self.start_date_entry = DateEntry(self, width=12, background='darkblue', foreground='white')
@ -60,6 +54,18 @@ class PosViewGUI(ctk.CTk):
self.end_date_entry = DateEntry(self, width=12, background='darkblue', foreground='white')
self.end_date_entry.grid(row=0, column=3, padx=10, pady=5, sticky="w")
# 날짜유형 라디오버튼
self.date_filter_var = ctk.StringVar(value="전체")
ctk.CTkLabel(self, text="날짜유형:", font=self.label_font, fg_color="#f0f0f0")\
.grid(row=0, column=4, padx=(10, 0), pady=5, sticky="e")
ctk.CTkRadioButton(self, text="전체", variable=self.date_filter_var, value="전체")\
.grid(row=0, column=5, padx=2, pady=5, sticky="w")
ctk.CTkRadioButton(self, text="휴일", variable=self.date_filter_var, value="휴일")\
.grid(row=0, column=6, padx=2, pady=5, sticky="w")
ctk.CTkRadioButton(self, text="평일", variable=self.date_filter_var, value="평일")\
.grid(row=0, column=7, padx=2, pady=5, sticky="w")
# 대분류
ctk.CTkLabel(self, text="대분류 :", anchor="w", font=self.label_font, fg_color="#f0f0f0")\
.grid(row=1, column=0, padx=10, pady=5, sticky="e")
@ -82,9 +88,9 @@ class PosViewGUI(ctk.CTk):
# 조회 버튼
self.search_btn = ctk.CTkButton(self, text="조회", command=self.search,
fg_color="#0d6efd", hover_color="#0b5ed7", text_color="white")
self.search_btn.grid(row=3, column=0, columnspan=4, pady=10)
self.search_btn.grid(row=3, column=0, columnspan=8, pady=10)
# 결과 Treeview
# 상품별 트리뷰
self.DISPLAY_COLUMNS = ['ca01', 'ca02', 'ca03', 'name', 'qty', 'tot_amount', 'tot_discount', 'actual_amount']
self.COLUMN_LABELS = {
'ca01': '대분류',
@ -97,28 +103,38 @@ class PosViewGUI(ctk.CTk):
'actual_amount': '실매출액'
}
self.tree = ttk.Treeview(self, columns=self.DISPLAY_COLUMNS, show='headings', height=15)
self.tree = ttk.Treeview(self, columns=self.DISPLAY_COLUMNS, show='headings', height=12)
for col in self.DISPLAY_COLUMNS:
self.tree.heading(col, text=self.COLUMN_LABELS[col])
self.tree.column(col, width=120, anchor='center')
self.tree.grid(row=4, column=0, columnspan=4, padx=10, pady=10, sticky="nsew")
self.tree.grid(row=4, column=0, columnspan=8, padx=10, pady=10, sticky="nsew")
# 날짜 요약 트리뷰
self.date_tree = ttk.Treeview(self, columns=['date', 'qty', 'tot_amount', 'actual_amount'], show='headings', height=6)
self.date_tree.heading('date', text='일자')
self.date_tree.heading('qty', text='수량합')
self.date_tree.heading('tot_amount', text='총매출합')
self.date_tree.heading('actual_amount', text='실매출합')
for col in ['date', 'qty', 'tot_amount', 'actual_amount']:
self.date_tree.column(col, width=150, anchor='center')
self.date_tree.grid(row=5, column=0, columnspan=8, padx=10, pady=(0, 10), sticky="nsew")
# 그리드 가중치 설정 (창 크기에 따라 트리뷰 확장)
self.grid_rowconfigure(4, weight=1)
for col_index in range(4):
self.grid_rowconfigure(5, weight=1)
for col_index in range(8):
self.grid_columnconfigure(col_index, weight=1)
# 날짜 기본값 설정 (전날부터 7일 전까지)
# 날짜 기본값
end_date = datetime.today().date() - timedelta(days=1)
start_date = end_date - timedelta(days=6)
self.start_date_entry.set_date(start_date)
self.end_date_entry.set_date(end_date)
# 초기 대분류, 소분류 콤보박스 값 불러오기
self.load_ca01_options()
def on_ca01_selected(self, value):
# print("대분류 선택됨:", value) 디버깅용
self.load_ca03_options()
def load_ca01_options(self):
@ -148,7 +164,7 @@ class PosViewGUI(ctk.CTk):
result = conn.execute(stmt)
ca03_list = [row[0] for row in result.fetchall()]
self.ca03_combo.configure(values=['전체'] + ca03_list)
self.ca03_combo.set('전체') # 항상 기본값으로 초기화
self.ca03_combo.set('전체')
def search(self):
start_date = self.start_date_entry.get_date()
@ -156,8 +172,34 @@ class PosViewGUI(ctk.CTk):
ca01_val = self.ca01_combo.get()
ca03_val = self.ca03_combo.get()
name_val = self.name_entry.get().strip()
date_filter = self.date_filter_var.get()
print("🔍 date_filter:", date_filter,
"| start:", start_date, "end:", end_date)
if date_filter == "휴일":
valid_dates = holiday.get_holiday_dates(start_date, end_date)
print("🚩 반환된 휴일 날짜 리스트:", valid_dates)
conditions = []
if date_filter == "전체":
conditions.append(between(pos_table.c.date, start_date, end_date))
else:
if date_filter == "휴일":
valid_dates = holiday.get_holiday_dates(start_date, end_date)
elif date_filter == "평일":
valid_dates = holiday.get_weekday_dates(start_date, end_date)
else:
valid_dates = set()
if not valid_dates:
messagebox.showinfo("알림", f"{date_filter}에 해당하는 데이터가 없습니다.")
self.tree.delete(*self.tree.get_children())
self.date_tree.delete(*self.date_tree.get_children())
return
conditions.append(pos_table.c.date.in_(valid_dates))
conditions = [between(pos_table.c.date, start_date, end_date)]
if ca01_val != '전체':
conditions.append(pos_table.c.ca01 == ca01_val)
if ca03_val != '전체':
@ -166,6 +208,7 @@ class PosViewGUI(ctk.CTk):
conditions.append(pos_table.c.name.like(f"%{name_val}%"))
with engine.connect() as conn:
# 상품별
stmt = select(
pos_table.c.ca01,
pos_table.c.ca02,
@ -179,11 +222,42 @@ class PosViewGUI(ctk.CTk):
result = conn.execute(stmt).mappings().all()
# 날짜별 요약
date_stmt = select(
pos_table.c.date,
func.sum(pos_table.c.qty).label("qty"),
func.sum(pos_table.c.tot_amount).label("tot_amount"),
func.sum(pos_table.c.actual_amount).label("actual_amount")
).where(*conditions).group_by(pos_table.c.date).order_by(pos_table.c.date)
date_summary = conn.execute(date_stmt).mappings().all()
# 트리뷰 초기화
self.tree.delete(*self.tree.get_children())
self.date_tree.delete(*self.date_tree.get_children())
# 상품별 출력
for row in result:
values = tuple(row[col] for col in self.DISPLAY_COLUMNS)
self.tree.insert('', 'end', values=values)
# 날짜별 출력
total_qty = total_amount = total_actual = 0
for row in date_summary:
self.date_tree.insert('', 'end', values=(
row['date'].strftime("%Y-%m-%d"),
row['qty'],
row['tot_amount'],
row['actual_amount']
))
total_qty += row['qty']
total_amount += row['tot_amount']
total_actual += row['actual_amount']
# 총합계 추가
self.date_tree.insert('', 'end', values=("총합계", total_qty, total_amount, total_actual))
if __name__ == "__main__":
try:
import tkcalendar

View File

@ -13,3 +13,4 @@ scikit-learn
customtkinter
tkcalendar
tabulate
watchdog