Commit ff5e2359 authored by 邱阿朋's avatar 邱阿朋

refactor(gui): 重构Amazon价格爬取工具和VC Manager GUI

- 重构Amazon价格爬取工具,采用现代化的GUI设计和更完善的界面布局
- 改进日志功能,增加彩色日志显示和等级区分
- 添加用户友好的国家选择、文件加载和状态显示组件
- 支持异步爬取,提供暂停、继续功能和进度条反馈
- 优化浏览器初始化与清理逻辑,提升稳定性和错误处理能力
- 增加结果数据自动保存与RabbitMQ消息队列推送功能
- VC Manager GUI 也进行重构,使用ttkbootstrap深色主题和响应式布局
- 增加多国家、多功能选择,进度条,统计信息及操作按钮的清晰布局
- 统一代码风格,拆分GUI构建方法,提升维护性和用户体验
parent 09a12cf5
"""
Amazon价格爬取工具
"""
import csv import csv
import re import re
import time import time
from typing import Tuple from typing import Tuple, List, Optional
from datetime import datetime
import pandas as pd import pandas as pd
from DrissionPage import ChromiumPage, ChromiumOptions from DrissionPage import ChromiumPage, ChromiumOptions
from dotenv import load_dotenv from dotenv import load_dotenv
from lxml import etree from lxml import etree
import ttkbootstrap as ttk import ttkbootstrap as ttk
from ttkbootstrap.constants import * from ttkbootstrap.scrolled import ScrolledText
from tkinter import filedialog, scrolledtext from ttkbootstrap.tooltip import ToolTip
from tkinter import filedialog
from threading import Thread from threading import Thread
import logging
from app.helper.domain import switch_domain from app.helper.domain import switch_domain
class AmazonPriceScraper: # 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ModernAmazonPriceScraper:
"""Amazon价格爬取工具的现代化GUI应用程序"""
# 颜色配置
COLORS = {
'success': '#28a745',
'danger': '#dc3545',
'warning': '#ffc107',
'info': '#17a2b8',
'primary': '#0d6efd',
'secondary': '#6c757d',
'dark': '#343a40',
'light': '#f8f9fa',
'card_bg': '#2d3436',
'text_muted': '#6c757d',
}
# 国家域名映射配置
COUNTRY_DOMAINS = {
"美国 (US)": "US",
"英国 (UK)": "UK",
"日本 (JP)": "JP",
"法国 (FR)": "FR",
"德国 (DE)": "DE",
"加拿大 (CA)": "CA",
}
# 窗口配置
WINDOW_WIDTH = 550
WINDOW_HEIGHT = 780
def __init__(self): def __init__(self):
self.log_text = None """初始化应用程序"""
self.file_label = None self.asins: List[str] = []
self.country_var = None self.page: Optional[ChromiumPage] = None
self.asins = [] self.is_running: bool = False
self.page = None # 浏览器延迟初始化 self.current_index: int = 0 # 当前进度索引
self.is_running = False self.results: List[Tuple[str, str]] = [] # 保存结果
self.success_count: int = 0
# GUI 初始化 self.failed_count: int = 0
self.root = ttk.Window(themename="flatly") self._scrape_thread: Optional[Thread] = None
self.root.title("Amazon价格爬取工具")
self.root.geometry("500x600") # GUI组件引用
self.log_text: Optional[ScrolledText] = None
self.file_label: Optional[ttk.Label] = None
self.country_var: Optional[ttk.StringVar] = None
self.progress_bar: Optional[ttk.Progressbar] = None
self.progress_label: Optional[ttk.Label] = None
self.status_label: Optional[ttk.Label] = None
self.start_btn: Optional[ttk.Button] = None
self.stop_btn: Optional[ttk.Button] = None
self.stats_labels: dict = {}
# 初始化主窗口 - 使用深色主题
self.root = ttk.Window(themename="darkly")
self.root.title("🛒 Amazon 价格爬取工具 Pro")
self.root.resizable(False, False) self.root.resizable(False, False)
# 国家域名映射 # 设置自定义样式
self.domains = { self._setup_custom_styles()
"美国": "US", "英国": "UK", "日本": "JP",
"法国": "FR", "德国": "DE", "加拿大": "CA"
}
self.setup_gui() # 构建GUI
self._build_gui()
# 设置窗口尺寸并居中 # 窗口居中
self._center_window() self._center_window()
def _center_window(self): @staticmethod
"""设置窗口居中""" def _setup_custom_styles():
window_width = 500 """设置自定义样式"""
window_height = 600 style = ttk.Style()
# 卡片框架样式
style.configure(
'Card.TFrame',
background='#2d3436',
relief='flat'
)
# 标题标签样式
style.configure(
'Title.TLabel',
font=('Segoe UI', 24, 'bold'),
foreground='#ffffff'
)
# 副标题样式
style.configure(
'Subtitle.TLabel',
font=('Segoe UI', 10),
foreground='#b2bec3'
)
# 区域标题样式
style.configure(
'SectionTitle.TLabel',
font=('Segoe UI', 11, 'bold'),
foreground='#74b9ff'
)
# 统计数字样式
style.configure(
'Stats.TLabel',
font=('Segoe UI', 18, 'bold'),
foreground='#00cec9'
)
# 统计标签样式
style.configure(
'StatsLabel.TLabel',
font=('Segoe UI', 9),
foreground='#b2bec3'
)
# 成功按钮样式
style.configure(
'success.TButton',
font=('Segoe UI', 10, 'bold')
)
# 危险按钮样式
style.configure(
'danger.TButton',
font=('Segoe UI', 10, 'bold')
)
# 获取屏幕尺寸 def _center_window(self):
"""将窗口居中显示"""
screen_width = self.root.winfo_screenwidth() screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight() screen_height = self.root.winfo_screenheight()
# 计算居中坐标 x = int((screen_width - self.WINDOW_WIDTH) / 2)
x = int((screen_width - window_width) / 1.1) y = int((screen_height - self.WINDOW_HEIGHT) / 3)
y = int((screen_height - window_height) / 3)
# 设置窗口位置 self.root.geometry(f"{self.WINDOW_WIDTH}x{self.WINDOW_HEIGHT}+{x}+{y}")
self.root.geometry(f"{window_width}x{window_height}+{x}+{y}")
def setup_gui(self): def _build_gui(self):
ttk.Label(self.root, text="选择国家:").pack(pady=5) """构建完整的GUI界面"""
self.country_var = ttk.StringVar(value="US") # 主容器 - 添加内边距
ttk.OptionMenu(self.root, self.country_var, "US", *self.domains.values()).pack(pady=5) main_container = ttk.Frame(self.root, padding=20)
main_container.pack(fill='both', expand=True)
ttk.Button(self.root, text="选择ASIN Excel文件", command=self.load_excel).pack(pady=5) # 1. 标题区域
self.file_label = ttk.Label(self.root, text="未选择文件") self._build_header(main_container)
self.file_label.pack(pady=5)
self.log_text = scrolledtext.ScrolledText(self.root, height=20, width=80) # 2. 配置卡片
self.log_text.pack(pady=10, padx=10, fill='both', expand=True) self._build_config_card(main_container)
button_frame = ttk.Frame(self.root) # 3. 统计卡片
button_frame.pack(pady=5) self._build_stats_card(main_container)
ttk.Button(button_frame, text="开始爬取", style=SUCCESS, command=self.start_scraping).pack(side='left', padx=5)
ttk.Button(button_frame, text="停止", style=DANGER, command=self.stop_scraping).pack(side='left', padx=5)
def log(self, message): # 4. 日志区域
self.log_text.insert(END, f"{time.strftime('%Y-%m-%d %H:%M:%S')}: {message}\n") self._build_log_area(main_container)
self.log_text.see(END)
# 5. 底部状态栏
self._build_status_bar(main_container)
@staticmethod
def _build_header(parent: ttk.Frame):
"""构建标题区域"""
header_frame = ttk.Frame(parent)
header_frame.pack(fill='x', pady=(0, 20))
# 主标题
title_label = ttk.Label(
header_frame,
text="🛒 Amazon 价格爬取工具",
style='Title.TLabel'
)
title_label.pack()
# 副标题
subtitle_label = ttk.Label(
header_frame,
text="快速批量获取Amazon商品价格,支持多站点爬取",
style='Subtitle.TLabel'
)
subtitle_label.pack(pady=(5, 0))
def _build_config_card(self, parent: ttk.Frame):
"""构建配置卡片区域"""
# 卡片容器
card_frame = ttk.LabelFrame(
parent,
text=" ⚙️ 爬取配置 ",
padding=15,
style="info"
)
card_frame.pack(fill='x', pady=(0, 15))
# 上半部分:国家选择和文件选择
top_row = ttk.Frame(card_frame)
top_row.pack(fill='x', pady=(0, 10))
# 国家选择
country_frame = ttk.Frame(top_row)
country_frame.pack(side='left', fill='x', expand=True)
ttk.Label(
country_frame,
text="📍 目标站点",
style='SectionTitle.TLabel'
).pack(anchor='w')
self.country_var = ttk.StringVar(value="美国 (US)")
country_menu = ttk.Combobox(
country_frame,
textvariable=self.country_var,
values=list(self.COUNTRY_DOMAINS.keys()),
state='readonly',
width=20,
style="info"
)
country_menu.pack(anchor='w', pady=(5, 0))
ToolTip(country_menu, text="选择要爬取的Amazon站点")
# 文件选择按钮
file_frame = ttk.Frame(top_row)
file_frame.pack(side='right', padx=(20, 0))
select_btn = ttk.Button(
file_frame,
text="📂 选择Excel文件",
command=self._load_excel,
style="outline-info",
width=18
)
select_btn.pack()
ToolTip(select_btn, text="选择包含ASIN列表的Excel文件")
# 文件状态标签
self.file_label = ttk.Label(
card_frame,
text="📋 尚未选择文件",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.file_label.pack(anchor='w', pady=(5, 10))
# 分隔线
ttk.Separator(card_frame, style="secondary").pack(fill='x', pady=10)
# 进度条区域
progress_frame = ttk.Frame(card_frame)
progress_frame.pack(fill='x')
self.progress_label = ttk.Label(
progress_frame,
text="准备就绪",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.progress_label.pack(anchor='w')
self.progress_bar = ttk.Progressbar(
progress_frame,
mode='determinate',
style="success-striped",
length=300
)
self.progress_bar.pack(fill='x', pady=(5, 10))
# 操作按钮
button_frame = ttk.Frame(card_frame)
button_frame.pack()
self.start_btn = ttk.Button(
button_frame,
text="🚀 开始爬取",
command=self._start_scraping,
style="success",
width=15
)
self.start_btn.pack(side='left', padx=5)
self.stop_btn = ttk.Button(
button_frame,
text="⏸ 暂停",
command=self._stop_scraping,
style="warning-outline",
width=15,
state='disabled'
)
self.stop_btn.pack(side='left', padx=5)
def _build_stats_card(self, parent: ttk.Frame):
"""构建统计信息卡片"""
stats_frame = ttk.Frame(parent)
stats_frame.pack(fill='x', pady=(0, 15))
# 三列统计
stats_config = [
("total", "📊 总ASIN数", "0", "info"),
("success", "✅ 成功获取", "0", "success"),
("failed", "❌ 获取失败", "0", "danger"),
]
for stat_id, label_text, value, sty in stats_config:
stat_card = ttk.LabelFrame(
stats_frame,
text=f" {label_text} ",
padding=10,
style=sty
)
stat_card.pack(side='left', fill='both', expand=True, padx=2)
value_label = ttk.Label(
stat_card,
text=value,
style='Stats.TLabel'
)
value_label.pack()
self.stats_labels[stat_id] = value_label
def _build_log_area(self, parent: ttk.Frame):
"""构建日志显示区域"""
log_frame = ttk.LabelFrame(
parent,
text=" 📜 运行日志 ",
padding=10,
style="secondary"
)
log_frame.pack(fill='both', expand=True, pady=(0, 10))
# 使用ttkbootstrap的ScrolledText
self.log_text = ScrolledText(
log_frame,
height=12,
autohide=True,
bootstyle="dark"
)
self.log_text.pack(fill='both', expand=True)
# 设置日志文本样式
self.log_text.text.configure(
font=('Consolas', 9),
bg='#1e272e',
fg='#dfe6e9',
insertbackground='#dfe6e9',
selectbackground='#74b9ff',
padx=10,
pady=10
)
# 配置日志标签颜色
self.log_text.text.tag_configure('info', foreground='#74b9ff')
self.log_text.text.tag_configure('success', foreground='#00b894')
self.log_text.text.tag_configure('warning', foreground='#fdcb6e')
self.log_text.text.tag_configure('error', foreground='#e17055')
self.log_text.text.tag_configure('time', foreground='#636e72')
def _build_status_bar(self, parent: ttk.Frame):
"""构建底部状态栏"""
status_frame = ttk.Frame(parent, padding=(0, 5))
status_frame.pack(fill='x')
self.status_label = ttk.Label(
status_frame,
text="就绪",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.status_label.pack(side='left')
# 版本信息
version_label = ttk.Label(
status_frame,
text="v2.0.0",
font=('Segoe UI', 8),
foreground='#636e72'
)
version_label.pack(side='right')
def _log(self, message: str, level: str = 'info'):
"""
在日志区域添加带颜色的日志消息
Args:
message: 日志消息内容
level: 日志级别 (info, success, warning, error)
"""
timestamp = datetime.now().strftime('%H:%M:%S')
# 插入时间戳
self.log_text.text.insert('end', f"[{timestamp}] ", 'time')
# 根据级别设置前缀符号 - 使用简洁ASCII符号
level_prefixes = {
'info': 'i',
'success': '+',
'warning': '!',
'error': 'x'
}
prefix = level_prefixes.get(level, 'i')
# 插入消息 - 简洁格式
self.log_text.text.insert('end', f"{prefix} {message}\n", level)
self.log_text.text.see('end')
self.root.update() self.root.update()
def load_excel(self): # 同时记录到标准日志
file_path = filedialog.askopenfilename(filetypes=[("Excel files", "*.xlsx *.xls")]) logger.info(message)
if file_path:
def _update_stats(self, total: int = 0, success: int = 0, failed: int = 0):
"""
更新统计数据显示
Args:
total: 总ASIN数量
success: 成功获取数量
failed: 失败数量
"""
if total > 0:
self.stats_labels['total'].configure(text=str(total))
if success >= 0:
self.stats_labels['success'].configure(text=str(success))
if failed >= 0:
self.stats_labels['failed'].configure(text=str(failed))
def _update_progress(self, current: int, total: int, message: str = ""):
"""
更新进度条和进度文本
Args:
current: 当前进度
total: 总数
message: 进度消息
"""
if total > 0:
percentage = (current / total) * 100
self.progress_bar['value'] = percentage
self.progress_label.configure(
text=f"进度: {current}/{total} ({percentage:.1f}%) - {message}"
)
else:
self.progress_bar['value'] = 0
self.progress_label.configure(text=message or "准备就绪")
def _set_running_state(self, is_running: bool):
"""
设置运行状态,更新按钮和状态显示
Args:
is_running: 是否正在运行
"""
self.is_running = is_running
if is_running:
self.start_btn.configure(state='disabled')
self.stop_btn.configure(state='normal')
self.status_label.configure(
text="🔄 正在爬取中...",
foreground='#00b894'
)
else:
self.start_btn.configure(state='normal')
self.stop_btn.configure(state='disabled')
self.status_label.configure(
text="💡 准备就绪 - 请选择文件并开始爬取",
foreground='#b2bec3'
)
def _load_excel(self):
"""加载Excel文件并读取ASIN列表"""
file_path = filedialog.askopenfilename(
title="选择包含ASIN的Excel文件",
filetypes=[
("Excel文件", "*.xlsx *.xls"),
("所有文件", "*.*")
]
)
if not file_path:
return
try: try:
df = pd.read_excel(file_path) df = pd.read_excel(file_path)
# 验证ASIN列是否存在
if 'ASIN' not in df.columns:
self._log("Excel文件中未找到'ASIN'列,请检查文件格式", 'error')
return
self.asins = df['ASIN'].dropna().astype(str).tolist() self.asins = df['ASIN'].dropna().astype(str).tolist()
self.file_label.config(text=f"已加载: {file_path}")
self.log(f"成功加载 {len(self.asins)} 个ASIN") # 过滤空值和无效ASIN
self.asins = [asin.strip() for asin in self.asins if asin.strip()]
# 获取文件名用于显示
file_name = file_path.split('/')[-1].split('\\')[-1]
self.file_label.configure(
text=f"📁 已加载: {file_name}",
foreground='#00b894'
)
self._log(f"成功加载文件: {file_name}", 'success')
self._log(f"共读取到 {len(self.asins)} 个有效ASIN", 'info')
# 更新统计
self._update_stats(total=len(self.asins), success=0, failed=0)
self._update_progress(0, len(self.asins), "文件已加载,等待开始爬取")
except Exception as e: except Exception as e:
self.log(f"加载Excel失败: {str(e)}") error_msg = f"加载Excel文件失败: {str(e)}"
self._log(error_msg, 'error')
logger.error(error_msg, exc_info=True)
def init_browser(self): def _init_browser(self):
"""初始化浏览器实例"""
try:
options = ChromiumOptions() options = ChromiumOptions()
options.headless(False).no_imgs(True).mute(True).set_load_mode("none") options.headless(False).no_imgs(True).mute(True).set_load_mode("none")
options.set_argument('--window-size=1024,768') options.set_argument('--window-size=1024,768')
self.page = ChromiumPage(options) self.page = ChromiumPage(options)
self.log("浏览器初始化完成") self._log("浏览器初始化完成", 'success')
except Exception as e:
self._log(f"浏览器初始化失败: {str(e)}", 'error')
raise
@staticmethod @staticmethod
def clean_price(price: str) -> str: def _clean_price(price: str) -> str:
"""
清理价格字符串,移除货币符号
Args:
price: 原始价格字符串
Returns:
清理后的价格数字字符串
"""
currency_symbols = [r'\$', 'C\$', '¥', '£', '€', 'MX\$'] currency_symbols = [r'\$', 'C\$', '¥', '£', '€', 'MX\$']
cleaned = price.strip() cleaned = price.strip()
for symbol in currency_symbols: for symbol in currency_symbols:
cleaned = re.sub(symbol, '', cleaned) cleaned = re.sub(symbol, '', cleaned)
return cleaned.replace(',', '').strip() or "未找到价格" return cleaned.replace(',', '').strip() or "未找到价格"
def fetch_price(self, asin: str, max_retries: int = 3) -> Tuple[str, str]: def _get_country_code(self) -> str:
country = self.country_var.get() """获取当前选中的国家代码"""
selected = self.country_var.get()
return self.COUNTRY_DOMAINS.get(selected, "US")
def _fetch_price(self, asin: str, max_retries: int = 3) -> Tuple[str, str]:
"""
获取单个ASIN的价格
Args:
asin: Amazon ASIN编码
max_retries: 最大重试次数
Returns:
(ASIN, 价格) 元组
"""
# 检查是否已停止或浏览器已关闭
if not self.is_running or not self.page:
return asin, "已停止"
country = self._get_country_code()
host = switch_domain(country).replace("vendorcentral.", "") host = switch_domain(country).replace("vendorcentral.", "")
url = f"{host}dp/{asin}?th=1" url = f"{host}dp/{asin}?th=1"
for attempt in range(max_retries): for attempt in range(max_retries):
# 每次重试前检查状态
if not self.is_running or not self.page:
return asin, "已停止"
try: try:
self.page.get(url) self.page.get(url)
# 检查是否遇到验证码
if not self.page.ele('xpath://form[@action="/errors/validateCaptcha"]', timeout=1): if not self.page.ele('xpath://form[@action="/errors/validateCaptcha"]', timeout=1):
break break
self.log(f"ASIN {asin}: 检测到验证码页面") self._log(f"ASIN {asin}: 检测到验证码页面,尝试跳过...", 'warning')
continue_button = self.page.ele('css:button.a-button-text', timeout=2) continue_button = self.page.ele('css:button.a-button-text', timeout=2)
if continue_button: if continue_button:
self.log(f"ASIN {asin}: 点击 'Continue shopping'")
continue_button.click() continue_button.click()
except Exception as e: except Exception as e:
self.log(f"ASIN {asin}: 尝试 {attempt+1} 失败: {str(e)}") self._log(f"ASIN {asin}: 第{attempt + 1}次尝试失败: {str(e)}", 'warning')
if attempt == max_retries - 1: if attempt == max_retries - 1:
return asin, f"错误: {str(e)}" return asin, f"错误: {str(e)}"
# 最终获取价格前再次检查
if not self.is_running or not self.page:
return asin, "已停止"
try: try:
self.page.wait.ele_displayed('xpath://div[@id="corePrice_feature_div"]', timeout=3) self.page.wait.ele_displayed('xpath://div[@id="corePrice_feature_div"]', timeout=3)
html_content = self.page.html html_content = self.page.html
tree = etree.HTML(html_content) tree = etree.HTML(html_content)
price_whole = tree.xpath('//div[@id="corePrice_feature_div"]//span[@class="a-offscreen"]/text()') price_whole = tree.xpath('//div[@id="corePrice_feature_div"]//span[@class="a-offscreen"]/text()')
return asin, self.clean_price(price_whole[0]) if price_whole else "未找到价格元素" return asin, self._clean_price(price_whole[0]) if price_whole else "未找到价格元素"
except Exception as e: except Exception as e:
return asin, f"错误: {str(e)}" return asin, f"错误: {str(e)}"
def scrape(self): def _scrape(self):
"""执行爬取任务,支持暂停/继续"""
if not self.page: if not self.page:
self.init_browser() # 在开始爬取时初始化浏览器 self._init_browser()
start_time = time.time() start_time = time.time()
results = [] total = len(self.asins)
for asin in self.asins: # 从头开始时重置
if self.current_index == 0:
self.results = []
self.success_count = 0
self.failed_count = 0
self._log(f"开始爬取 {total} 个ASIN...", 'info')
else:
self._log(f"继续爬取 (从第{self.current_index + 1}个)...", 'info')
for index in range(self.current_index, total):
if not self.is_running: if not self.is_running:
break self.current_index = index
result = self.fetch_price(asin) self._log(f"已暂停 ({index}/{total})", 'warning')
results.append(result) self._cleanup_browser()
self.log(f"ASIN {result[0]}: {result[1]}") return # 暂停时不推送
asin = self.asins[index]
# 先更新进度条,显示当前正在处理的ASIN
self._update_progress(index + 1, total, asin)
result = self._fetch_price(asin)
if result[1] == "已停止":
self.current_index = index
self._log(f"已暂停 ({index}/{total})", 'warning')
return
self.results.append(result)
is_success = not result[1].startswith("错误") and result[1] != "未找到价格元素"
if is_success:
self.success_count += 1
self._log(f"[{index + 1}/{total}] {result[0]}: $ {result[1]}", 'success')
else:
self.failed_count += 1
self._log(f"[{index + 1}/{total}] {result[0]}: {result[1]}", 'error')
self._update_stats(success=self.success_count, failed=self.failed_count)
# 全部完成
self.current_index = 0
if self.results:
country = self._get_country_code()
output_file = f"amazon_prices_{country}_{int(time.time())}.csv"
output_file = f"amazon_prices_{self.country_var.get()}_{int(time.time())}.csv"
if results:
with open(output_file, "w", encoding="utf-8", newline="") as f: with open(output_file, "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f) writer = csv.writer(f)
writer.writerow(["ASIN", "Price"]) writer.writerow(["ASIN", "Price"])
writer.writerows(results) writer.writerows(self.results)
self.log(f"结果已保存至 {output_file}")
self.log(f"总耗时: {time.time() - start_time:.2f}秒") self._log(f"已保存: {output_file}", 'success')
self.is_running = False self._push_to_rabbitmq(output_file, country)
if self.page:
self.page.quit() # 爬取完成后关闭浏览器 elapsed_time = time.time() - start_time
self.page = None self._log(f"完成! 耗时: {elapsed_time:.2f}秒", 'info')
self.log("浏览器已关闭") self._update_progress(total, total, f"完成 {self.success_count}/{self.failed_count}")
self._cleanup_browser()
self._set_running_state(False)
self.results = []
# 读取Excel文件 def _push_to_rabbitmq(self, output_file: str, country: str):
data = pd.read_csv(output_file) """
# 提前转换 Price 列,错误值转为 NaN 将结果推送到RabbitMQ
Args:
output_file: 输出文件路径
country: 国家代码
"""
try:
# 使用utf-8-sig编码读取,处理BOM标记
data = pd.read_csv(output_file, encoding='utf-8-sig')
data['Price'] = pd.to_numeric(data['Price'], errors='coerce') data['Price'] = pd.to_numeric(data['Price'], errors='coerce')
# 过滤掉无效价格(NaN)
valid_data = data.dropna(subset=['Price']) valid_data = data.dropna(subset=['Price'])
self.log("开始推送消息....") if valid_data.empty:
self._log("没有有效的价格数据需要推送", 'warning')
return
self._log(f"开始推送 {len(valid_data)} 条数据到消息队列...", 'info')
from app.helper import rabbitmq from app.helper import rabbitmq
client = rabbitmq.RabbitMQClient() client = rabbitmq.RabbitMQClient()
client.connection() client.connection()
client.connect(queue='price_robot', routing_key='price_robot', exchange='product') client.connect(queue='price_robot', routing_key='price_robot', exchange='product')
country = self.country_var.get()
for _, item_row in valid_data.iterrows(): for _, item_row in valid_data.iterrows():
push_data = { push_data = {
'currency': country, 'currency': country,
...@@ -189,37 +732,73 @@ class AmazonPriceScraper: ...@@ -189,37 +732,73 @@ class AmazonPriceScraper:
client.send_message(push_data) client.send_message(push_data)
client.close() client.close()
self._log("消息推送完成", 'success')
except Exception as e:
self._log(f"推送消息失败: {str(e)}", 'error')
logger.error(f"推送消息失败: {str(e)}", exc_info=True)
self.log("推送消息完成") def _cleanup_browser(self):
"""清理浏览器资源"""
if self.page:
try:
self.page.quit()
self.page = None
self._log("浏览器已关闭", 'info')
except Exception as e:
logger.warning(f"关闭浏览器时发生错误: {str(e)}")
def start_scraping(self): def _start_scraping(self):
"""开始/继续爬取"""
if not self.asins: if not self.asins:
self.log("请先选择包含ASIN的Excel文件") self._log("请先选择Excel文件", 'warning')
return
if self.is_running or (self._scrape_thread and self._scrape_thread.is_alive()):
self._log("任务运行中", 'warning')
return return
self._set_running_state(True)
if self.current_index == 0:
self._update_stats(success=0, failed=0)
self._log("启动...", 'info')
else:
self._log(f"继续 (从第{self.current_index + 1}个)...", 'info')
self._scrape_thread = Thread(target=self._scrape, daemon=True)
self._scrape_thread.start()
def _stop_scraping(self):
"""暂停爬取"""
if not self.is_running: if not self.is_running:
self.is_running = True return
self.log("开始爬取...")
Thread(target=self.scrape).start()
def stop_scraping(self): self._set_running_state(False)
self.is_running = False self._log("暂停中...", 'warning')
self.log("已停止爬取")
if self.page:
self.page.quit() # 停止时关闭浏览器
self.page = None
self.log("浏览器已关闭")
def run(self): def run(self):
"""运行应用程序主循环"""
try: try:
self._log("应用程序已启动", 'info')
self.root.mainloop() self.root.mainloop()
finally: finally:
if self.page: self._cleanup_browser()
self.page.quit() # 确保程序退出时关闭浏览器
if __name__ == "__main__":
def main():
"""应用程序入口点"""
try: try:
load_dotenv() load_dotenv()
app = AmazonPriceScraper() app = ModernAmazonPriceScraper()
app.run() app.run()
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("用户中断程序")
exit(0)
except Exception as e:
logger.error(f"程序异常退出: {str(e)}", exc_info=True)
exit(1) exit(1)
if __name__ == "__main__":
main()
\ No newline at end of file
# coding: utf-8 # coding: utf-8
"""
Amazon 广告费采集工具
"""
import os import os
import threading import threading
import queue import queue
import time
import traceback import traceback
from tkinter import messagebox from datetime import datetime
from DrissionPage import ChromiumPage from DrissionPage import ChromiumPage
import ttkbootstrap as ttk import ttkbootstrap as ttk
from ttkbootstrap.scrolled import ScrolledText
from dotenv import load_dotenv from dotenv import load_dotenv
from ttkbootstrap.constants import *
from app.helper import domain, file from app.helper import domain, file
from app.logger.gui import GuiLog from app.logger.gui import GuiLog
...@@ -17,193 +19,559 @@ from app.vc.advert_cost import AdvertCost ...@@ -17,193 +19,559 @@ from app.vc.advert_cost import AdvertCost
from app.vc.product_sales import ProductSales from app.vc.product_sales import ProductSales
class VCManagerGUI(ttk.Window): class VCManagerGUI:
def __init__(self, logger):
super().__init__(themename="cosmo")
self.domain_login = None # 窗口配置
self.processor = None WINDOW_WIDTH = 580
self.actions = None WINDOW_HEIGHT = 720
self.countries = None
self.run_btn = None
self.pause_btn = None
self.shop_entry = None
self.country_var = None
self.action_var = None
self.log_text = None
self.title("广告费工具")
self.geometry("520x600")
# 设置窗口属性禁止缩放 # 国家配置
self.resizable(False, False) COUNTRY_OPTIONS = [
("全部", "all"),
("美国 (US)", "US"),
("日本 (JP)", "JP"),
("英国 (UK)", "UK"),
("法国 (FR)", "FR"),
("德国 (DE)", "DE"),
("加拿大 (CA)", "CA"),
]
# 初始化状态变量 # 功能配置
ACTION_OPTIONS = [
("全部功能", "all"),
("广告费用", "advert_cost"),
("商品销量", "product_sales"),
]
def __init__(self, logger):
"""初始化应用程序"""
# 状态变量
self.domain_login = None
self.processor = None
self.page = None self.page = None
self.running = False self.running = False
self.paused = False self.paused = False
self.log_queue = queue.Queue() self.log_queue = queue.Queue()
self.payee_code = None self.payee_code = None
# 设置日志处理 # GUI 组件引用
self.log_text = None
self.country_var = None
self.action_var = None
self.progress_bar = None
self.progress_label = None
self.status_label = None
self.start_btn = None
self.pause_btn = None
self.stats_labels = {}
# 设置日志
logger.set_console(self.log_queue) logger.set_console(self.log_queue)
self.logger = logger self.logger = logger
# 设置窗口尺寸并居中 # 初始化主窗口 - 使用深色主题
self._center_window() self.root = ttk.Window(themename="darkly")
self.root.title("🚀 VC Manager Pro")
self.root.resizable(False, False)
# 设置自定义样式
self._setup_custom_styles()
# 创建界面组件 # 构建 GUI
self.create_widgets() self._build_gui()
# 启动日志处理循环 # 窗口居中
self.after(100, self.process_log_queue) # type: ignore self._center_window()
# 启动日志处理
self.root.after(100, self._process_log_queue)
@staticmethod
def _setup_custom_styles():
"""设置自定义样式"""
style = ttk.Style()
# 标题样式
style.configure(
'Title.TLabel',
font=('Segoe UI', 24, 'bold'),
foreground='#ffffff'
)
# 副标题样式
style.configure(
'Subtitle.TLabel',
font=('Segoe UI', 10),
foreground='#b2bec3'
)
# 区域标题样式
style.configure(
'SectionTitle.TLabel',
font=('Segoe UI', 11, 'bold'),
foreground='#74b9ff'
)
# 统计数字样式
style.configure(
'Stats.TLabel',
font=('Segoe UI', 14, 'bold'),
foreground='#00cec9'
)
def _center_window(self): def _center_window(self):
"""设置窗口居中""" """窗口居中"""
window_width = 540 screen_width = self.root.winfo_screenwidth()
window_height = 600 screen_height = self.root.winfo_screenheight()
# 获取屏幕尺寸 x = int((screen_width - self.WINDOW_WIDTH) / 2)
screen_width = self.winfo_screenwidth() y = int((screen_height - self.WINDOW_HEIGHT) / 3)
screen_height = self.winfo_screenheight()
self.root.geometry(f"{self.WINDOW_WIDTH}x{self.WINDOW_HEIGHT}+{x}+{y}")
# 计算居中坐标
x = (screen_width - window_width) // 2 def _build_gui(self):
y = (screen_height - window_height) // 2 """构建完整 GUI 界面"""
# 主容器
# 设置窗口位置 main_container = ttk.Frame(self.root, padding=15)
self.geometry(f"{window_width}x{window_height}+{x}+{y}") main_container.pack(fill='both', expand=True)
def create_widgets(self): # 1. 标题区域
"""创建主界面布局""" self._build_header(main_container)
main_frame = ttk.Frame(self)
main_frame.pack(fill='both', expand=True, padx=15, pady=15) # 2. 配置卡片
self._build_config_card(main_container)
# 配置区域
config_frame = ttk.Labelframe(main_frame, text="配置参数", padding=10) # 3. 统计卡片
config_frame.pack(fill='x', pady=10) self._build_stats_card(main_container)
# 国家选择 # 4. 日志区域
country_frame = ttk.Frame(config_frame) self._build_log_area(main_container)
country_frame.grid(row=0, column=0, sticky=W, pady=5)
ttk.Label(country_frame, text="国家:", width=6).pack(side='left') # 5. 底部状态栏
self._build_status_bar(main_container)
@staticmethod
def _build_header(parent):
"""构建标题区域"""
header_frame = ttk.Frame(parent)
header_frame.pack(fill='x', pady=(0, 10))
# 主标题
title_label = ttk.Label(
header_frame,
text="🚀 VC Manager",
style='Title.TLabel'
)
title_label.pack()
# 副标题
subtitle_label = ttk.Label(
header_frame,
text="Amazon Vendor Central 数据采集工具,支持多站点批量处理",
style='Subtitle.TLabel'
)
subtitle_label.pack(pady=(3, 0))
def _build_config_card(self, parent):
"""构建配置卡片区域"""
# 卡片容器
card_frame = ttk.LabelFrame(
parent,
text=" ⚙️ 采集配置 ",
padding=10,
style="info"
)
card_frame.pack(fill='x', pady=(0, 10))
# === 国家选择区 ===
self.country_var = ttk.StringVar(value="all") self.country_var = ttk.StringVar(value="all")
self.countries = [
("全部", "all"), # 使用 grid 布局让按钮整齐排列
("美国", "US"), country_grid = ttk.Frame(card_frame)
("日本", "JP"), country_grid.pack(fill='x', pady=(0, 5))
("英国", "UK"),
("法国", "FR"), for i, (name, code) in enumerate(self.COUNTRY_OPTIONS):
("德国", "DE"), row = i // 4
("加拿大", "CA"), col = i % 4
] btn = ttk.Radiobutton(
for i, (text, value) in enumerate(self.countries): country_grid,
rb = ttk.Radiobutton(country_frame, text=text, variable=self.country_var, value=value) text=name,
rb.pack(side='left', padx=(10 if i != 0 else 0)) value=code,
variable=self.country_var,
# 功能选择 style="info-toolbutton",
action_frame = ttk.Frame(config_frame) width=10
action_frame.grid(row=1, column=0, sticky=W, pady=5) )
ttk.Label(action_frame, text="功能:", width=6).pack(side='left') btn.grid(row=row, column=col, padx=2, pady=2, sticky='ew')
# 让列等宽
for col in range(4):
country_grid.columnconfigure(col, weight=1)
# === 功能选择区 ===
self.action_var = ttk.StringVar(value="all") self.action_var = ttk.StringVar(value="all")
self.actions = [
("全部", "all"), action_grid = ttk.Frame(card_frame)
("广告费", "advert_cost"), action_grid.pack(fill='x', pady=(0, 5))
("商品销量", "product_sales")
for i, (name, code) in enumerate(self.ACTION_OPTIONS):
btn = ttk.Radiobutton(
action_grid,
text=name,
value=code,
variable=self.action_var,
style="success-toolbutton",
width=12
)
btn.grid(row=0, column=i, padx=2, pady=2, sticky='ew')
# 让列等宽
for col in range(len(self.ACTION_OPTIONS)):
action_grid.columnconfigure(col, weight=1)
# 分隔线
ttk.Separator(card_frame, style="secondary").pack(fill='x', pady=10)
# === 进度区域 ===
progress_frame = ttk.Frame(card_frame)
progress_frame.pack(fill='x')
self.progress_label = ttk.Label(
progress_frame,
text="准备就绪",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.progress_label.pack(anchor='w')
self.progress_bar = ttk.Progressbar(
progress_frame,
mode='indeterminate',
style="success-striped",
length=300
)
self.progress_bar.pack(fill='x', pady=(5, 10))
# === 操作按钮 ===
button_frame = ttk.Frame(card_frame)
button_frame.pack()
self.start_btn = ttk.Button(
button_frame,
text="🚀 开始采集",
command=self._start_process,
style="success",
width=14
)
self.start_btn.pack(side='left', padx=5)
self.pause_btn = ttk.Button(
button_frame,
text="⏸ 暂停",
command=self._toggle_pause,
style="warning-outline",
width=14,
state='disabled'
)
self.pause_btn.pack(side='left', padx=5)
def _build_stats_card(self, parent):
"""构建统计信息卡片"""
stats_frame = ttk.Frame(parent)
stats_frame.pack(fill='x', pady=(0, 5))
# 三列统计 - 紧凑布局
stats_config = [
("countries", "处理国家", "0", "info"),
("actions", "执行功能", "0", "success"),
("errors", "错误数量", "0", "danger"),
] ]
for i, (text, value) in enumerate(self.actions):
rb = ttk.Radiobutton(action_frame, text=text, variable=self.action_var, value=value) for stat_id, label_text, value, sty in stats_config:
rb.pack(side='left', padx=(10 if i != 0 else 0)) stat_card = ttk.LabelFrame(
stats_frame,
# 控制按钮 text=f" {label_text} ",
btn_frame = ttk.Frame(main_frame) padding=(10, 2),
btn_frame.pack(fill='x', pady=15) style=sty
self.pause_btn = ttk.Button(btn_frame, text="暂停", command=self.toggle_pause, )
style='warning.TButton', width=12, state=ttk.DISABLED) stat_card.pack(side='left', fill='x', expand=True, padx=3)
self.pause_btn.pack(side='right', padx=5)
self.run_btn = ttk.Button(btn_frame, text="开始执行", command=self.start_process, value_label = ttk.Label(
style='success.TButton', width=12) stat_card,
self.run_btn.pack(side='right', padx=5) text=value,
style='Stats.TLabel'
# 日志显示 )
log_frame = ttk.Labelframe(main_frame, text="操作日志", padding=10) value_label.pack()
log_frame.pack(fill='both', expand=True)
self.stats_labels[stat_id] = value_label
self.log_text = ttk.ScrolledText(log_frame, state=DISABLED)
def _build_log_area(self, parent):
"""构建日志显示区域"""
log_frame = ttk.LabelFrame(
parent,
text=" 运行日志 ",
padding=8,
style="secondary"
)
log_frame.pack(fill='both', expand=True, pady=(0, 5))
# 使用 ScrolledText
self.log_text = ScrolledText(
log_frame,
height=18,
autohide=True,
bootstyle="dark"
)
self.log_text.pack(fill='both', expand=True) self.log_text.pack(fill='both', expand=True)
def start_process(self): # 设置日志文本样式
"""启动处理线程""" self.log_text.text.configure(
font=('Consolas', 9),
bg='#1e272e',
fg='#dfe6e9',
insertbackground='#dfe6e9',
selectbackground='#74b9ff',
padx=10,
pady=10
)
# 配置日志标签颜色
self.log_text.text.tag_configure('info', foreground='#74b9ff')
self.log_text.text.tag_configure('success', foreground='#00b894')
self.log_text.text.tag_configure('warning', foreground='#fdcb6e')
self.log_text.text.tag_configure('error', foreground='#e17055')
self.log_text.text.tag_configure('time', foreground='#636e72')
def _build_status_bar(self, parent):
"""构建底部状态栏"""
status_frame = ttk.Frame(parent, padding=(0, 5))
status_frame.pack(fill='x')
self.status_label = ttk.Label(
status_frame,
text="就绪",
font=('Segoe UI', 9),
foreground='#b2bec3'
)
self.status_label.pack(side='left')
# 版本信息
version_label = ttk.Label(
status_frame,
text="v2.0.0",
font=('Segoe UI', 8),
foreground='#636e72'
)
version_label.pack(side='right')
# ==================== 日志方法 ====================
def _log(self, message: str, level: str = 'info'):
"""添加日志消息"""
timestamp = datetime.now().strftime('%H:%M:%S')
# 插入时间戳
self.log_text.text.insert('end', f"[{timestamp}] ", 'time')
# 级别前缀
level_prefixes = {
'info': 'i',
'success': '+',
'warning': '!',
'error': 'x'
}
prefix = level_prefixes.get(level, 'i')
# 插入消息
self.log_text.text.insert('end', f"{prefix} {message}\n", level)
self.log_text.text.see('end')
self.root.update()
def _update_stats(self, countries: int = None, actions: int = None, errors: int = None):
"""更新统计数据"""
if countries is not None:
self.stats_labels['countries'].configure(text=str(countries))
if actions is not None:
self.stats_labels['actions'].configure(text=str(actions))
if errors is not None:
self.stats_labels['errors'].configure(text=str(errors))
def _set_running_state(self, is_running: bool):
"""设置运行状态"""
self.running = is_running
if is_running:
self.start_btn.configure(state='disabled', text="⏳ 执行中...")
self.pause_btn.configure(state='normal')
self.status_label.configure(
text="🔄 正在采集中...",
foreground='#00b894'
)
self.progress_bar.start(15)
else:
self.start_btn.configure(state='normal', text="🚀 开始采集")
self.pause_btn.configure(state='disabled', text="⏸ 暂停")
self.status_label.configure(
text="就绪",
foreground='#b2bec3'
)
self.progress_bar.stop()
# ==================== 业务逻辑 ====================
def _get_selected_country(self):
"""获取选中的国家代码"""
return self.country_var.get()
def _get_selected_action(self):
"""获取选中的功能代码"""
return self.action_var.get()
def _start_process(self):
"""开始处理"""
if self.running: if self.running:
messagebox.showwarning("警告", "已有任务正在运行") self._log("任务已在运行中", 'warning')
return
if not self.action_var.get():
messagebox.showerror("输入错误", "请选择要执行的功能")
return return
self.running = True self._set_running_state(True)
self.paused = False self.paused = False
self.run_btn.config(text="执行中...", style='info.TButton', state=ttk.DISABLED) self._update_stats(countries=0, actions=0, errors=0)
self.pause_btn.config(state=ttk.NORMAL) self._log("启动采集任务...", 'info')
threading.Thread(target=self.run_process, daemon=True).start()
def toggle_pause(self): threading.Thread(target=self._run_process, daemon=True).start()
"""切换暂停/继续状态"""
def _toggle_pause(self):
"""暂停/继续"""
self.paused = not self.paused self.paused = not self.paused
if self.paused: if self.paused:
self.pause_btn.config(text="继续", style='success.TButton') self.pause_btn.configure(text="▶ 继续", bootstyle="success")
self.logger.info("已暂停") self.progress_bar.stop()
self.status_label.configure(text="⏸ 已暂停", foreground='#fdcb6e')
self._log("已暂停", 'warning')
if self.processor: if self.processor:
self.processor.set_running(False) self.processor.set_running(False)
if self.domain_login: if self.domain_login:
self.domain_login.set_status(False) self.domain_login.set_status(False)
else: else:
self.pause_btn.config(text="暂停", style='warning.TButton') self.pause_btn.configure(text="⏸ 暂停", bootstyle="warning-outline")
self.logger.info("已继续") self.progress_bar.start(15)
self.status_label.configure(text="🔄 正在采集中...", foreground='#00b894')
self._log("已继续", 'info')
if self.processor: if self.processor:
self.processor.set_running(True) self.processor.set_running(True)
if self.domain_login: if self.domain_login:
self.domain_login.set_status(True) self.domain_login.set_status(True)
def run_process(self): def _run_process(self):
"""后台处理流程""" """后台处理流程"""
import time
error_count = 0
country_count = 0
action_count = 0
try: try:
self.logger.info("初始化浏览器引擎...") self._log("初始化浏览器引擎...", 'info')
self.init_browser() self._init_browser()
country = self.country_var.get()
action = self.action_var.get() country_code = self._get_selected_country()
action_code = self._get_selected_action()
# 处理国家选择 # 处理国家选择
countries = self.countries if country == "all" else [(None, country)] if country_code == "all":
countries = [(text, value) for text, value in countries if value != "all"] countries = [(name, code) for name, code in self.COUNTRY_OPTIONS if code != "all"]
else:
countries = [(None, country_code)]
# 登录所有选择的国家
for name, code in countries:
# 检查暂停状态
while self.paused:
time.sleep(0.5)
if not self.running:
return
# 先登录选择的国家 if not self.running:
for _, country_value in countries: self._log("任务已停止", 'warning')
self.domain_login = domain.LoginDomain(self.logger, self.page, country_value) return
display_name = name.split(" ")[0] if name else code
self._log(f"登录 {display_name}...", 'info')
self.domain_login = domain.LoginDomain(self.logger, self.page, code)
self.domain_login.set_status(True) self.domain_login.set_status(True)
self.domain_login.login_check() self.domain_login.login_check()
for _, country_value in countries: # 执行任务
for name, code in countries:
# 检查暂停状态
while self.paused:
time.sleep(0.5)
if not self.running:
return
if not self.running:
self._log("任务已停止", 'warning')
return
country_count += 1
self._update_stats(countries=country_count)
display_name = name.split(" ")[0] if name else code
# 处理动作选择 # 处理动作选择
actions = self.actions if action == "all" else [(None, action)] if action_code == "all":
actions = [(text, value) for text, value in actions if value != "all"] actions = [(n, c) for n, c in self.ACTION_OPTIONS if c != "all"]
for _, action_value in actions: else:
self.logger.info(f"开始执行 {action_value} 操作于国家 {country_value}...") actions = [(None, action_code)]
self.processor = self._get_processor(action_value, country_value)
for action_name, action_val in actions:
# 检查暂停状态
while self.paused:
time.sleep(0.5)
if not self.running:
return
if not self.running:
self._log("任务已停止", 'warning')
return
action_count += 1
self._update_stats(actions=action_count)
display_action = action_name or action_val
self._log(f"[{display_name}] 执行 {display_action}...", 'info')
try:
self.processor = self._get_processor(action_val, code)
if self.processor: if self.processor:
self.processor.set_running(True) self.processor.set_running(True)
self.processor.run("") self.processor.run("")
self.processor.push_data_queue() self.processor.push_data_queue()
self._log(f"[{display_name}] {display_action} 完成", 'success')
except Exception as e:
error_count += 1
self._update_stats(errors=error_count)
self._log(f"[{display_name}] {display_action} 失败: {e}", 'error')
self._log("全部任务执行完成!", 'success')
self.root.after(0, lambda: self.status_label.configure(
text="✅ 任务完成",
foreground='#00b894'
))
self.logger.info("操作成功完成!")
except Exception as e: except Exception as e:
self.logger.error(f"发生错误:{str(e)}") error_count += 1
self.logger.error(traceback.format_exc()) self._update_stats(errors=error_count)
self._log(f"发生错误: {e}", 'error')
self._log(traceback.format_exc(), 'error')
self.root.after(0, lambda: self.status_label.configure(
text="❌ 任务失败",
foreground='#e17055'
))
finally: finally:
self.cleanup_resources() self._cleanup_resources()
def _get_processor(self, action, country): def _get_processor(self, action, country):
"""根据动作类型返回处理器实例""" """获取处理器实例"""
processors = { processors = {
"advert_cost": AdvertCost, "advert_cost": AdvertCost,
"product_sales": ProductSales "product_sales": ProductSales
...@@ -213,60 +581,62 @@ class VCManagerGUI(ttk.Window): ...@@ -213,60 +581,62 @@ class VCManagerGUI(ttk.Window):
return processor_class(self.logger, self.page, str(country)) return processor_class(self.logger, self.page, str(country))
return None return None
def init_browser(self): def _init_browser(self):
"""初始化浏览器配置""" """初始化浏览器"""
self.page = ChromiumPage() self.page = ChromiumPage()
self.page.set.load_mode.normal() self.page.set.load_mode.normal()
self.page.set.when_download_file_exists('overwrite') self.page.set.when_download_file_exists('overwrite')
download_path = os.path.join(os.getcwd()) self.page.set.download_path(os.getcwd())
self.page.set.download_path(download_path)
file.make_dir('products') file.make_dir('products')
self._log("浏览器初始化完成", 'success')
def log(self, message): def _cleanup_resources(self):
"""记录日志到队列"""
self.logger.info(message)
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_queue.put(f"[{timestamp}] {message}")
def process_log_queue(self):
"""处理日志队列"""
while not self.log_queue.empty():
msg = self.log_queue.get()
self.log_text.configure(state=ttk.NORMAL)
self.log_text.insert(ttk.END, msg + "\n")
self.log_text.configure(state=ttk.DISABLED)
self.log_text.see(ttk.END)
self.after(200, self.process_log_queue) # type: ignore
def clear_log(self):
"""清除日志"""
self.log_text.configure(state=ttk.NORMAL)
self.log_text.delete(1.0, ttk.END)
self.log_text.configure(state=ttk.DISABLED)
def cleanup_resources(self):
"""清理资源""" """清理资源"""
if self.page: if self.page:
try: try:
self.page.close() self.page.close()
self.logger.info("浏览器资源已释放") self._log("浏览器已关闭", 'info')
except Exception as e: except Exception as e:
self.logger.info(f"释放浏览器资源时出错:{str(e)}") self._log(f"关闭浏览器出错: {e}", 'warning')
finally: finally:
self.page = None self.page = None
self.running = False self.running = False
self.paused = False self.paused = False
self.run_btn.configure(text="开始执行", style='success.TButton') self.root.after(0, lambda: self._set_running_state(False))
self.run_btn['state'] = ttk.NORMAL
self.pause_btn.configure(text="暂停", style='warning.TButton') def _process_log_queue(self):
self.pause_btn['state'] = ttk.DISABLED """处理日志队列"""
while not self.log_queue.empty():
msg = self.log_queue.get()
# 判断日志级别
level = 'info'
if '错误' in msg or 'Error' in msg or '失败' in msg:
level = 'error'
elif '成功' in msg or '完成' in msg:
level = 'success'
elif '警告' in msg or '暂停' in msg:
level = 'warning'
self._log(msg, level)
self.root.after(150, self._process_log_queue)
def run(self):
"""运行应用程序"""
try:
self._log("应用程序已启动", 'info')
self.root.mainloop()
finally:
self._cleanup_resources()
if __name__ == "__main__": if __name__ == "__main__":
try: try:
load_dotenv() load_dotenv()
log = GuiLog() log = GuiLog()
app = VCManagerGUI(log) app = VCManagerGUI(log)
app.mainloop() app.run()
except KeyboardInterrupt: except KeyboardInterrupt:
exit(1) exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment