Commit 84df1637 authored by 邱阿朋

feat(gui): 添加 Amazon 价格爬取工具

- 新增 price_gui.py 文件,实现了一个 Amazon 价格爬取的 GUI 工具
- 可选择国家域名,加载 ASIN Excel 文件,批量爬取 Amazon 产品价格
- 优化了浏览器初始化和验证码处理逻辑
- 添加了日志记录和结果显示功能
- 移除了未使用的 test.py 文件
- 更新了 build.bat 脚本,增加了新工具的打包命令
parent 92b3eed8
...@@ -15,12 +15,15 @@ REM 安装依赖 ...@@ -15,12 +15,15 @@ REM 安装依赖
uv pip install -i https://mirrors.cloud.tencent.com/pypi/simple -r requirements.txt uv pip install -i https://mirrors.cloud.tencent.com/pypi/simple -r requirements.txt
REM 使用版本号生成 exe 文件 REM 使用版本号生成 exe 文件
REM pyinstaller -F -n tool_cmd_%VERSION%.exe tool_cmd.py REM pyinstaller -F -n tool_cmd_%VERSION%.exe tool_cmd.py
pyinstaller -F -n tool_gui_%VERSION%.exe --noconsole tool_gui.py
REM pyinstaller -F -n easy_gui.exe --noconsole easy_gui.py REM pyinstaller -F -n easy_gui.exe --noconsole easy_gui.py
pyinstaller -F -n tools_gui_%VERSION%.exe --noconsole tool_gui.py
pyinstaller -F -n super_gui_%VERSION%.exe --noconsole super_gui.py pyinstaller -F -n super_gui_%VERSION%.exe --noconsole super_gui.py
pyinstaller -F -n price_gui_%VERSION%.exe --noconsole gui.py
REM 删除生成的 .spec 文件 REM 删除生成的 .spec 文件
del *.spec del *.spec
echo build success echo build success
\ No newline at end of file
import csv
import queue
import re
import time
from threading import Thread
from tkinter import filedialog, scrolledtext
from typing import Tuple

import pandas as pd
import ttkbootstrap as ttk
from DrissionPage import ChromiumPage, ChromiumOptions
from lxml import etree
from ttkbootstrap.constants import *

