Commit a326b311 authored by 邱阿朋's avatar 邱阿朋

refactor(gui): 重构店铺管理工具GUI,提升界面与流程体验

- 重构VCManagerGUI类,拆分功能,简化代码结构
- 使用ttkbootstrap实现现代化深色主题和美观样式
- 增加暂停/继续按钮,支持任务暂停控制
- 优化日志显示,添加日志级别颜色和时间戳
- 实现日志队列异步处理,提升界面响应
- 完善进度条显示,自动根据日志数据更新
- 统一处理器创建逻辑,简化不同功能调用
- 细化输入验证和用户交互提示,提升使用体验
- 优化浏览器初始化与资源清理流程,保证稳定性
- 删除无用的console日志模块,减少冗余代码
- AutoInterface接口中新增运行状态控制方法,支持暂停机制
- 各业务类新增调用父类初始化和暂停检查,实现统一控制暂停状态
parent ff5e2359
# coding: utf-8
import logging
from app.logger.logger import Logger
class ColoredFormatter(logging.Formatter):
def format(self, record):
# 为不同的日志级别定义一些颜色
colors = {
'DEBUG': '\033[94m', # blue
'INFO': '\033[92m', # green
'WARNING': '\033[93m', # yellow
'ERROR': '\033[91m', # red
'CRITICAL': '\033[95m' # magenta
}
# 从记录中获取原始消息
message = super().format(record)
# 如果日志级别定义了颜色,则添加颜色代码
if record.levelname in colors:
color_code = colors[record.levelname]
message = f"{color_code}{message}\033[0m"
return message
class ConsoleLog(Logger):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ConsoleLog, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
# 使用彩色格式化程序创建控制台处理程序
console_handler = logging.StreamHandler()
console_handler.setFormatter(ColoredFormatter('%(asctime)s %(message)s'))
# 将控制台处理程序添加到记录器
self.logger.addHandler(console_handler)
def info(self, arg):
self.logger.info(arg)
def debug(self, arg):
self.logger.debug(arg)
def warning(self, arg):
self.logger.warning(arg)
def error(self, arg):
self.logger.error(arg)
\ No newline at end of file
......@@ -32,5 +32,4 @@ class GuiLog(Logger):
self.__message("ERROR", arg)
def __message(self, t, message):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._log_queue.put(f"[{t}] [{timestamp}] {message}")
self._log_queue.put(f"[{t}] {message}")
......@@ -16,6 +16,7 @@ from app.vc.interface import AutoInterface
class AdvertCost(AutoInterface):
def __init__(self, logger: Logger, page: Page, country: str):
super().__init__() # 调用父类初始化
self.logger = logger
self.page = page
self.country = country
......@@ -177,8 +178,8 @@ class AdvertCost(AutoInterface):
store_ids = country_store_ids[self.country]
for store_id in store_ids:
while not self.running:
time.sleep(1)
# 检查暂停状态
self.check_pause()
name = store_id
self.logger.debug(f"开始处理店铺:{name}")
......
# coding: utf-8
from abc import ABC, abstractmethod
import time
# 定义一个接口
class AutoInterface(ABC):
def __init__(self):
self._running = True # 运行状态标志
def set_running(self, status: bool):
"""设置运行状态"""
self._running = status
def is_running(self) -> bool:
"""获取运行状态"""
return self._running
def check_pause(self):
"""检查暂停状态,如果暂停则等待"""
while not self._running:
time.sleep(0.5)
@abstractmethod
def run(self, file_name: str):
pass
......
......@@ -20,6 +20,7 @@ from app.vc.interface import AutoInterface
class Payment(AutoInterface):
def __init__(self, logger: Logger, page: Page, country: str, payee_code: str, shop_code: str):
super().__init__() # 调用父类初始化
self.logger = logger
self.page = page
self.country = country
......@@ -313,9 +314,13 @@ class Payment(AutoInterface):
all_price_pay_data = []
i = 0
for _, data in list_data.iterrows():
# 检查暂停状态
self.check_pause()
i += 1
invoice_number = data.get("Invoice Number")
invoice_amount = data.get("Invoice Amount")
self.logger.info({"index": i, "invoice_number": invoice_number})
# 获取当前订单的Payee和优惠比例
vendor_payment_terms = self.__get_po_code(i, invoice_number)
......
......@@ -11,6 +11,7 @@ from app.vc.interface import AutoInterface
class PaymentPush(AutoInterface):
def __init__(self, logger: Logger, country: str, shop_code: str):
super().__init__() # 调用父类初始化
self.logger = logger
self.country = country
self.shop_code = shop_code
......@@ -55,8 +56,12 @@ class PaymentPush(AutoInterface):
i = 0
for _, data in invoices.iterrows():
# 检查暂停状态
self.check_pause()
i += 1
payment_number = data.get("Payment Number")
self.logger.info({"index": i, "payment_number": payment_number})
payment_date = payments_map.get(payment_number, {}).get('Payment Date', '')
platform_payable_amount = data.get('Invoice Amount', '')
if self.country == 'FR' or self.country == 'UK':
......
......@@ -16,6 +16,7 @@ from datetime import datetime, timedelta
class ProductSales(AutoInterface):
def __init__(self, logger: Logger, page: Page, country: str):
super().__init__() # 调用父类初始化
self.logger = logger
self.page = page
self.country = country
......@@ -50,8 +51,9 @@ class ProductSales(AutoInterface):
self.page.ele("#raw_xlsx_btn").click()
self.logger.debug("点击导出等待下载任务提交...")
while True:
while not self.running:
time.sleep(1)
# 检查暂停状态
self.check_pause()
try:
self.page.wait(3)
......
......@@ -15,6 +15,7 @@ from app.vc.interface import AutoInterface
class ReturnGoods(AutoInterface):
def __init__(self, logger: Logger, page: Page, country: str, shop_code: str):
super().__init__() # 调用父类初始化
self.logger = logger
self.page = page
self.country = country
......@@ -63,6 +64,9 @@ class ReturnGoods(AutoInterface):
new_list_data = []
i = 0
for _, data in list_data.iterrows():
# 检查暂停状态
self.check_pause()
i += 1
return_id = data.get('Return ID')
self.logger.info({"index": i, "return_id": return_id})
......
......@@ -17,6 +17,7 @@ from app.vc.interface import AutoInterface
class Spa(AutoInterface):
def __init__(self, logger: Logger, page: Page, country: str, shop_code: str):
super().__init__() # 调用父类初始化
self.logger = logger
self.page = page
self.country = country
......@@ -407,6 +408,9 @@ class Spa(AutoInterface):
large_sheet_data = {} # 保存大数据(需要分 Sheet)
# 遍历合作列表
for index, coop in coop_list.iterrows():
# 检查暂停状态
self.check_pause()
index += 1
invoice_id = coop.get("Invoice ID") # 获取发票 ID
self.logger.info({"index": index, "invoice_id": invoice_id})
......
# coding: utf-8
"""
店铺管理工具 - 现代化 GUI 版本
支持 SPA 查询、退货查询、ERP 回款等功能
"""
import os
import threading
import queue
......@@ -8,8 +13,11 @@ from datetime import datetime
from tkinter import filedialog, simpledialog, messagebox
from DrissionPage import ChromiumPage
from dotenv import load_dotenv
import pandas as pd
import ttkbootstrap as ttk
from ttkbootstrap.scrolled import ScrolledText
from ttkbootstrap.constants import *
from app.helper import domain
from app.logger.gui import GuiLog
from app.vc.payment import Payment
......@@ -18,351 +26,735 @@ from app.vc.return_goods import ReturnGoods
from app.vc.spa import Spa
class VCManagerGUI(ttk.Window):
def __init__(self, logger):
super().__init__(themename="cosmo")
class VCManagerGUI:
"""店铺管理工具 GUI 类"""
self.run_btn = None
self.file_entry = None
self.action_var = None
self.shop_entry = None
self.country_var = None
self.push_msg_var = None
self.log_text = None
self.title("店铺管理工具")
self.geometry("700x800")
# 窗口配置 - 加高窗口以显示更多日志
WINDOW_WIDTH = 620
WINDOW_HEIGHT = 800
# 设置窗口属性禁止缩放
self.resizable(False, False)
# 国家配置
COUNTRY_OPTIONS = [
("🇺🇸 美国", "US"),
("🇬🇧 英国", "UK"),
("🇯🇵 日本", "JP"),
("🇫🇷 法国", "FR"),
("🇩🇪 德国", "DE"),
("🇨🇦 加拿大", "CA"),
]
# 功能配置
ACTION_OPTIONS = [
("📊 SPA查询", "spa"),
("📦 退货查询", "return"),
("💰 ERP回款", "payment_erp"),
]
# 推送配置
PUSH_OPTIONS = [
("✅ 是", "1"),
("❌ 否", "0"),
]
# 初始化状态变量
def __init__(self, logger):
"""初始化应用程序"""
# 状态变量
self.page = None
self.running = False
self.paused = False # 暂停状态
self.processor = None # 当前处理器
self.domain_login = None # 域名登录对象
self.log_queue = queue.Queue()
self.payee_code = None
self.total_count = 0 # 数据总条数
self.current_progress = 0 # 当前进度
# 设置日志处理
# GUI 组件引用
self.log_text = None
self.country_var = None
self.action_var = None
self.push_msg_var = None
self.file_entry = None
self.progress_bar = None
self.progress_label = None
self.status_label = None
self.run_btn = None
self.pause_btn = None # 暂停按钮
self.stats_labels = {}
# 设置日志
logger.set_console(self.log_queue)
self.logger = logger
# 设置窗口尺寸并居中
# 初始化主窗口 - 使用深色主题
self.root = ttk.Window(themename="darkly")
self.root.title("🛠️ 店铺管理工具")
self.root.resizable(False, False)
# 设置自定义样式
self._setup_custom_styles()
# 构建 GUI
self._build_gui()
# 窗口居中
self._center_window()
# 创建界面组件
self.create_widgets()
# 启动日志处理
self.root.after(100, self._process_log_queue)
# 启动日志处理循环
self.after(100, self.process_log_queue) # type: ignore
@staticmethod
def _setup_custom_styles():
"""设置自定义样式"""
style = ttk.Style()
# 标题样式
style.configure(
'Title.TLabel',
font=('Segoe UI', 20, 'bold'),
foreground='#ffffff'
)
# 副标题样式
style.configure(
'Subtitle.TLabel',
font=('Segoe UI', 9),
foreground='#a0a0a0'
)
# 区域标题样式
style.configure(
'SectionTitle.TLabel',
font=('Segoe UI', 10, 'bold'),
foreground='#74b9ff'
)
# 统计数字样式
style.configure(
'Stats.TLabel',
font=('Segoe UI', 14, 'bold'),
foreground='#00cec9'
)
def _center_window(self):
"""设置窗口居中"""
window_width = 700
window_height = 800
# 获取屏幕尺寸
screen_width = self.winfo_screenwidth()
screen_height = self.winfo_screenheight()
# 计算居中坐标
x = (screen_width - window_width) // 2
y = (screen_height - window_height) // 2
# 设置窗口位置
self.geometry(f"{window_width}x{window_height}+{x}+{y}")
def create_widgets(self):
"""创建主界面布局"""
main_frame = ttk.Frame(self)
main_frame.pack(fill='both', expand=True, padx=15, pady=15)
# 配置区域
config_frame = ttk.Labelframe(main_frame, text="配置参数", padding=10)
config_frame.pack(fill='x', pady=10)
# 国家选择
country_frame = ttk.Frame(config_frame)
country_frame.grid(row=0, column=0, sticky=W, pady=5)
ttk.Label(country_frame, text="国家:", width=6).pack(side='left')
"""窗口居中"""
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
x = int((screen_width - self.WINDOW_WIDTH) / 2)
y = int((screen_height - self.WINDOW_HEIGHT) / 3)
self.root.geometry(f"{self.WINDOW_WIDTH}x{self.WINDOW_HEIGHT}+{x}+{y}")
def _build_gui(self):
"""构建完整 GUI 界面"""
# 主容器
main_container = ttk.Frame(self.root, padding=12)
main_container.pack(fill='both', expand=True)
# 1. 标题区域 - 紧凑
self._build_header(main_container)
# 2. 配置卡片 - 紧凑
self._build_config_card(main_container)
# 3. 文件选择 - 紧凑
self._build_file_card(main_container)
# 4. 日志区域 - 主要区域,占用大部分空间
self._build_log_area(main_container)
# 5. 底部状态栏
self._build_status_bar(main_container)
@staticmethod
def _build_header(parent):
"""构建标题区域 - 紧凑版"""
header_frame = ttk.Frame(parent)
header_frame.pack(fill='x', pady=(0, 8))
# 主标题
title_label = ttk.Label(
header_frame,
text="🛠️ 店铺管理工具",
style='Title.TLabel'
)
title_label.pack()
def _build_config_card(self, parent):
"""构建配置卡片区域 - 紧凑版"""
# 卡片容器
card_frame = ttk.LabelFrame(
parent,
text=" ⚙️ 配置参数 ",
padding=8,
bootstyle="info"
)
card_frame.pack(fill='x', pady=(0, 6))
# === 国家选择区 ===
self.country_var = ttk.StringVar(value="US")
countries = [
("美国", "US"),
("英国", "UK"),
("日本", "JP"),
("法国", "FR"),
("德国", "DE"),
("加拿大", "CA"),
]
for i, (text, value) in enumerate(countries):
rb = ttk.Radiobutton(
country_frame,
text=text,
country_row = ttk.Frame(card_frame)
country_row.pack(fill='x', pady=(0, 5))
ttk.Label(
country_row,
text="📍 市场:",
font=('Segoe UI', 9, 'bold'),
foreground='#74b9ff'
).pack(side='left', padx=(0, 8))
for i, (name, code) in enumerate(self.COUNTRY_OPTIONS):
btn = ttk.Radiobutton(
country_row,
text=name,
value=code,
variable=self.country_var,
value=value
bootstyle="info-toolbutton",
width=8
)
rb.pack(side='left', padx=(10 if i != 0 else 0))
# 功能选择
action_frame = ttk.Frame(config_frame)
action_frame.grid(row=1, column=0, sticky=W, pady=5)
ttk.Label(action_frame, text="功能:", width=6).pack(side='left')
self.action_var = ttk.StringVar()
actions = [
("SPA查询", "spa"),
("退货查询", "return"),
# ("回款查询", "payment"),
("ERP回款", "payment_erp")
]
for i, (text, value) in enumerate(actions):
rb = ttk.Radiobutton(
action_frame,
text=text,
btn.pack(side='left', padx=2)
# === 功能选择区 ===
self.action_var = ttk.StringVar(value="spa")
action_row = ttk.Frame(card_frame)
action_row.pack(fill='x', pady=(0, 5))
ttk.Label(
action_row,
text="🎯 功能:",
font=('Segoe UI', 9, 'bold'),
foreground='#00b894'
).pack(side='left', padx=(0, 8))
for i, (name, code) in enumerate(self.ACTION_OPTIONS):
btn = ttk.Radiobutton(
action_row,
text=name,
value=code,
variable=self.action_var,
value=value
bootstyle="success-toolbutton",
width=10
)
rb.pack(side='left', padx=(10 if i != 0 else 0))
btn.pack(side='left', padx=2)
# 是否推送消息
push_msg_frame = ttk.Frame(config_frame)
push_msg_frame.grid(row=2, column=0, sticky=W, pady=5)
ttk.Label(push_msg_frame, text="推送:", width=6).pack(side='left')
# === 推送选择区 ===
self.push_msg_var = ttk.StringVar(value="1")
push_msg_actions = [
("是", "1"),
("否", "0")
]
for i, (text, value) in enumerate(push_msg_actions):
rb = ttk.Radiobutton(
push_msg_frame,
text=text,
push_row = ttk.Frame(card_frame)
push_row.pack(fill='x')
ttk.Label(
push_row,
text="📤 推送:",
font=('Segoe UI', 9, 'bold'),
foreground='#fdcb6e'
).pack(side='left', padx=(0, 8))
for i, (name, code) in enumerate(self.PUSH_OPTIONS):
btn = ttk.Radiobutton(
push_row,
text=name,
value=code,
variable=self.push_msg_var,
value=value
bootstyle="warning-toolbutton",
width=6
)
btn.pack(side='left', padx=2)
def _build_file_card(self, parent):
"""构建文件选择卡片 - 紧凑版"""
file_frame = ttk.LabelFrame(
parent,
text=" 📁 数据文件 ",
padding=8,
bootstyle="secondary"
)
rb.pack(side='left', padx=(10 if i != 0 else 0))
file_frame.pack(fill='x', pady=(0, 6))
# 文件选择区域
file_frame = ttk.Labelframe(main_frame, text="数据文件", padding=10)
file_frame.pack(fill='x', pady=10)
# 文件选择
file_row = ttk.Frame(file_frame)
file_row.pack(fill='x')
self.file_entry = ttk.Entry(file_frame)
self.file_entry.pack(side='left', fill='x', expand=True, padx=5)
ttk.Button(file_frame, text="浏览", command=self.select_file, width=8).pack(side='left')
self.file_entry = ttk.Entry(
file_row,
font=('Consolas', 9)
)
self.file_entry.pack(side='left', fill='x', expand=True, padx=(0, 8))
browse_btn = ttk.Button(
file_row,
text="📂 浏览",
command=self._select_file,
bootstyle="outline",
width=8
)
browse_btn.pack(side='right')
# 控制按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill='x', pady=15)
self.run_btn = ttk.Button(btn_frame, text="开始执行", command=self.start_process,
style=PRIMARY, width=12)
self.run_btn.pack(side='left', padx=5)
ttk.Button(btn_frame, text="清除日志", command=self.clear_log, width=10).pack(side='left', padx=5)
ttk.Button(btn_frame, text="退出", command=self.quit_app, width=8).pack(side='right')
# === 进度区域 ===
progress_row = ttk.Frame(file_frame)
progress_row.pack(fill='x', pady=(8, 0))
# 日志显示
log_frame = ttk.Labelframe(main_frame, text="操作日志", padding=10)
log_frame.pack(fill='both', expand=True)
self.progress_label = ttk.Label(
progress_row,
text="⏳ 准备就绪",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.progress_label.pack(anchor='w')
self.progress_bar = ttk.Progressbar(
progress_row,
mode='determinate',
bootstyle="success-striped",
maximum=100,
value=0
)
self.progress_bar.pack(fill='x', pady=(3, 8))
# === 操作按钮 ===
button_frame = ttk.Frame(file_frame)
button_frame.pack()
self.run_btn = ttk.Button(
button_frame,
text="🚀 开始执行",
command=self._start_process,
bootstyle="success",
width=14
)
self.run_btn.pack(side='left', padx=5)
self.log_text = ttk.ScrolledText(log_frame, state=DISABLED)
self.pause_btn = ttk.Button(
button_frame,
text="⏸ 暂停",
command=self._toggle_pause,
bootstyle="warning-outline",
width=14,
state='disabled'
)
self.pause_btn.pack(side='left', padx=5)
def _build_log_area(self, parent):
"""构建日志显示区域 - 主要区域"""
log_frame = ttk.LabelFrame(
parent,
text=" 📝 运行日志 ",
padding=8,
bootstyle="dark"
)
log_frame.pack(fill='both', expand=True, pady=(0, 5))
# 使用 ScrolledText - 增大高度
self.log_text = ScrolledText(
log_frame,
height=20, # 增大日志区域高度
autohide=True,
bootstyle="dark"
)
self.log_text.pack(fill='both', expand=True)
def select_file(self):
# 设置日志文本样式
self.log_text.text.configure(
font=('Consolas', 10), # 稍大的字体
bg='#1e272e',
fg='#dfe6e9',
insertbackground='#dfe6e9',
selectbackground='#74b9ff',
padx=10,
pady=10
)
# 配置日志标签颜色
self.log_text.text.tag_configure('info', foreground='#74b9ff')
self.log_text.text.tag_configure('success', foreground='#00b894')
self.log_text.text.tag_configure('warning', foreground='#fdcb6e')
self.log_text.text.tag_configure('error', foreground='#e17055')
self.log_text.text.tag_configure('time', foreground='#636e72')
def _build_status_bar(self, parent):
"""构建底部状态栏"""
status_frame = ttk.Frame(parent, padding=(0, 3))
status_frame.pack(fill='x')
self.status_label = ttk.Label(
status_frame,
text="✨ 就绪",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.status_label.pack(side='left')
# 版本信息
version_label = ttk.Label(
status_frame,
text="v2.0.0",
font=('Segoe UI', 8),
foreground='#636e72'
)
version_label.pack(side='right')
# ==================== 日志方法 ====================
def _log(self, message: str, level: str = 'info'):
"""添加日志消息"""
timestamp = datetime.now().strftime('%H:%M:%S')
# 插入时间戳
self.log_text.text.insert('end', f"[{timestamp}] ", 'time')
# 级别前缀
level_prefixes = {
'info': 'ℹ️ ',
'success': '✅ ',
'warning': '⚠️ ',
'error': '❌ '
}
prefix = level_prefixes.get(level, 'ℹ️ ')
# 插入消息
self.log_text.text.insert('end', f"{prefix}{message}\n", level)
self.log_text.text.see('end')
self.root.update()
def _set_running_state(self, is_running: bool):
"""设置运行状态"""
self.running = is_running
if is_running:
self.run_btn.configure(state='disabled', text="⏳ 执行中...")
self.pause_btn.configure(state='normal')
self.status_label.configure(
text="🔄 正在执行中...",
foreground='#00b894'
)
self.progress_bar.configure(value=0)
self.current_progress = 0
self.progress_label.configure(text="🔄 进度: 0%")
else:
self.run_btn.configure(state='normal', text="🚀 开始执行")
self.pause_btn.configure(state='disabled', text="⏸ 暂停")
self.status_label.configure(
text="✨ 就绪",
foreground='#b2bec3'
)
self.progress_bar.configure(value=0)
self.progress_label.configure(text="⏳ 准备就绪")
self.paused = False
self.current_progress = 0
def _update_progress(self, current: int, total: int = None):
"""更新进度条"""
if total is not None:
self.total_count = total
if self.total_count > 0:
self.current_progress = current
percent = int((current / self.total_count) * 100)
self.progress_bar.configure(value=percent)
self.progress_label.configure(text=f"🔄 进度: {current}/{self.total_count} ({percent}%)")
self.root.update()
def _toggle_pause(self):
"""暂停/继续"""
self.paused = not self.paused
if self.paused:
self.pause_btn.configure(text="▶ 继续", bootstyle="success")
self.status_label.configure(text="⏸ 已暂停", foreground='#fdcb6e')
# 保持当前进度显示
if self.total_count > 0:
percent = int((self.current_progress / self.total_count) * 100)
self.progress_label.configure(text=f"⏸ 已暂停 ({self.current_progress}/{self.total_count})")
else:
self.progress_label.configure(text="⏸ 已暂停")
self._log("已暂停", 'warning')
# 通知处理器暂停
if self.processor and hasattr(self.processor, 'set_running'):
self.processor.set_running(False)
if self.domain_login and hasattr(self.domain_login, 'set_status'):
self.domain_login.set_status(False)
else:
self.pause_btn.configure(text="⏸ 暂停", bootstyle="warning-outline")
self.status_label.configure(text="🔄 正在执行中...", foreground='#00b894')
if self.total_count > 0:
percent = int((self.current_progress / self.total_count) * 100)
self.progress_label.configure(text=f"🔄 进度: {self.current_progress}/{self.total_count} ({percent}%)")
else:
self.progress_label.configure(text="🔄 正在处理...")
self._log("已继续", 'info')
# 通知处理器继续
if self.processor and hasattr(self.processor, 'set_running'):
self.processor.set_running(True)
if self.domain_login and hasattr(self.domain_login, 'set_status'):
self.domain_login.set_status(True)
# ==================== 业务逻辑 ====================
def _select_file(self):
"""选择数据文件"""
path = filedialog.askopenfilename(filetypes=[("Excel文件", "*.xlsx *.xls")])
path = filedialog.askopenfilename(
filetypes=[("Excel文件", "*.xlsx *.xls")],
title="选择数据文件"
)
if path:
self.file_entry.delete(0, ttk.END)
self.file_entry.insert(0, path)
self._log(f"已选择文件: {os.path.basename(path)}", 'info')
def start_process(self):
"""启动处理线程"""
if self.running:
messagebox.showwarning("警告", "已有任务正在运行")
return
if not self.validate_input():
return
self.running = True
self.run_btn.config(state=ttk.DISABLED)
threading.Thread(target=self.run_process, daemon=True).start()
# 读取 Excel 文件并显示数据条数
try:
df = pd.read_excel(path)
row_count = len(df)
col_count = len(df.columns)
self.total_count = row_count # 保存总条数
self._log(f"数据加载完成: 共 {row_count} 条记录, {col_count} 个字段", 'success')
self.progress_label.configure(text=f"已加载 {row_count} 条数据")
except Exception as e:
self._log(f"读取文件失败: {str(e)}", 'error')
def validate_input(self):
def _validate_input(self) -> bool:
"""验证输入有效性"""
if not self.file_entry.get():
messagebox.showerror("输入错误", "请选择数据文件")
messagebox.showerror("❌ 输入错误", "请选择数据文件!")
return False
if not self.action_var.get():
messagebox.showerror("输入错误", "请选择要执行的功能")
messagebox.showerror("❌ 输入错误", "请选择要执行的功能!")
return False
return True
def run_process(self):
def _start_process(self):
"""启动处理线程"""
if self.running:
messagebox.showwarning("⚠️ 警告", "已有任务正在运行!")
return
if not self._validate_input():
return
self._set_running_state(True)
self._log("启动处理任务...", 'info')
threading.Thread(target=self._run_process, daemon=True).start()
def _run_process(self):
"""后台处理流程"""
try:
# 初始化浏览器
self.log("初始化浏览器引擎...")
self.init_browser()
self._log("初始化浏览器引擎...", 'info')
self._init_browser()
# 获取参数
params = {
'country': self.country_var.get(),
'action': self.action_var.get(),
'file_name': self.file_entry.get()
}
country = self.country_var.get()
action = self.action_var.get()
file_name = self.file_entry.get()
# 获取显示名称
country_names = {c: n for n, c in self.COUNTRY_OPTIONS}
action_names = {c: n for n, c in self.ACTION_OPTIONS}
# 检查暂停状态
while self.paused:
time.sleep(0.5)
if not self.running:
return
# 特殊参数处理
if params['action'] == "payment":
self.get_payee_code()
if action == "payment":
self._get_payee_code()
if not self.payee_code:
return
# 创建处理器实例
processor = self.create_processor(params)
self.processor = self._create_processor(country, action)
# 切换域名
domain_login = domain.LoginDomain(self.logger, self.page, self.country_var.get())
domain_login.set_status(True)
domain_login.login_check()
self._log(f"登录 {country_names.get(country, country)} 站点...", 'info')
self.domain_login = domain.LoginDomain(self.logger, self.page, country)
self.domain_login.set_status(True)
self.domain_login.login_check()
# 检查暂停状态
while self.paused:
time.sleep(0.5)
if not self.running:
return
# 执行核心操作
self.log(f"开始执行 {params['action']} 操作...")
processor.run(params['file_name'])
self._log(f"开始执行 {action_names.get(action, action)} 操作...", 'info')
if hasattr(self.processor, 'set_running'):
self.processor.set_running(True)
self.processor.run(file_name)
# 检查暂停状态
while self.paused:
time.sleep(0.5)
if not self.running:
return
# 推送数据
if self.push_msg_var.get() == "1":
processor.push_data_queue()
self._log("推送数据到消息队列...", 'info')
self.processor.push_data_queue()
self._log("操作成功完成!", 'success')
self.log("操作成功完成!")
# 设置进度为100%
if self.total_count > 0:
self._update_progress(self.total_count, self.total_count)
self.progress_bar.configure(value=100)
self.progress_label.configure(text="✅ 已完成 100%")
# 保存日志
self.save_log()
self._save_log()
# 清空日志
self.clear_log()
# 更新状态栏
self.root.after(0, lambda: self.status_label.configure(
text="✅ 任务完成",
foreground='#00b894'
))
except Exception as e:
self.log(f"发生错误:{str(e)}")
self.log(traceback.format_exc())
self._log(f"发生错误: {str(e)}", 'error')
self._log(traceback.format_exc(), 'error')
self.root.after(0, lambda: self.status_label.configure(
text="❌ 任务失败",
foreground='#e17055'
))
finally:
self._cleanup_resources()
def init_browser(self):
def _init_browser(self):
"""初始化浏览器配置"""
self.page = ChromiumPage()
self.page.set.load_mode.normal()
self.page.set.when_download_file_exists('overwrite')
download_path = os.path.join(os.getcwd())
self.page.set.download_path(download_path)
self.log(f"下载目录设置为:{download_path}")
self._log(f"下载目录: {download_path}", 'info')
def create_processor(self, params):
def _create_processor(self, country: str, action: str):
"""创建功能处理器"""
# 根据国家编码获取店铺代码
country = params['country']
shop_code = "VECELO"
if country == "JP":
shop_code = "JP-VC"
elif country == "UK":
shop_code = "UK-VC"
elif country == "DE":
shop_code = "DE-VC"
elif country == "FR":
shop_code = "FR-VC"
elif country == "CA":
shop_code = "CA-VC"
action = params['action']
if action == "payment":
return Payment(self.logger, self.page, country, self.payee_code, shop_code)
elif action == "payment_erp":
return PaymentPush(self.logger, country, shop_code)
elif action == "return":
return ReturnGoods(self.logger, self.page, country, shop_code)
elif action == "spa":
return Spa(self.logger, self.page, country, shop_code)
shop_code_map = {
"JP": "JP-VC",
"UK": "UK-VC",
"DE": "DE-VC",
"FR": "FR-VC",
"CA": "CA-VC"
}
shop_code = shop_code_map.get(country, "VECELO")
# 创建处理器
processors = {
"payment": lambda: Payment(self.logger, self.page, country, self.payee_code, shop_code),
"payment_erp": lambda: PaymentPush(self.logger, country, shop_code),
"return": lambda: ReturnGoods(self.logger, self.page, country, shop_code),
"spa": lambda: Spa(self.logger, self.page, country, shop_code),
}
processor_factory = processors.get(action)
if processor_factory:
return processor_factory()
else:
raise ValueError(f"未知的功能类型{action}")
raise ValueError(f"未知的功能类型: {action}")
def get_payee_code(self):
def _get_payee_code(self):
"""获取回款Code"""
self.after(0, self._show_payee_dialog) # type: ignore
self.root.after(0, self._show_payee_dialog)
while self.running and not self.payee_code:
time.sleep(0.1)
def _show_payee_dialog(self):
"""显示回款Code输入对话框"""
self.payee_code = simpledialog.askstring(
"回款信息",
"💰 回款信息",
"请输入回款Code(从URL参数获取):",
parent=self,
parent=self.root,
initialvalue="VECET"
)
if not self.payee_code:
self.log("用户取消回款Code输入")
self._log("用户取消回款Code输入", 'warning')
self.running = False
def log(self, message):
"""记录日志到队列"""
self.logger.info(message)
# timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# self.log_queue.put(f"[{timestamp}] {message}")
def process_log_queue(self):
"""处理日志队列"""
while not self.log_queue.empty():
msg = self.log_queue.get()
self.log_text.configure(state=ttk.NORMAL)
self.log_text.insert(ttk.END, msg + "\n")
self.log_text.configure(state=ttk.DISABLED)
self.log_text.see(ttk.END)
self.after(100, self.process_log_queue) # type: ignore
def clear_log(self):
"""清除日志"""
self.log_text.configure(state=ttk.NORMAL)
self.log_text.delete(1.0, ttk.END)
self.log_text.configure(state=ttk.DISABLED)
def save_log(self):
def _save_log(self):
"""保存日志"""
# 获取 ScrolledText 控件中的内容,从头(1.0)到末尾(END)
content = self.log_text.get(1.0, ttk.END).strip()
content = self.log_text.text.get(1.0, ttk.END).strip()
if content:
# 获取当前工作目录并保存为 output.txt
current_dir = os.getcwd()
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
file_name = "running.log"
result_file_name = f"{current_datetime}_{self.action_var.get()}_{file_name}"
file_path = os.path.join(current_dir, result_file_name)
# 打开文件,写入内容并关闭
with open(file_path, "w", encoding="utf-8") as file:
file.write(content)
def cleanup_resources(self):
self._log(f"日志已保存: {result_file_name}", 'success')
def _cleanup_resources(self):
"""清理资源"""
if self.page:
try:
self.page.close()
self.log("浏览器资源已释放")
self._log("浏览器资源已释放", 'info')
except Exception as e:
self.log(f"释放浏览器资源时出错:{str(e)}")
self._log(f"释放浏览器资源时出错: {str(e)}", 'warning')
finally:
self.page = None
self.processor = None
self.domain_login = None
self.running = False
self.after(0, lambda: self.run_btn.config(state=ttk.NORMAL)) # type: ignore
self.root.after(0, lambda: self._set_running_state(False))
def _process_log_queue(self):
"""处理日志队列"""
import re
while not self.log_queue.empty():
msg = self.log_queue.get()
def quit_app(self):
"""安全退出程序"""
if messagebox.askokcancel("退出", "确认要退出程序吗?"):
self.cleanup_resources()
self.destroy()
# 解析进度信息 - 匹配 'index': X 的格式
index_match = re.search(r"['\"]index['\"]:\s*(\d+)", msg)
if index_match and self.total_count > 0:
current_index = int(index_match.group(1))
self._update_progress(current_index, self.total_count)
# 判断日志级别
level = 'info'
if '错误' in msg or 'Error' in msg or '失败' in msg:
level = 'error'
elif '成功' in msg or '完成' in msg:
level = 'success'
elif '警告' in msg or '暂停' in msg:
level = 'warning'
self._log(msg, level)
self.root.after(100, self._process_log_queue)
def run(self):
"""运行应用程序"""
try:
self._log("应用程序已启动", 'info')
self.root.mainloop()
finally:
self._cleanup_resources()
if __name__ == "__main__":
try:
log = GuiLog()
load_dotenv()
log = GuiLog()
app = VCManagerGUI(log)
app.mainloop()
app.run()
except KeyboardInterrupt:
exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment