정상 작동하지 않아 복구함
This commit is contained in:
@ -1,7 +1,6 @@
|
|||||||
import time
|
import time
|
||||||
import os
|
import os
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.observers.polling import PollingObserver # 필요시 사용
|
|
||||||
from watchdog.events import FileSystemEventHandler
|
from watchdog.events import FileSystemEventHandler
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
@ -21,40 +20,27 @@ class NewFileHandler(FileSystemEventHandler):
|
|||||||
self._processing_files = set()
|
self._processing_files = set()
|
||||||
|
|
||||||
def on_created(self, event):
|
def on_created(self, event):
|
||||||
self._handle_event(event)
|
|
||||||
|
|
||||||
def on_moved(self, event):
|
|
||||||
# moved event는 dest_path 사용
|
|
||||||
self._handle_event(event, moved=True)
|
|
||||||
|
|
||||||
def on_modified(self, event):
|
|
||||||
self._handle_event(event)
|
|
||||||
|
|
||||||
def _handle_event(self, event, moved=False):
|
|
||||||
if event.is_directory:
|
if event.is_directory:
|
||||||
return
|
return
|
||||||
|
filepath = event.src_path
|
||||||
filepath = event.dest_path if moved else event.src_path
|
|
||||||
filename = os.path.basename(filepath)
|
filename = os.path.basename(filepath)
|
||||||
|
|
||||||
if not filename.endswith(FILE_EXTENSIONS):
|
if not filename.endswith(FILE_EXTENSIONS):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# 처리 대상 여부 확인
|
||||||
if filename.startswith(BILL_PREFIX) or filename.startswith(DAILY_PRODUCT_PREFIX):
|
if filename.startswith(BILL_PREFIX) or filename.startswith(DAILY_PRODUCT_PREFIX):
|
||||||
with self._lock:
|
print(f"[WATCHER] 신규 파일 감지: {filename}")
|
||||||
if filename in self._processing_files:
|
|
||||||
print(f"[WATCHER] {filename} 이미 처리 중")
|
|
||||||
return
|
|
||||||
self._processing_files.add(filename)
|
|
||||||
|
|
||||||
threading.Thread(target=self.process_file, args=(filepath, filename), daemon=True).start()
|
threading.Thread(target=self.process_file, args=(filepath, filename), daemon=True).start()
|
||||||
|
|
||||||
def process_file(self, filepath, filename):
|
def process_file(self, filepath, filename):
|
||||||
try:
|
with self._lock:
|
||||||
# 파일 완전 생성/복사 대기 (파일 크기 변화 없을 때까지 기다림)
|
if filename in self._processing_files:
|
||||||
if not self.wait_for_file_stable(filepath):
|
print(f"[WATCHER] {filename} 이미 처리 중")
|
||||||
print(f"[WATCHER] 파일 안정화 실패, 처리 중단: {filename}")
|
|
||||||
return
|
return
|
||||||
|
self._processing_files.add(filename)
|
||||||
|
|
||||||
|
try:
|
||||||
|
time.sleep(3) # 파일 쓰기 완료 대기
|
||||||
|
|
||||||
print(f"[WATCHER] 파일 처리 시작: {filename}")
|
print(f"[WATCHER] 파일 처리 시작: {filename}")
|
||||||
if filename.startswith(BILL_PREFIX):
|
if filename.startswith(BILL_PREFIX):
|
||||||
@ -77,42 +63,10 @@ class NewFileHandler(FileSystemEventHandler):
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
self._processing_files.discard(filename)
|
self._processing_files.discard(filename)
|
||||||
|
|
||||||
def wait_for_file_stable(self, filepath, wait_seconds=10, interval=1):
|
|
||||||
"""
|
|
||||||
파일 크기가 interval 간격으로 변하지 않으면 안정화된 것으로 판단
|
|
||||||
"""
|
|
||||||
last_size = -1
|
|
||||||
stable_count = 0
|
|
||||||
required_stable_count = int(wait_seconds / interval)
|
|
||||||
|
|
||||||
for _ in range(required_stable_count):
|
|
||||||
try:
|
|
||||||
current_size = os.path.getsize(filepath)
|
|
||||||
except FileNotFoundError:
|
|
||||||
# 파일이 없어지면 중단
|
|
||||||
return False
|
|
||||||
|
|
||||||
if current_size == last_size:
|
|
||||||
stable_count += 1
|
|
||||||
if stable_count >= required_stable_count:
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
stable_count = 0
|
|
||||||
last_size = current_size
|
|
||||||
|
|
||||||
time.sleep(interval)
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def start_watching():
|
def start_watching():
|
||||||
print(f"[WATCHER] '{DATA_DIR}' 폴더 감시 시작")
|
print(f"[WATCHER] '{DATA_DIR}' 폴더 감시 시작")
|
||||||
|
|
||||||
event_handler = NewFileHandler()
|
event_handler = NewFileHandler()
|
||||||
|
observer = Observer()
|
||||||
# 기본 Observer 대신 PollingObserver로 변경 가능 (Docker 환경에서 안정적)
|
|
||||||
#observer = Observer()
|
|
||||||
observer = PollingObserver() # 필요시 이걸로 바꿔서 사용
|
|
||||||
|
|
||||||
observer.schedule(event_handler, DATA_DIR, recursive=False)
|
observer.schedule(event_handler, DATA_DIR, recursive=False)
|
||||||
observer.start()
|
observer.start()
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user