from app.helper.domain import switch_domain
class AmazonPriceScraper:
    """GUI tool that looks up Amazon product prices for a list of ASINs.

    The window lets the user pick a marketplace, load an Excel file that has
    an ``ASIN`` column, and start/stop a background scraping run.  Results are
    written to a CSV file in the current working directory.

    Threading: the scraping loop runs on a worker thread.  The worker never
    touches Tk widgets -- log lines are pushed onto ``_log_queue`` and flushed
    into the text widget by the Tk main loop -- and while a run is active the
    worker is the only code that opens or closes the browser.
    """

    WINDOW_WIDTH = 500
    WINDOW_HEIGHT = 600
    # Interval (ms) at which the Tk main loop flushes queued log lines.
    LOG_POLL_MS = 100
    # Multi-character symbols come first so "C$" / "MX$" are removed as a unit
    # instead of leaving a stray "C" / "MX" behind once "$" has been stripped.
    _CURRENCY_RE = re.compile(r"MX\$|C\$|\$|[¥￥£€]")
    # Marketplace code snapshotted on the GUI thread when a run starts.
    _run_country = None
    # Thread object of the most recent scraping run, if any.
    _worker = None

    def __init__(self):
        """Build the window and widgets; the browser is created lazily."""
        self.log_text = None
        self.file_label = None
        self.country_var = None
        self.asins = []
        self.page = None  # browser is only started when a run begins
        self.is_running = False
        self._log_queue = queue.Queue()
        # GUI initialisation
        self.root = ttk.Window(themename="flatly")
        self.root.title("Amazon价格爬取工具")
        self.root.geometry(f"{self.WINDOW_WIDTH}x{self.WINDOW_HEIGHT}")
        self.root.resizable(False, False)
        # Display name -> marketplace code passed to switch_domain()
        self.domains = {
            "美国": "US", "英国": "UK", "日本": "JP",
            "法国": "FR", "德国": "DE", "加拿大": "CA"
        }
        self.setup_gui()
        self._center_window()
        # Start the periodic flush of queued log lines into the text widget.
        self._drain_log_queue()

    def _center_window(self):
        """Place the fixed-size window on screen.

        Despite the name, the window is not centred: it is positioned towards
        the right edge (free width / 1.1) and one third down the free height.
        """
        screen_width = self.root.winfo_screenwidth()
        screen_height = self.root.winfo_screenheight()
        x = int((screen_width - self.WINDOW_WIDTH) / 1.1)
        y = int((screen_height - self.WINDOW_HEIGHT) / 3)
        self.root.geometry(f"{self.WINDOW_WIDTH}x{self.WINDOW_HEIGHT}+{x}+{y}")

    def setup_gui(self):
        """Create the country selector, file picker, log area and buttons."""
        ttk.Label(self.root, text="选择国家:").pack(pady=5)
        self.country_var = ttk.StringVar(value="美国")
        # Offer the display names (dict keys) so the default value and the
        # menu entries come from the same set; _country_code() maps the
        # selection to the marketplace code.
        ttk.OptionMenu(self.root, self.country_var, "美国", *self.domains.keys()).pack(pady=5)
        ttk.Button(self.root, text="选择ASIN Excel文件", command=self.load_excel).pack(pady=5)
        self.file_label = ttk.Label(self.root, text="未选择文件")
        self.file_label.pack(pady=5)
        self.log_text = scrolledtext.ScrolledText(self.root, height=20, width=80)
        self.log_text.pack(pady=10, padx=10, fill=BOTH, expand=True)
        button_frame = ttk.Frame(self.root)
        button_frame.pack(pady=5)
        ttk.Button(button_frame, text="开始爬取", bootstyle=SUCCESS, command=self.start_scraping).pack(side=LEFT, padx=5)
        ttk.Button(button_frame, text="停止", bootstyle=DANGER, command=self.stop_scraping).pack(side=LEFT, padx=5)

    def _country_code(self):
        """Return the marketplace code for the current menu selection.

        A value that is not a key of ``self.domains`` (for example a code set
        directly on ``country_var``) is returned unchanged.
        """
        selected = self.country_var.get()
        return self.domains.get(selected, selected)

    def log(self, message):
        """Queue a timestamped log line; safe to call from any thread."""
        self._log_queue.put(f"{time.strftime('%Y-%m-%d %H:%M:%S')}: {message}\n")

    def _drain_log_queue(self):
        """Flush queued log lines into the text widget and reschedule itself.

        Runs on the Tk main loop only, so widget access stays on the thread
        that owns the interpreter.
        """
        wrote = False
        # Single consumer, so empty() followed by get_nowait() cannot race.
        while not self._log_queue.empty():
            self.log_text.insert(END, self._log_queue.get_nowait())
            wrote = True
        if wrote:
            self.log_text.see(END)
        self.root.after(self.LOG_POLL_MS, self._drain_log_queue)

    def load_excel(self):
        """Ask for an Excel file and load the non-empty values of its ``ASIN`` column."""
        file_path = filedialog.askopenfilename(filetypes=[("Excel files", "*.xlsx *.xls")])
        if not file_path:
            return
        try:
            df = pd.read_excel(file_path)
            values = df['ASIN'].dropna().astype(str).str.strip()
            # Drop cells that contained only whitespace.
            self.asins = [value for value in values if value]
            self.file_label.config(text=f"已加载: {file_path}")
            self.log(f"成功加载 {len(self.asins)} 个ASIN")
        except Exception as e:
            # Covers unreadable files as well as a missing 'ASIN' column.
            self.log(f"加载Excel失败: {str(e)}")

    def init_browser(self):
        """Start a visible Chromium window with images and audio disabled."""
        options = ChromiumOptions()
        options.headless(False).no_imgs(True).mute(True).set_load_mode("none")
        options.set_argument('--window-size=1024,768')
        self.page = ChromiumPage(options)
        self.log("浏览器初始化完成")

    def _close_browser(self):
        """Quit the browser if one is open; never raises."""
        page, self.page = self.page, None
        if page is None:
            return
        try:
            page.quit()
        except Exception as e:
            self.log(f"关闭浏览器失败: {e}")
        else:
            self.log("浏览器已关闭")

    @staticmethod
    def clean_price(price: str) -> str:
        """Strip currency symbols and separators from a displayed price.

        Handles both "1,234.56" (comma as thousands separator) and
        "1.234,56" / "12,99" (comma as decimal separator) and always returns
        the number with "." as the decimal separator.  A lone comma followed
        by exactly three digits ("1,234") is treated as a thousands separator.

        Returns:
            The cleaned number as a string, or "未找到价格" when nothing is
            left after cleaning.
        """
        # str.split() with no argument also removes non-breaking spaces.
        text = "".join(AmazonPriceScraper._CURRENCY_RE.sub("", price).split())
        last_comma = text.rfind(",")
        last_dot = text.rfind(".")
        if last_comma > last_dot:
            fraction = text[last_comma + 1:]
            if fraction.isdigit() and (last_dot != -1 or len(fraction) != 3):
                whole = text[:last_comma].replace(".", "").replace(",", "")
                return f"{whole}.{fraction}"
        return text.replace(",", "") or "未找到价格"

    def fetch_price(self, asin: str, max_retries: int = 3) -> Tuple[str, str]:
        """Open the product page for ``asin`` and read its displayed price.

        Args:
            asin: Product identifier appended to the marketplace URL.
            max_retries: Page-load attempts (at least one is always made);
                another attempt is made after a captcha page or an exception.

        Returns:
            ``(asin, text)`` where ``text`` is the cleaned price, a
            "not found" message, or an error message.
        """
        country = self._run_country or self._country_code()
        # NOTE(review): assumes switch_domain() returns a base URL ending in
        # "/" -- confirm against app.helper.domain.
        host = switch_domain(country).replace("vendorcentral.", "")
        url = f"{host}dp/{asin}?th=1"
        # Guarantee one load so a stale page is never parsed for this ASIN.
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                self.page.get(url)
                if not self.page.ele('xpath://form[@action="/errors/validateCaptcha"]', timeout=1):
                    break
                self.log(f"ASIN {asin}: 检测到验证码页面")
                continue_button = self.page.ele('css:button.a-button-text', timeout=2)
                if continue_button:
                    self.log(f"ASIN {asin}: 点击 'Continue shopping'")
                    continue_button.click()
            except Exception as e:
                self.log(f"ASIN {asin}: 尝试 {attempt+1} 失败: {str(e)}")
                if attempt == attempts - 1:
                    return asin, f"错误: {str(e)}"
        try:
            self.page.wait.ele_displayed('xpath://div[@id="corePrice_feature_div"]', timeout=3)
            tree = etree.HTML(self.page.html)
            price_whole = tree.xpath('//div[@id="corePrice_feature_div"]//span[@class="a-offscreen"]/text()')
            return asin, self.clean_price(price_whole[0]) if price_whole else "未找到价格元素"
        except Exception as e:
            return asin, f"错误: {str(e)}"

    def _save_results(self, results, country):
        """Write ``results`` to a timestamped CSV file in the working directory."""
        output_file = f"amazon_prices_{country}_{int(time.time())}.csv"
        with open(output_file, "w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["ASIN", "Price"])
            writer.writerows(results)
        self.log(f"结果已保存至 {output_file}")

    def scrape(self):
        """Fetch every loaded ASIN until done or stopped, then save the results.

        Intended to run on the worker thread started by ``start_scraping``.
        The running flag is always cleared and the browser always closed,
        even when the run fails part-way.
        """
        country = self._run_country or self._country_code()
        start_time = time.time()
        results = []
        try:
            if not self.page:
                self.init_browser()
            for asin in self.asins:
                if not self.is_running:
                    break
                result = self.fetch_price(asin)
                results.append(result)
                self.log(f"ASIN {result[0]}: {result[1]}")
            if results:
                self._save_results(results, country)
            self.log(f"总耗时: {time.time() - start_time:.2f}秒")
        except Exception as e:
            self.log(f"爬取过程中发生错误: {e}")
        finally:
            self.is_running = False
            self._run_country = None
            self._close_browser()

    def start_scraping(self):
        """Start a scraping run on a background thread (GUI callback)."""
        if not self.asins:
            self.log("请先选择包含ASIN的Excel文件")
            return
        if self.is_running:
            return
        if self._worker is not None and self._worker.is_alive():
            # A stopped run is still finishing its current ASIN; starting
            # another worker now would make two threads share one browser.
            self.log("上一次任务仍在结束中,请稍后再试")
            return
        # Read the Tk variable here, on the GUI thread, not in the worker.
        self._run_country = self._country_code()
        self.is_running = True
        self.log("开始爬取...")
        # Daemon thread so a stuck page load cannot keep the process alive.
        self._worker = Thread(target=self.scrape, daemon=True)
        self._worker.start()

    def stop_scraping(self):
        """Ask the running worker to stop after the ASIN it is processing."""
        self.is_running = False
        self.log("已停止爬取")
        worker = self._worker
        # While the worker is alive it owns the browser and closes it itself;
        # quitting it from here would pull the page out from under the worker.
        if worker is None or not worker.is_alive():
            self._close_browser()

    def run(self):
        """Run the Tk main loop and make sure the browser is closed on exit."""
        try:
            self.root.mainloop()
        finally:
            self.is_running = False
            worker = self._worker
            if worker is not None and worker.is_alive():
                # Give the worker a moment to finish and close the browser.
                worker.join(timeout=5)
            self._close_browser()
if __name__ == "__main__":
    # Script entry point: build the GUI and block in its main loop until the
    # window is closed.
    scraper = AmazonPriceScraper()
    scraper.run()
\ No newline at end of file
import pandas as pd
from DrissionPage import ChromiumPage
from app.vc.spa import Spa
from app.helper.logger import ConsoleLog
def _to_amount(series):
    """Convert a column of amounts to numbers, dropping '$' and ',' first.

    Values that still cannot be parsed become NaN.
    """
    cleaned = series.astype(str).str.replace(r'[\$,]', '', regex=True)
    return pd.to_numeric(cleaned, errors='coerce')


def calculate_totals(file_path):
    """Sum the amounts in a SPA workbook and count its entries.

    ``Sheet1`` contributes the sum of its ``Original balance`` column and one
    row per distinct ``Invoice ID``.  Every other sheet contributes the sum of
    its ``Rebate In Agreement Currency`` column (or, failing that,
    ``Vendor Funding In Agreement Currency``) and counts as a single row.
    Sheets that have neither column, or that fail to load, are reported on
    stdout and skipped.

    Args:
        file_path: Path of the Excel workbook to read.

    Returns:
        ``(total_amount, total_rows)``.

    Raises:
        Whatever pandas raises when the workbook or ``Sheet1`` (or its
        required columns) cannot be read.
    """
    total_amount = 0
    total_rows = 0
    # The context manager releases the workbook's file handle even when a
    # sheet fails to parse.
    with pd.ExcelFile(file_path) as xls:
        # Sheet1: invoices with an 'Original balance' amount.
        sheet1_df = pd.read_excel(xls, 'Sheet1')
        sheet1_df['Original balance'] = _to_amount(sheet1_df['Original balance'])
        sheet1_total = sheet1_df['Original balance'].sum(skipna=True)
        print(f"Sheet1 的总金额: {sheet1_total}")
        total_amount += sheet1_total
        # One row per distinct Invoice ID.
        total_rows += len(sheet1_df['Invoice ID'].drop_duplicates())

        # Remaining sheets: rebate or vendor-funding amounts.
        for sheet_name in xls.sheet_names:
            if sheet_name == 'Sheet1':
                continue
            try:
                df = pd.read_excel(xls, sheet_name)
                # The rebate column takes precedence when both are present.
                target_column = None
                if 'Rebate In Agreement Currency' in df.columns:
                    target_column = 'Rebate In Agreement Currency'
                elif 'Vendor Funding In Agreement Currency' in df.columns:
                    target_column = 'Vendor Funding In Agreement Currency'
                if target_column:
                    # Same cleaning as Sheet1, so a formatted value such as
                    # "$1,000.50" is counted instead of silently dropped.
                    total_amount += _to_amount(df[target_column]).sum(skipna=True)
                    total_rows += 1
                else:
                    print(
                        f"{sheet_name}中既缺少'Rebate In Agreement Currency'列,也缺少'Vendor Funding In Agreement Currency'列")
            except Exception as e:
                # Best effort: one unreadable sheet must not abort the totals.
                print(f"处理{sheet_name}时发生错误: {e}")
    # Final summary.
    print("\n最终结果:")
    print(f"所有sheet的总金额: {total_amount}")
    print(f"所有sheet的总行数: {total_rows}")
    return total_amount, total_rows
def calculate_spa(file_path='2025-07-23-14-41_US_spa.xlsx'):
    """Print and return the totals for one SPA workbook.

    Args:
        file_path: Workbook to summarise; defaults to the sample file this
            script was written against.

    Returns:
        The ``(total_amount, total_rows)`` pair from ``calculate_totals``.
    """
    return calculate_totals(file_path)
def sap():
    """Push the rows of a previously exported UK SPA result file to the data queue.

    Creates a console logger and a browser page, hands them to ``Spa`` for the
    "UK" / "UK-VC" account, points it at the fixed result file and calls
    ``push_data_queue``.
    """
    # NOTE(review): the browser page is never closed here -- confirm whether
    # Spa takes ownership of it.
    spa = Spa(ConsoleLog(), ChromiumPage(), "UK", "UK-VC")
    spa.result_file_name = "2025-07-21-15-29_UK_spa.xlsx"
    spa.push_data_queue()
# Module-level entry point: runs the SPA totals calculation whenever this file
# is executed or imported (there is no ``__main__`` guard).
calculate_spa()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment