From 29319cb12cdf108774451c67c1891007c27a1068 Mon Sep 17 00:00:00 2001 From: KWON Date: Mon, 28 Jul 2025 16:17:24 +0900 Subject: [PATCH] =?UTF-8?q?./data=20=ED=8F=B4=EB=8D=94=EB=A5=BC=20?= =?UTF-8?q?=EB=AA=A8=EB=8B=88=ED=84=B0=EB=9E=91=20=ED=95=98=EA=B3=A0,=20?= =?UTF-8?q?=EC=83=88=20=ED=8C=8C=EC=9D=BC=EC=9D=B4=20=EC=83=9D=EA=B8=B0?= =?UTF-8?q?=EB=A9=B4=20=EC=9D=BC=EC=B9=98=ED=95=98=EB=8A=94=20=ED=8C=8C?= =?UTF-8?q?=EC=9D=BC=20=ED=98=95=EC=8B=9D=EC=9D=B8=EC=A7=80=20=EC=B0=BE?= =?UTF-8?q?=EC=9D=80=20=ED=9B=84=20=EB=8D=B0=EC=9D=B4=ED=84=B0=EB=A5=BC=20?= =?UTF-8?q?=ED=8C=8C=EC=8B=B1=ED=95=B4=EC=84=9C=20DB=EC=97=90=20=EC=A0=80?= =?UTF-8?q?=EC=9E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/db.py | 7 +- conf/db_schema.py | 20 +- lib/file_watch.py | 75 +++++++ lib/pos_update_bill.py | 463 ++++++++++++++++++++--------------------- 4 files changed, 321 insertions(+), 244 deletions(-) create mode 100644 lib/file_watch.py diff --git a/conf/db.py b/conf/db.py index e63f790..71cc931 100644 --- a/conf/db.py +++ b/conf/db.py @@ -18,7 +18,12 @@ db_cfg = config['database'] db_url = f"mysql+pymysql://{db_cfg['user']}:{db_cfg['password']}@{db_cfg['host']}/{db_cfg['name']}?charset=utf8mb4" # MySQL 연결이 끊겼을 때 자동 재시도 옵션 포함 -engine = create_engine(db_url, pool_pre_ping=True) +engine = create_engine( + db_url, + pool_pre_ping=True, + pool_recycle=3600, # 3600초 = 1시간 +) + Session = sessionmaker(bind=engine) def get_engine(): diff --git a/conf/db_schema.py b/conf/db_schema.py index e11627e..73af925 100644 --- a/conf/db_schema.py +++ b/conf/db_schema.py @@ -164,6 +164,15 @@ ga4 = Table( mysql_charset='utf8mb4' ) +holiday = Table( + get_full_table_name('holiday'), metadata, + Column('date', String(8), primary_key=True, comment='날짜 (YYYYMMDD)'), + Column('name', String(50), nullable=False, comment='휴일명'), + Column('created_at', DateTime, server_default=func.now(), comment='등록일시'), + Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'), + comment='한국천문연구원 특일정보' +) + pos = Table( get_full_table_name('pos'), metadata, Column('idx', Integer, primary_key=True, autoincrement=True), @@ -180,15 +189,6 @@ pos = Table( UniqueConstraint('date', 'ca01', 'ca02', 'ca03', 'name', 'barcode', name='uniq_pos_composite') ) -holiday = Table( - get_full_table_name('holiday'), metadata, - Column('date', String(8), primary_key=True, comment='날짜 (YYYYMMDD)'), - Column('name', String(50), nullable=False, comment='휴일명'), - Column('created_at', DateTime, server_default=func.now(), comment='등록일시'), - Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'), - comment='한국천문연구원 특일정보' -) - pos_billdata = Table( get_full_table_name('pos_billdata'), metadata, Column('sale_date', Date, nullable=False), @@ -197,7 +197,7 @@ pos_billdata = Table( Column('bill_no', Integer, nullable=False), Column('product_cd', String(20), nullable=False), Column('division', String(10)), - Column('table_no', Integer), + Column('table_no', String(20)), Column('order_time', Time), Column('pay_time', Time), Column('barcode', String(20)), diff --git a/lib/file_watch.py b/lib/file_watch.py new file mode 100644 index 0000000..7f0315d --- /dev/null +++ b/lib/file_watch.py @@ -0,0 +1,75 @@ +import time +import os +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +import threading + +# pos_update_bill 모듈에서 main 함수를 가져옵니다. +# pos_update_bill.py가 같은 폴더 혹은 PYTHONPATH에 있어야 합니다. +import pos_update_bill + +DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../data')) + +# 감시할 파일 확장자 및 패턴 정의 (pos_update_bill.py 내와 일치해야 함) +FILE_EXTENSIONS = ('.xls', '.xlsx') +FILE_PREFIX = "영수증별매출상세현황" + +class NewFileHandler(FileSystemEventHandler): + def __init__(self): + super().__init__() + self._lock = threading.Lock() + self._processing_files = set() + + def on_created(self, event): + if event.is_directory: + return + filepath = event.src_path + filename = os.path.basename(filepath) + if filename.startswith(FILE_PREFIX) and filename.endswith(FILE_EXTENSIONS): + print(f"[WATCHER] 신규 파일 감지: {filename}") + # 별도의 스레드에서 처리 (감시 중단 방지) + threading.Thread(target=self.process_file, args=(filepath, filename), daemon=True).start() + + def process_file(self, filepath, filename): + with self._lock: + if filename in self._processing_files: + print(f"[WATCHER] {filename} 이미 처리 중") + return + self._processing_files.add(filename) + + try: + # 파일이 완전히 쓰여질 때까지 대기 (간단히 3초 대기, 필요 시 로직 강화) + time.sleep(3) + + print(f"[WATCHER] 파일 처리 시작: {filename}") + # pos_update_bill.main() 내부가 파일 리스트를 탐색해서 처리하므로 + # 신규 파일이 존재하는 상태에서 호출하면 정상 동작함. + pos_update_bill.main() + except Exception as e: + print(f"[WATCHER] 파일 처리 중 오류 발생: {filename} / {e}") + else: + try: + os.remove(filepath) + print(f"[WATCHER] 파일 처리 완료 및 삭제: {filename}") + except Exception as e: + print(f"[WATCHER] 파일 삭제 실패: {filename} / {e}") + finally: + with self._lock: + self._processing_files.discard(filename) + +def start_watching(): + print(f"[WATCHER] {DATA_DIR} 폴더 감시 시작") + event_handler = NewFileHandler() + observer = Observer() + observer.schedule(event_handler, DATA_DIR, recursive=False) + observer.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("[WATCHER] 감시 종료 요청 수신, 종료 중...") + observer.stop() + observer.join() + +if __name__ == "__main__": + start_watching() diff --git a/lib/pos_update_bill.py b/lib/pos_update_bill.py index d7f0446..7089281 100644 --- a/lib/pos_update_bill.py +++ b/lib/pos_update_bill.py @@ -1,266 +1,263 @@ +""" +영수증별매출상세현황 엑셀파일을 기반으로 MariaDB에 데이터 업데이트 + +1. 파일은 ./data 폴더에 위치 (파일명: '영수증별매출상세현황*.xls[x]') +2. 중복된 데이터는 update 처리됨 (on duplicate key update) +3. 처리 후 파일 자동 삭제 (파일 삭제 로직은 필요시 추가 가능) +""" + import os import sys import re -import time -import logging -from datetime import datetime import pandas as pd -import xlrd -from openpyxl import load_workbook -from sqlalchemy.dialects.mysql import insert as mysql_insert -from sqlalchemy.exc import IntegrityError -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler +from datetime import datetime +from sqlalchemy.dialects.mysql import insert +from sqlalchemy import select -# 프로젝트 루트 상위 경로 추가 +# 상위 경로를 sys.path에 추가해 프로젝트 내 모듈 임포트 가능하게 설정 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + from conf import db, db_schema from lib.common import load_config +# 설정 파일 로드 및 데이터 폴더 경로 설정 +CONFIG = load_config() DATA_DIR = os.path.join(os.path.dirname(__file__), '../data') -FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$") # xls 또는 xlsx -logging.basicConfig( - level=logging.INFO, - format='[%(asctime)s] %(levelname)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', -) +# 처리 대상 파일명 패턴: '영수증별매출상세현황'으로 시작하고 .xls 또는 .xlsx 확장자 +FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$") -def parse_header(filepath): - ext = os.path.splitext(filepath)[1].lower() - if ext == '.xlsx': - from openpyxl import load_workbook - wb = load_workbook(filepath, read_only=True, data_only=True) - ws = wb.active - cell_val = ws['A3'].value - elif ext == '.xls': - wb = xlrd.open_workbook(filepath) - sheet = wb.sheet_by_index(0) - cell_val = sheet.cell_value(2, 0) # 0-based row,col → A3 is row=2, col=0 - else: - raise ValueError(f"지원하지 않는 확장자: {ext}") +# 엑셀 상단 A3셀 형식 예: "조회일자 : 2025-07-27 매장선택 : [V83728] 퍼스트(삐아또" +HEADER_PATTERN = re.compile(r"조회일자\s*:\s*(\d{4}-\d{2}-\d{2})\s+매장선택\s*:\s*\[(\w+)]\s*(.+)") - if not cell_val: - raise ValueError("A3 셀에 값이 없습니다.") +def extract_file_info(filepath: str): + """ + 엑셀 파일 상단에서 조회일자, 매장코드, 매장명을 추출한다. + A3 셀 (2행 0열, 0부터 시작 기준) 데이터를 정규식으로 파싱. - date_match = re.search(r'조회일자\s*:\s*([\d\-]+)', cell_val) - shop_match = re.search(r'매장선택\s*:\s*\[(.*?)\]\s*(.+)', cell_val) + Args: + filepath (str): 엑셀파일 경로 - if not date_match or not shop_match: - raise ValueError("A3 셀에서 날짜 또는 매장 정보 파싱 실패") + Returns: + tuple: (sale_date: date, shop_cd: str, shop_name: str) - sale_date = date_match.group(1) - shop_cd = shop_match.group(1) - shop_name = shop_match.group(2).strip() + Raises: + ValueError: 정규식 매칭 실패 시 + """ + print(f"[INFO] {filepath} 상단 조회일자 및 매장 정보 추출 시작") + df_head = pd.read_excel(filepath, header=None, nrows=5) + first_row = df_head.iloc[2, 0] # 3행 A열 (0-based index) + match = HEADER_PATTERN.search(str(first_row)) + if not match: + raise ValueError(f"[ERROR] 조회일자 및 매장 정보 추출 실패: {filepath}") + + sale_date = datetime.strptime(match.group(1), "%Y-%m-%d").date() + shop_cd = match.group(2) + shop_name = match.group(3).strip() + print(f"[INFO] 추출된 조회일자: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") return sale_date, shop_cd, shop_name -def check_and_update_shop(shop_cd, shop_name): - conn = db.engine.connect() - try: - result = conn.execute( - "SELECT shop_name FROM fg_manager_pos_shop_name WHERE shop_cd = %s", - (shop_cd,) - ).fetchone() +def load_excel_data(filepath: str): + """ + 지정한 컬럼만 읽고, 헤더는 6번째 행(0-based index 5)으로 지정. + '합계'라는 단어가 '포스번호' 컬럼에 있으면 그 행부터 제거한다. + Args: + filepath (str): 엑셀파일 경로 + + Returns: + pd.DataFrame: 전처리된 데이터프레임 + + Raises: + ValueError: 필수 컬럼 누락 시 + """ + print(f"[INFO] {filepath} 데이터 영역 로드 시작") + usecols = [ + "포스번호", "영수증번호", "구분", "테이블명", "최초주문", "결제시각", + "상품코드", "바코드", "상품명", "수량", "총매출액", "ERP 매핑코드", + "비고", "할인액", "할인구분", "실매출액", "가액", "부가세" + ] + # header=5 => 6번째 행이 컬럼명 + df = pd.read_excel(filepath, header=5, dtype=str) + # 컬럼명 좌우 공백 제거 + df.columns = df.columns.str.strip() + + # '합계'인 행의 인덱스 찾기 및 제거 + if '합계' in df['포스번호'].values: + idx = df[df['포스번호'] == '합계'].index[0] + df = df.loc[:idx-1] + print(f"[INFO] '합계' 행 이후 데이터 제거: {idx}번째 행부터 제외") + + # 필수 컬럼 존재 여부 체크 + if not set(usecols).issubset(df.columns): + raise ValueError(f"[ERROR] 필수 컬럼 누락: 현재 컬럼 {df.columns.tolist()}") + + df = df[usecols] + print(f"[INFO] {filepath} 데이터 영역 로드 완료, 데이터 건수: {len(df)}") + return df + +def normalize_data(df: pd.DataFrame, sale_date, shop_cd): + """ + 컬럼명을 내부 규칙에 맞게 변경하고, 숫자 필드를 정수형으로 변환한다. + 조회일자와 매장코드를 데이터프레임에 추가. + + Args: + df (pd.DataFrame): 원본 데이터프레임 + sale_date (date): 조회일자 + shop_cd (str): 매장코드 + + Returns: + pd.DataFrame: 정규화된 데이터프레임 + """ + print(f"[INFO] 데이터 정규화 시작") + def to_int(x): + try: + return int(str(x).replace(",", "").strip()) + except: + return 0 + + df.rename(columns={ + "포스번호": "pos_no", + "영수증번호": "bill_no", + "구분": "division", + "테이블명": "table_no", + "최초주문": "order_time", + "결제시각": "pay_time", + "상품코드": "product_cd", + "바코드": "barcode", + "상품명": "product_name", + "수량": "qty", + "총매출액": "tot_sale_amt", + "ERP 매핑코드": "erp_cd", + "비고": "remark", + "할인액": "dc_amt", + "할인구분": "dc_type", + "실매출액": "dcm_sale_amt", + "가액": "net_amt", + "부가세": "vat_amt" + }, inplace=True) + + df["sale_date"] = sale_date + df["shop_cd"] = shop_cd + + # 숫자형 컬럼 정수 변환 + int_fields = ["qty", "tot_sale_amt", "dc_amt", "dcm_sale_amt", "net_amt", "vat_amt"] + for field in int_fields: + df[field] = df[field].apply(to_int) + + # pos_no, bill_no는 반드시 int로 변환 + df["pos_no"] = df["pos_no"].astype(int) + df["bill_no"] = df["bill_no"].astype(int) + + print(f"[INFO] 데이터 정규화 완료") + return df + +def upsert_data(df: pd.DataFrame, batch_size: int = 500) -> int: + """ + SQLAlchemy insert 구문을 사용하여 + 중복 PK 발생 시 update 처리 (on duplicate key update) + 대량 데이터는 batch_size 단위로 나누어 처리 + + Args: + df (pd.DataFrame): DB에 삽입할 데이터 + batch_size (int): 한번에 처리할 데이터 건수 (기본 500) + + Returns: + int: 영향 받은 총 행 수 + """ + print(f"[INFO] DB 저장 시작") + df = df.where(pd.notnull(df), None) # NaN → None 변환 + + engine = db.get_engine() + metadata = db_schema.metadata + table = db_schema.pos_billdata + total_affected = 0 + + with engine.connect() as conn: + for start in range(0, len(df), batch_size): + batch_df = df.iloc[start:start+batch_size] + records = batch_df.to_dict(orient="records") + insert_stmt = insert(table).values(records) + + update_fields = { + col.name: insert_stmt.inserted[col.name] + for col in table.columns + if col.name not in table.primary_key.columns + } + upsert_stmt = insert_stmt.on_duplicate_key_update(update_fields) + + try: + result = conn.execute(upsert_stmt) + conn.commit() + total_affected += result.rowcount + print(f"[INFO] 배치 처리 완료: {start} ~ {start+len(records)-1} / 영향 행 수: {result.rowcount}") + except Exception as e: + print(f"[ERROR] 배치 처리 실패: {start} ~ {start+len(records)-1} / 오류: {e}") + # 필요 시 raise 하거나 continue로 다음 배치 진행 가능 + raise + + print(f"[INFO] DB 저장 전체 완료, 총 영향 행 수: {total_affected}") + return total_affected + + +def ensure_shop_exists(shop_cd, shop_name): + """ + 매장 정보 테이블에 매장코드가 없으면 신규 등록한다. + + Args: + shop_cd (str): 매장 코드 + shop_name (str): 매장 명 + """ + print(f"[INFO] 매장 존재 여부 확인: {shop_cd}") + engine = db.get_engine() + conn = engine.connect() + shop_table = db_schema.pos_shop_name + + try: + query = shop_table.select().where(shop_table.c.shop_cd == shop_cd) + result = conn.execute(query).fetchone() if result is None: - conn.execute( - "INSERT INTO fg_manager_pos_shop_name (shop_cd, shop_name, used) VALUES (%s, %s, 1)", - (shop_cd, shop_name) - ) - logging.info(f"매장코드 '{shop_cd}' 신규 등록: '{shop_name}'") + print(f"[INFO] 신규 매장 등록: {shop_cd} / {shop_name}") + ins = shop_table.insert().values(shop_cd=shop_cd, shop_name=shop_name) + conn.execute(ins) + conn.commit() else: - existing_name = result[0] - if existing_name != shop_name: - print(f"매장코드 '{shop_cd}' 기존명: '{existing_name}', 새 이름: '{shop_name}'") - answer = input("매장명을 새 이름으로 변경하시겠습니까? (y/Enter=예, 그외=아니오): ").strip().lower() - if answer in ('', 'y', 'yes'): - conn.execute( - "UPDATE fg_manager_pos_shop_name SET shop_name = %s WHERE shop_cd = %s", - (shop_name, shop_cd) - ) - logging.info(f"매장코드 '{shop_cd}' 매장명 변경: '{existing_name}' -> '{shop_name}'") - else: - logging.info(f"매장코드 '{shop_cd}' 매장명 변경 거부됨") + print(f"[INFO] 기존 매장 존재: {shop_cd}") + except Exception as e: + print(f"[ERROR] 매장 확인/등록 실패: {e}") + raise finally: conn.close() -def load_data(filepath, sale_date, shop_cd): - ext = os.path.splitext(filepath)[1].lower() - if ext == '.xls': - engine = 'xlrd' - elif ext == '.xlsx': - engine = 'openpyxl' - else: - raise ValueError(f"지원하지 않는 엑셀 확장자: {ext}") - - df = pd.read_excel(filepath, header=6, engine=engine) - - if '합계' in df.iloc[:, 0].values: - df = df[df.iloc[:, 0] != '합계'] - - rename_map = { - '포스번호': 'pos_no', - '영수증번호': 'bill_no', - '구분': 'division', - '테이블번호': 'table_no', - '최초주문': 'order_time', - '결제시각': 'pay_time', - '상품코드': 'product_cd', - '바코드': 'barcode', - '상품명': 'product_name', - '수량': 'qty', - '총매출액': 'tot_sale_amt', - 'ERP매핑코드': 'erp_cd', - '비고': 'remark', - '할인액': 'dc_amt', - '할인구분': 'dc_type', - '실매출액': 'dcm_sale_amt', - '가액': 'net_amt', - '부가세': 'vat_amt', - } - df.rename(columns=rename_map, inplace=True) - - df['sale_date'] = pd.to_datetime(sale_date).date() - df['shop_cd'] = shop_cd - - int_cols = ['pos_no', 'bill_no', 'table_no', 'qty', 'tot_sale_amt', 'dc_amt', 'dcm_sale_amt', 'net_amt', 'vat_amt'] - for col in int_cols: - if col in df.columns: - df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int) - - import datetime - time_cols = ['order_time', 'pay_time'] - for col in time_cols: - if col in df.columns: - def to_time(val): - if pd.isna(val): - return None - if isinstance(val, datetime.time): - return val - if isinstance(val, datetime.datetime): - return val.time() - try: - return datetime.datetime.strptime(str(val), '%H:%M:%S').time() - except Exception: - return None - df[col] = df[col].apply(to_time) - - return df - -def insert_data_to_db(engine, table, df): - with engine.begin() as conn: - for idx, row in df.iterrows(): - data = row.to_dict() - stmt = mysql_insert(table).values(**data) - update_cols = { - 'division': data.get('division'), - 'table_no': data.get('table_no'), - 'order_time': data.get('order_time'), - 'pay_time': data.get('pay_time'), - 'barcode': data.get('barcode'), - 'product_name': data.get('product_name'), - 'qty': data.get('qty'), - 'tot_sale_amt': data.get('tot_sale_amt'), - 'erp_cd': data.get('erp_cd'), - 'remark': data.get('remark'), - 'dc_amt': data.get('dc_amt'), - 'dc_type': data.get('dc_type'), - 'dcm_sale_amt': data.get('dcm_sale_amt'), - 'net_amt': data.get('net_amt'), - 'vat_amt': data.get('vat_amt'), - } - stmt = stmt.on_duplicate_key_update(**update_cols) - try: - conn.execute(stmt) - except Exception as e: - logging.error(f"{idx+1}행 DB 입력 실패: {e}") - logging.info("DB 저장 완료") - -class BillFileHandler(FileSystemEventHandler): - def __init__(self): - super().__init__() - self.processing_files = set() - - def on_created(self, event): - if event.is_directory: - return - filename = os.path.basename(event.src_path) - if FILE_PATTERN.match(filename): - if filename in self.processing_files: - return - self.processing_files.add(filename) - logging.info(f"새 파일 감지: {filename}") - try: - time.sleep(3) # 파일 완전 생성 대기 - sale_date, shop_cd, shop_name = parse_header(event.src_path) - logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") - - check_and_update_shop(shop_cd, shop_name) - - df = load_data(event.src_path, sale_date, shop_cd) - if df.empty: - logging.warning(f"데이터가 없습니다: {filename}") - else: - insert_data_to_db(db.engine, db_schema.pos_billdata, df) - logging.info(f"{filename} 처리 완료") - - os.remove(event.src_path) - logging.info(f"파일 삭제 완료: {filename}") - except Exception as e: - logging.error(f"{filename} 처리 실패: {e}") - finally: - self.processing_files.discard(filename) - -def process_existing_files(): +def main(): + """ + 대상 데이터 파일 목록을 찾고, 파일별로 처리 진행한다. + 처리 성공 시 저장 건수를 출력하고, 실패 시 오류 메시지 출력. + """ files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)] - if not files: - logging.info("처리할 기존 파일이 없습니다.") - return - logging.info(f"시작 시 발견된 처리 대상 파일 {len(files)}개") - handler = BillFileHandler() - for filename in files: - filepath = os.path.join(DATA_DIR, filename) - if filename in handler.processing_files: - continue - handler.processing_files.add(filename) - logging.info(f"기존 파일 처리 시작: {filename}") + print(f"[INFO] 발견된 파일 {len(files)}개") + + for file in files: + filepath = os.path.join(DATA_DIR, file) + print(f"[INFO] 파일: {file} 처리 시작") + try: - sale_date, shop_cd, shop_name = parse_header(filepath) - logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") + sale_date, shop_cd, shop_name = extract_file_info(filepath) + ensure_shop_exists(shop_cd, shop_name) - check_and_update_shop(shop_cd, shop_name) + raw_df = load_excel_data(filepath) + df = normalize_data(raw_df, sale_date, shop_cd) - df = load_data(filepath, sale_date, shop_cd) - if df.empty: - logging.warning(f"데이터가 없습니다: {filename}") - else: - insert_data_to_db(db.engine, db_schema.pos_billdata, df) - logging.info(f"{filename} 처리 완료") + affected = upsert_data(df) + print(f"[DONE] 처리 완료: {file} / 저장 건수: {affected}") + + # 처리 완료 후 파일 삭제 (필요 시 활성화) + # os.remove(filepath) + # print(f"[INFO] 처리 완료 후 파일 삭제: {file}") - os.remove(filepath) - logging.info(f"파일 삭제 완료: {filename}") except Exception as e: - logging.error(f"{filename} 처리 실패: {e}") - finally: - handler.processing_files.discard(filename) - -def monitor_folder(): - process_existing_files() # 최초 실행 시 한번 검사 - - event_handler = BillFileHandler() - observer = Observer() - observer.schedule(event_handler, path=DATA_DIR, recursive=False) - observer.start() - logging.info(f"폴더 모니터링 시작: {DATA_DIR}") - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - observer.stop() - logging.info("폴더 모니터링 종료 요청됨") - observer.join() - + print(f"[ERROR] {file} 처리 실패: {e}") if __name__ == "__main__": - monitor_folder() + main()