Commit aadfac84 authored by 邱阿朋's avatar 邱阿朋

refactor(app): 重构登录模块并添加日志记录

- 修改 domain_page 函数,增加 logger 参数用于日志记录
-增加图形验证码验证逻辑
- 调整页面加载等待时间
- 更新 gui.py 和 spa.py 中的相关调用
parent d097d515
......@@ -23,11 +23,11 @@ def switch_domain(country):
return domain
def domain_page(page,country):
def domain_page(logger,page, country):
url = switch_domain(country)
page.get(url)
while True:
time.sleep(1)
time.sleep(3)
title = page.title
valid_titles = [
"Amazon Sign-In",
......@@ -37,12 +37,16 @@ def domain_page(page,country):
"Amazon Iniciar sesión"
]
if title in valid_titles:
print("请登录账号")
logger.info("请登录账号")
continue
if title in ["Authentication required"]:
logger.info("请输入图形验证码")
continue
tree = etree.HTML(page.html)
element = tree.xpath('//*[@id="cvf-submit-otp-button-announce"]')
if element:
print('请输入验证码')
logger.info('请输入验证码')
else:
break
\ No newline at end of file
break
# coding: utf-8
\ No newline at end of file
# coding: utf-8
import logging
from app.logger.logger import Logger
class ColoredFormatter(logging.Formatter):
def format(self, record):
# 为不同的日志级别定义一些颜色
colors = {
'DEBUG': '\033[94m', # blue
'INFO': '\033[92m', # green
'WARNING': '\033[93m', # yellow
'ERROR': '\033[91m', # red
'CRITICAL': '\033[95m' # magenta
}
# 从记录中获取原始消息
message = super().format(record)
# 如果日志级别定义了颜色,则添加颜色代码
if record.levelname in colors:
color_code = colors[record.levelname]
message = f"{color_code}{message}\033[0m"
return message
class ConsoleLog(Logger):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ConsoleLog, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
# 使用彩色格式化程序创建控制台处理程序
console_handler = logging.StreamHandler()
console_handler.setFormatter(ColoredFormatter('%(asctime)s %(message)s'))
# 将控制台处理程序添加到记录器
self.logger.addHandler(console_handler)
def info(self, arg):
self.logger.info(arg)
def debug(self, arg):
self.logger.debug(arg)
def warning(self, arg):
self.logger.warning(arg)
def error(self, arg):
self.logger.error(arg)
\ No newline at end of file
# coding: utf-8
import queue
import time
from app.logger.logger import Logger
class GuiLog(Logger):
_instance = None
_log_queue = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(GuiLog, cls).__new__(cls)
# 全局队列,用于传递日志消息
cls._queue = queue.Queue()
return cls._instance
def set_console(self, log_queue):
self._log_queue = log_queue
def info(self, arg):
self.__message("INFO", arg)
def debug(self, arg):
self.__message("DEBUG", arg)
def warning(self, arg):
self.__message("WARN", arg)
def error(self, arg):
self.__message("ERROR", arg)
def __message(self, t, message):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self._log_queue.put(f"[{t}] [{timestamp}] {message}")
# coding: utf-8
from abc import ABC,abstractmethod
class Logger(ABC):
@abstractmethod
def info(self, arg):
pass
@abstractmethod
def debug(self, arg):
pass
@abstractmethod
def warning(self, arg):
pass
@abstractmethod
def error(self, arg):
pass
\ No newline at end of file
......@@ -8,17 +8,19 @@ from datetime import datetime
from decimal import Decimal
import pandas as pd
from DrissionPage import ChromiumPage as Page
from DrissionPage.errors import ElementNotFoundError
from lxml import etree
from app.helper import domain, excel, file
from app.vc import log, rdb, rabbit
from app.logger.logger import Logger
from app.vc import rdb, rabbit
from app.vc.interface import AutoInterface
from DrissionPage import ChromiumPage as Page
class Payment(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
def __init__(self, logger: Logger, page: Page, country: str, payee_code: str, shop_code: str):
self.logger = logger
self.page = page
self.country = country
self.payeeCode = payee_code
......@@ -105,7 +107,7 @@ class Payment(AutoInterface):
except ElementNotFoundError:
if count == 3: return None
count += 1
log.warning("导出按钮不存在刷新网页")
self.logger.warning("导出按钮不存在刷新网页")
self.page.refresh()
return pd.read_csv(file_name)
......@@ -183,7 +185,6 @@ class Payment(AutoInterface):
'RESOLUTION_COST': row.xpath('string(./td[@data-column="RESOLUTION_COST"])').strip().replace('$', '')
}
# 如果字段为空则设为空字符串
data = {k: "" if not v else v for k, v in data.items()}
data_list.append(data)
......@@ -306,7 +307,7 @@ class Payment(AutoInterface):
# 拼接新的文件名
new_file_name = f"{current_datetime}_{self.country}_{file_name}"
excel.save_xls(list_data, new_file_name, "Remittance payments")
log.info(f"共计:{len(list_data)} 订单")
self.logger.info(f"共计:{len(list_data)} 订单")
all_normal_pay_data = []
all_price_pay_data = []
......@@ -318,7 +319,7 @@ class Payment(AutoInterface):
# 获取当前订单的Payee和优惠比例
vendor_payment_terms = self.__get_po_code(i, invoice_number)
log.info(vendor_payment_terms)
self.logger.info(vendor_payment_terms)
vendor = vendor_payment_terms['vendor']
deduction_points = int(vendor_payment_terms['payment_terms'])
......@@ -338,7 +339,7 @@ class Payment(AutoInterface):
self.__invoice_details(invoice_number, last_two, last_three)
# 点击争议价tab
self.page.ele("#pd").click()
log.debug("等待争议数据加载,10秒后获取表单数据")
self.logger.debug("等待争议数据加载,10秒后获取表单数据")
self.page.wait(10)
table_html = self.page.ele("#priceDiscrepancyWithDMSGridForm", timeout=5).html
# 抓取表单数据
......@@ -413,4 +414,4 @@ class Payment(AutoInterface):
# 推送数据
rabbit.send_message(push_data)
rabbit.close()
\ No newline at end of file
rabbit.close()
......@@ -2,13 +2,16 @@
# 回款明细
import pandas as pd
from app.vc import log, rabbit
from app.logger.logger import Logger
from app.vc import rabbit
from app.vc.interface import AutoInterface
class PaymentPush(AutoInterface):
def __init__(self, country: str, shop_code: str):
def __init__(self,logger: Logger, country: str, shop_code: str):
self.logger = logger
self.country = country
self.shop_code = shop_code
......@@ -47,7 +50,7 @@ class PaymentPush(AutoInterface):
# 转换为字典,orient='index' 表示以索引为键
payments_map = payments.to_dict(orient='index')
log.info(f"共计:{len(invoices)} 订单")
self.logger.info(f"共计:{len(invoices)} 订单")
rabbit.connection()
rabbit.connect(queue='refund_robot', routing_key='refund_robot', exchange='reports')
......
......@@ -4,16 +4,18 @@ import os
from datetime import datetime
import pandas as pd
from DrissionPage import ChromiumPage as Page
from DrissionPage.errors import ElementNotFoundError
from app.helper import domain, api, excel, rabbitmq, file
from app.vc import log, rabbit
from app.helper import domain, api, excel, file
from app.logger.logger import Logger
from app.vc import rabbit
from app.vc.interface import AutoInterface
from DrissionPage import ChromiumPage as Page
class ReturnGoods(AutoInterface):
def __init__(self, page: Page, country: str,shop_code: str):
def __init__(self,logger: Logger, page: Page, country: str,shop_code: str):
self.logger = logger
self.page = page
self.country = country
self.shop_code = shop_code
......@@ -41,7 +43,7 @@ class ReturnGoods(AutoInterface):
file.wait_for_downloads(file_name)
break
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
self.logger.warning("元素未找到,刷新网页")
self.page.refresh()
# 读取回退商品详情
......@@ -56,14 +58,14 @@ class ReturnGoods(AutoInterface):
# 读取list数据
list_data = pd.read_excel(file_name)
log.info(f"共计:{len(list_data)} 订单")
self.logger.info(f"共计:{len(list_data)} 订单")
new_list_data = []
i = 0
for _, data in list_data.iterrows():
i += 1
return_id = data.get('Return ID')
log.info({"index": i, "return_id": return_id})
self.logger.info({"index": i, "return_id": return_id})
# 下载退货详情表格读取数据
item_data = self.__export_item_read_data(return_id)
......
......@@ -8,13 +8,16 @@ from datetime import datetime
from urllib.parse import urlparse, parse_qs
from DrissionPage import ChromiumPage as Page
from DrissionPage.errors import ElementNotFoundError
from app.vc import log, rabbit
from app.logger.logger import Logger
from app.vc import rabbit
from app.vc.interface import AutoInterface
from app.helper import domain, file, excel, rabbitmq, api
from app.helper import domain, file, excel, api
class Spa(AutoInterface):
def __init__(self, page: Page, country: str, shop_code: str):
def __init__(self, logger: Logger, page: Page, country: str, shop_code: str):
self.logger = logger
self.page = page
self.country = country
self.shop_code = shop_code
......@@ -41,12 +44,12 @@ class Spa(AutoInterface):
# 获取报表表单内容
report_table_html = self.page.ele("#backup-report-table").html
if report_table_html is None or report_table_html == "":
log.warning("表单内容为空,刷新网页")
self.logger.warning("表单内容为空,刷新网页")
self.page.refresh()
continue
return report_table_html
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
self.logger.warning("元素未找到,刷新网页")
self.page.refresh()
def __export_item_read_data(self, invoice_id):
......@@ -78,7 +81,7 @@ class Spa(AutoInterface):
self.page.download(host + link, report_file_tmp_dir, show_msg=False)
is_down = file.wait_for_downloads(report_file, 60)
if is_down: break
log.warning(f"下载 {invoice_id} 失败,重新下载")
self.logger.warning(f"下载 {invoice_id} 失败,重新下载")
try:
df = pd.read_excel(report_file)
......@@ -167,12 +170,11 @@ class Spa(AutoInterface):
"""验证 ASIN 是否有效"""
return asin and not (isinstance(asin, float) and math.isnan(asin))
@staticmethod
def __write_sheet(writer, data, sheet_name):
def __write_sheet(self, writer, data, sheet_name):
if not isinstance(sheet_name, str):
sheet_name = str(sheet_name)
log.info(f"开始写入 {sheet_name}, 共计 {len(data)} 条")
self.logger.info(f"开始写入 {sheet_name}, 共计 {len(data)} 条")
df = pd.DataFrame(data) # 将数据转换为 DataFrame
df.to_excel(writer, sheet_name=sheet_name, index=False)
......@@ -182,26 +184,26 @@ class Spa(AutoInterface):
with pd.ExcelWriter(new_file_name) as writer:
# 写入小数据
if sheet_data:
log.info(f"保存小数据,共计 {len(sheet_data)} 条")
self.logger.info(f"保存小数据,共计 {len(sheet_data)} 条")
self.__write_sheet(writer, sheet_data, "Sheet1")
# 写入大数据(使用多线程并行写入不同表)
if large_sheet_data:
log.info(f"保存大数据,共计 {sum(len(data) for data in large_sheet_data.values())} 条")
self.logger.info(f"保存大数据,共计 {sum(len(data) for data in large_sheet_data.values())} 条")
for sheet_name, data in large_sheet_data.items():
self.__write_sheet(writer, data, sheet_name)
# with ThreadPoolExecutor() as executor:
# for sheet_name, data in large_sheet_data.items():
# executor.submit(write_sheet, writer, data, sheet_name)
log.info(f"文件 {new_file_name} 保存完成,路径:{os.path.abspath(new_file_name)}")
self.logger.info(f"文件 {new_file_name} 保存完成,路径:{os.path.abspath(new_file_name)}")
def push_data_queue(self):
rabbit.connection()
rabbit.connect(queue='spa_robot', routing_key='spa_robot', exchange='reports')
log.info("开始读取数据....")
self.logger.info("开始读取数据....")
data_dict = pd.read_excel(self.result_file_name, sheet_name=None, keep_default_na=False, na_values=[])
log.info("开始推送消息....")
self.logger.info("开始推送消息....")
for sheet_name, values in data_dict.items():
for _, item_row in values.iterrows():
if sheet_name == "Sheet1":
......@@ -238,7 +240,7 @@ class Spa(AutoInterface):
# 读取数据列表
coop_list = pd.read_excel(file_name)
log.info(f"共计: {len(coop_list)} 条数据")
self.logger.info(f"共计: {len(coop_list)} 条数据")
sheet_data = [] # 用于保存小数据
large_sheet_data = {} # 保存大数据(需要分 Sheet)
......@@ -246,17 +248,17 @@ class Spa(AutoInterface):
for index, coop in coop_list.iterrows():
index += 1
invoice_id = coop.get("Invoice ID") # 获取发票 ID
log.info({"index": index, "invoice_id": invoice_id})
self.logger.info({"index": index, "invoice_id": invoice_id})
if not invoice_id:
log.warning(f"缺少 Invoice ID,跳过第 {index} 条数据")
self.logger.warning(f"缺少 Invoice ID,跳过第 {index} 条数据")
continue
# 获取当前发票的 item 列表
item_dict = self.__export_item_read_data(invoice_id)
if item_dict is None:
sheet_data.append(coop)
log.warning(f"{invoice_id} 暂无报告信息")
self.logger.warning(f"{invoice_id} 暂无报告信息")
continue
if len(item_dict) > 1:
......
pip.exe install -i https://mirrors.cloud.tencent.com/pypi/simple -r requirements.txt
pyinstaller -F -n amazon_vc.exe main.py
pyinstaller -F -n amazon.exe --noconsole gui.py
pyinstaller -F -n amazon_cmd.exe main.py
pyinstaller -F -n amazon_gui.exe --noconsole gui.py
pyinstaller -F -n easy_gui.exe --noconsole easy.py
rd /s /q build
del *.spec
\ No newline at end of file
# coding: utf-8
import json
import requests
import xmltodict
import pandas as pd
from datetime import datetime
class YcClient:
@classmethod
def __init__(cls, app_key, app_token):
cls.app_key = app_key
cls.app_token = app_token
cls.base_url = "http://8.210.223.221/default/svc/web-service"
@classmethod
def call_service(cls, service, params_json):
"""
调用 SOAP 服务的封装函数。
:param params_json: 请求的数据内容,Python 字典
:param service: 要调用的接口方法
:return: 响应内容
"""
# 构造 SOAP 请求的 XML 数据
payload = f"""<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ns1="http://www.example.org/Ec/">
<SOAP-ENV:Body>
<ns1:callService>
<appToken>{cls.app_token}</appToken>
<appKey>{cls.app_key}</appKey>
<service>{service}</service>
<paramsJson>{json.dumps(params_json)}</paramsJson>
</ns1:callService>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""
# 发送请求
response = requests.post(cls.base_url, data=payload, headers={"Content-Type": "text/xml;charset=UTF-8"})
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"HTTP 错误: {response.status_code}, 内容: {response.text}")
# 将 XML 转换为字典
response_dict = xmltodict.parse(response.text)
resp = (response_dict.get("SOAP-ENV:Envelope", {}).
get("SOAP-ENV:Body", {}).
get("ns1:callServiceResponse", {}).
get('response', {}))
json_res = resp
# 如果 resp 是字符串,则解析为 JSON 字典
if isinstance(resp, str):
json_res = json.loads(resp)
if json_res.get("ask", "Failure") != "Success":
raise Exception(json_res.get("message", "未知错误"))
return json_res.get("data", {})
@classmethod
def get_products(cls, sku: str) -> dict:
"""
获取商品列表
:param sku: 商品编码
"""
params_json = {
"pageSize": 10,
"page": 1,
"product_sku": sku,
}
return cls.call_service("getProductList", params_json)
@classmethod
def create_asn(cls, warehouse_code, desc, tracking_number, order_item_list):
"""创建入库单"""
params_json = {
"reference_no": tracking_number, # 参考号
"warehouse_code": warehouse_code, # 目的仓
"tracking_number": tracking_number, # 跟踪号
"bulk_cargo_type": 1, # 散货类型(托)
"pallet_cnt": 1, # 散货托数量
"receiving_desc": desc, # 描述
"items": order_item_list
}
return cls.call_service("createAsn", params_json)
class Process(YcClient):
def __init__(self):
# 调用 API
key = "c906cc46bda8cea593c0b6b20eadb1f2"
token = "2832175c4ee5c6efd79e129e919f2dfd"
super().__init__(key, token)
@classmethod
def read_data(cls, file):
# 读取 Excel 文件
result = pd.read_excel(file)
# 按入库单跟踪号和 SKU 去重并汇总数量
result = result.groupby(['Shipment Request ID', 'SKU'], as_index=False).agg({'Return quantity': 'sum'})
# 组装结果数据
result_list = {}
for _, item in result.iterrows():
tracking_number = item.get('Shipment Request ID', '')
order_item = {
"product_sku": item.get('SKU', ''),
"quantity": item.get('Return quantity', 0),
"box_no": 1
}
# 初始化或追加数据
if tracking_number not in result_list:
result_list[tracking_number] = {"item": []}
result_list[tracking_number]["item"].append(order_item)
return result_list
@classmethod
def send_data(cls, code, result_list):
# 获取当前时间
desc = datetime.now().strftime("%Y-%m")
result = {}
for tracking_number, order_data in result_list.items():
try:
resp = cls.create_asn(code, desc, tracking_number, order_data["item"])
print(f"跟踪号:{tracking_number}, 入库单号:{resp.get('receiving_code')}")
result[tracking_number] = resp.get('receiving_code')
except Exception as e:
print(e)
return result
@classmethod
def save_excel(cls, file, result):
# 读取 Excel 文件
data = pd.read_excel(file)
# 通过映射方式直接添加新列 'Receiving code'
data['Receiving code'] = data['Shipment Request ID'].map(result)
# 保存修改后的数据到原文件或另一个文件
output_file = file.replace('.xlsx', '_updated.xlsx')
data.to_excel(output_file, index=False)
print(f"结果已保存到 {output_file}")
# 仓库编码和名称的映射
warehouse_map = {
"USLAX01": "美西CA仓库",
"KHDCN": "国内仓-练习使用",
"USNJ01": "美东NJ仓库",
"USWA03": "Sumner仓",
"NOVILAND": "Noviland",
"USHOU01": "美南TX仓库",
"JPTOKYO01": "日本关东仓库",
"CNFUJ01": "CNFUJ01",
}
if __name__ == '__main__':
for warehouse_code, warehouse_name in warehouse_map.items():
print(f"仓库编码:{warehouse_code} , 名称:{warehouse_name}")
while True:
# 提示用户输入
warehouse_code = input("\n请输入仓库编码:").strip()
# 根据输入查找仓库名称
warehouse_name = warehouse_map.get(warehouse_code)
if warehouse_name:
break
else:
print(f"仓库编码 [{warehouse_code}] 未找到对应的仓库名称,请重新输入。")
print(f"\n仓库编码 [{warehouse_code}] 对应的仓库名称是:{warehouse_name}")
# 提示用户输入文件路径
file_path = input("请输入 Excel 文件路径:").strip()
process = Process()
# 处理文件
data_list = process.read_data(file_path)
# 发送数据
send_result = process.send_data(warehouse_code, data_list)
# 保存结果到 Excel 文件
process.save_excel(file_path, send_result)
# coding: utf-8
import json
import queue
import threading
from datetime import datetime
from tkinter import filedialog, scrolledtext
import pandas as pd
import requests
import ttkbootstrap as ttk
import xmltodict
from ttkbootstrap.constants import *
# 原有的YcClient和Process类重构为实例方法
class YcClient:
def __init__(self, app_key, app_token):
self.app_key = app_key
self.app_token = app_token
self.base_url = "http://8.210.223.221/default/svc/web-service"
def call_service(self, service, params_json):
"""调用SOAP服务的封装函数"""
payload = f"""<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ns1="http://www.example.org/Ec/">
<SOAP-ENV:Body>
<ns1:callService>
<appToken>{self.app_token}</appToken>
<appKey>{self.app_key}</appKey>
<service>{service}</service>
<paramsJson>{json.dumps(params_json)}</paramsJson>
</ns1:callService>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""
response = requests.post(
self.base_url,
data=payload,
headers={"Content-Type": "text/xml;charset=UTF-8"}
)
if response.status_code != 200:
raise Exception(f"HTTP错误: {response.status_code}, 内容: {response.text}")
response_dict = xmltodict.parse(response.text)
resp = (response_dict.get("SOAP-ENV:Envelope", {})
.get("SOAP-ENV:Body", {})
.get("ns1:callServiceResponse", {})
.get('response', {}))
json_res = json.loads(resp) if isinstance(resp, str) else resp
if json_res.get("ask", "Failure") != "Success":
raise Exception(json_res.get("message", "未知错误"))
return json_res.get("data", {})
def get_products(self, sku: str) -> dict:
"""获取商品列表"""
params_json = {"pageSize": 10, "page": 1, "product_sku": sku}
return self.call_service("getProductList", params_json)
def create_asn(self, warehouse_code, desc, tracking_number, order_item_list):
"""创建入库单"""
params_json = {
"reference_no": tracking_number,
"warehouse_code": warehouse_code,
"tracking_number": tracking_number,
"bulk_cargo_type": 1,
"pallet_cnt": 1,
"receiving_desc": desc,
"items": order_item_list
}
return self.call_service("createAsn", params_json)
class Process(YcClient):
def __init__(self, app_key, app_token):
super().__init__(app_key, app_token)
self.log_message = None
@staticmethod
def read_data(file):
"""读取并处理Excel数据"""
result = pd.read_excel(file)
result = result.groupby(['Shipment Request ID', 'SKU'], as_index=False).agg({'Return quantity': 'sum'})
result_list = {}
for _, item in result.iterrows():
tracking_number = item['Shipment Request ID']
order_item = {
"product_sku": item['SKU'],
"quantity": item['Return quantity'],
"box_no": 1
}
result_list.setdefault(tracking_number, {"item": []})["item"].append(order_item)
return result_list
def send_data(self, warehouse_code, data_list):
"""发送数据到API"""
desc = datetime.now().strftime("%Y-%m")
result = {}
for tracking_number, order_data in data_list.items():
try:
resp = self.create_asn(warehouse_code, desc, tracking_number, order_data["item"])
result[tracking_number] = resp.get('receiving_code')
self.log_message(f"跟踪号:{tracking_number}, 入库单号:{resp.get('receiving_code')}")
except Exception as e:
self.log_message(f"错误: {str(e)}")
return result
@staticmethod
def save_excel(file, result):
"""保存结果到Excel"""
data = pd.read_excel(file)
data['Receiving code'] = data['Shipment Request ID'].map(result)
output_file = file.replace('.xlsx', '_updated.xlsx')
data.to_excel(output_file, index=False)
return output_file
class Application(ttk.Window):
def __init__(self):
super().__init__(themename="cosmo")
self.log_area = None
self.run_btn = None
self.file_entry = None
self.warehouse_combo = None
self.title("入库单创建工具")
self.geometry("600x500")
self.resizable(False, False)
# 仓库编码映射
self.warehouse_map = {
"USLAX01": "美西CA仓库",
"KHDCN": "国内仓-练习使用",
"USNJ01": "美东NJ仓库",
"USWA03": "Sumner仓",
"NOVILAND": "Noviland",
"USHOU01": "美南TX仓库",
"JPTOKYO01": "日本关东仓库",
"CNFUJ01": "CNFUJ01",
}
# 设置窗口尺寸并居中
self._center_window()
# 创建日志队列
self.log_queue = queue.Queue()
self.create_widgets()
self.after(100, self.process_log_queue)
def _center_window(self):
"""设置窗口居中"""
window_width = 600
window_height = 500
# 获取屏幕尺寸
screen_width = self.winfo_screenwidth()
screen_height = self.winfo_screenheight()
# 计算居中坐标
x = (screen_width - window_width) // 2
y = (screen_height - window_height) // 2
# 设置窗口位置
self.geometry(f"{window_width}x{window_height}+{x}+{y}")
def create_widgets(self):
"""创建界面组件"""
main_frame = ttk.Frame(self)
main_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# 仓库选择
warehouse_frame = ttk.Labelframe(main_frame, text="仓库选择", padding=10)
warehouse_frame.pack(fill=X, pady=5)
self.warehouse_combo = ttk.Combobox(
warehouse_frame,
values=[f"{code} - {name}" for code, name in self.warehouse_map.items()],
state="readonly"
)
self.warehouse_combo.pack(fill=X, padx=5)
# 文件选择
file_frame = ttk.Labelframe(main_frame, text="Excel文件", padding=10)
file_frame.pack(fill=X, pady=5)
self.file_entry = ttk.Entry(file_frame)
self.file_entry.pack(side=LEFT, fill=X, expand=True, padx=5)
ttk.Button(file_frame, text="浏览", command=self.select_file).pack(side=LEFT, padx=5)
# 控制按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=X, pady=10)
self.run_btn = ttk.Button(btn_frame, text="开始执行", command=self.start_process, style=PRIMARY)
self.run_btn.pack(pady=5)
# 日志显示
log_frame = ttk.Labelframe(main_frame, text="操作日志", padding=10)
log_frame.pack(fill=BOTH, expand=True)
self.log_area = scrolledtext.ScrolledText(log_frame, state=DISABLED)
self.log_area.pack(fill=BOTH, expand=True)
def select_file(self):
"""选择Excel文件"""
path = filedialog.askopenfilename(filetypes=[("Excel文件", "*.xlsx *.xls")])
if path:
self.file_entry.delete(0, END)
self.file_entry.insert(0, path)
def start_process(self):
"""启动处理线程"""
app_key = "c906cc46bda8cea593c0b6b20eadb1f2"
app_token = "2832175c4ee5c6efd79e129e919f2dfd"
warehouse = self.warehouse_combo.get().split(" - ")[0]
file_path = self.file_entry.get().strip()
if not warehouse:
self.log("错误:请选择仓库")
return
if not file_path:
self.log("错误:请选择Excel文件")
return
self.run_btn.config(state=DISABLED)
threading.Thread(
target=self.process_data,
args=(app_key, app_token, warehouse, file_path),
daemon=True
).start()
def process_data(self, app_key, app_token, warehouse, file_path):
"""后台数据处理线程"""
try:
processor = Process(app_key, app_token)
processor.log_message = self.log # 传递日志方法
self.log("开始处理数据...")
data_list = processor.read_data(file_path)
if not data_list:
self.log("没有需要处理的数据")
return
self.log("开始创建入库单...")
result = processor.send_data(warehouse, data_list)
self.log("正在保存结果文件...")
output_path = processor.save_excel(file_path, result)
self.log(f"处理完成!结果已保存至:{output_path}")
except Exception as e:
self.log(f"处理出错:{str(e)}")
finally:
self.run_btn.config(state=NORMAL)
def log(self, message):
"""线程安全的日志记录"""
self.log_queue.put(message)
def process_log_queue(self):
"""处理日志队列"""
while not self.log_queue.empty():
msg = self.log_queue.get()
self.log_area.config(state=NORMAL)
self.log_area.insert(END, msg + "\n")
self.log_area.config(state=DISABLED)
self.log_area.see(END)
self.after(100, self.process_log_queue)
if __name__ == "__main__":
app = Application()
app.mainloop()
import ttkbootstrap as ttk
from tkinter import filedialog, messagebox, scrolledtext
# coding: utf-8
import os
import threading
import queue
import time
import traceback
from tkinter import filedialog, simpledialog, messagebox
from DrissionPage import ChromiumPage
from dotenv import load_dotenv
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from app.helper import file, domain
from app.logger.gui import GuiLog
from app.vc.payment import Payment
from app.vc.payment_push import PaymentPush
from app.vc.return_goods import ReturnGoods
from app.vc.spa import Spa
class VCManagerGUI(ttk.Window):
def __init__(self, logger):
super().__init__(themename="cosmo")
self.run_btn = None
self.file_entry = None
self.action_var = None
self.shop_entry = None
self.country_var = None
self.log_text = None
self.title("店铺管理工具")
self.geometry("700x800")
# 初始化状态变量
self.page = None
self.running = False
self.log_queue = queue.Queue()
self.payee_code = None
# 设置日志处理
logger.set_console(self.log_queue)
self.logger = logger
# 设置窗口尺寸并居中
self._center_window()
# 创建界面组件
self.create_widgets()
# 启动日志处理循环
self.after(100, self.process_log_queue)
def _center_window(self):
"""设置窗口居中"""
window_width = 700
window_height = 800
# 获取屏幕尺寸
screen_width = self.winfo_screenwidth()
screen_height = self.winfo_screenheight()
# 计算居中坐标
x = (screen_width - window_width) // 2
y = (screen_height - window_height) // 2
# 设置窗口位置
self.geometry(f"{window_width}x{window_height}+{x}+{y}")
def create_widgets(self):
"""创建主界面布局"""
main_frame = ttk.Frame(self)
main_frame.pack(fill=BOTH, expand=True, padx=15, pady=15)
# 配置区域
config_frame = ttk.Labelframe(main_frame, text="配置参数", padding=10)
config_frame.pack(fill=X, pady=10)
# 国家选择
country_frame = ttk.Frame(config_frame)
country_frame.grid(row=0, column=0, sticky=W, pady=5)
ttk.Label(country_frame, text="国家:", width=6).pack(side=LEFT)
self.country_var = ttk.StringVar(value="US")
countries = [
("美国", "US"),
("英国", "UK"),
("日本", "JP"),
("法国", "FR"),
("德国", "DE"),
("加拿大", "CA"),
]
for i, (text, value) in enumerate(countries):
rb = ttk.Radiobutton(
country_frame,
text=text,
variable=self.country_var,
value=value
)
rb.pack(side=LEFT, padx=(10 if i != 0 else 0))
# 功能选择
action_frame = ttk.Frame(config_frame)
action_frame.grid(row=1, column=0, sticky=W, pady=5)
ttk.Label(action_frame, text="功能:", width=6).pack(side=LEFT)
self.action_var = ttk.StringVar()
actions = [
("SPA查询", "spa"),
("退货查询", "return"),
("回款查询", "payment"),
("ERP回款", "payment_erp")
]
for i, (text, value) in enumerate(actions):
rb = ttk.Radiobutton(
action_frame,
text=text,
variable=self.action_var,
value=value
)
rb.pack(side=LEFT, padx=(10 if i != 0 else 0))
# 文件选择区域
file_frame = ttk.Labelframe(main_frame, text="数据文件", padding=10)
file_frame.pack(fill=X, pady=10)
self.file_entry = ttk.Entry(file_frame)
self.file_entry.pack(side=LEFT, fill=X, expand=True, padx=5)
ttk.Button(file_frame, text="浏览", command=self.select_file, width=8).pack(side=LEFT)
# 控制按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=X, pady=15)
self.run_btn = ttk.Button(btn_frame, text="开始执行", command=self.start_process,
style=PRIMARY, width=12)
self.run_btn.pack(side=LEFT, padx=5)
ttk.Button(btn_frame, text="清除日志", command=self.clear_log, width=10).pack(side=LEFT, padx=5)
ttk.Button(btn_frame, text="退出", command=self.quit_app, width=8).pack(side=RIGHT)
# 日志显示
log_frame = ttk.Labelframe(main_frame, text="操作日志", padding=10)
log_frame.pack(fill=BOTH, expand=True)
# 创建主窗口
root = ttk.Window(themename="cosmo", minsize=(1000, 380), resizable=(False, False))
root.title("vendorCentral工具")
self.log_text = ttk.ScrolledText(log_frame, state=DISABLED)
self.log_text.pack(fill=BOTH, expand=True)
# 国家下拉框
country_label = ttk.Label(root, text="国家")
country_label.grid(row=0, column=0, padx=5, pady=5, sticky="w")
def select_file(self):
"""选择数据文件"""
path = filedialog.askopenfilename(filetypes=[("Excel文件", "*.xlsx *.xls")])
if path:
self.file_entry.delete(0, ttk.END)
self.file_entry.insert(0, path)
country_var = ttk.StringVar(value="US") # 默认值为 "US"
country_combobox = ttk.Combobox(root, textvariable=country_var, values=["DE", "FR", "JP", "CA", "UK", "US"])
country_combobox.grid(row=0, column=1, padx=5, pady=5)
def start_process(self):
"""启动处理线程"""
if self.running:
messagebox.showwarning("警告", "已有任务正在运行")
return
# 店铺编码下拉框
shop_code_label = ttk.Label(root, text="店铺")
shop_code_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")
if not self.validate_input():
return
shop_code_var = ttk.StringVar(value="VECELO")
shop_code_combobox = ttk.Combobox(root, textvariable=shop_code_var,
values=["DE-VC", "FR-VC", "JP-VC", "CA-VC", "UK-VC", "VECELO"])
shop_code_combobox.grid(row=1, column=1, padx=5, pady=5)
self.running = True
self.run_btn.config(state=ttk.DISABLED)
threading.Thread(target=self.run_process, daemon=True).start()
# 功能下拉框
action_label = ttk.Label(root, text="功能")
action_label.grid(row=2, column=0, padx=5, pady=5, sticky="w")
def validate_input(self):
"""验证输入有效性"""
if not self.file_entry.get():
messagebox.showerror("输入错误", "请选择数据文件")
return False
if not self.action_var.get():
messagebox.showerror("输入错误", "请选择要执行的功能")
return False
return True
action_var = ttk.StringVar(value="请选择")
action_combobox = ttk.Combobox(root, textvariable=action_var, values=["spa", "return", "payment", "payment_erp"])
action_combobox.grid(row=2, column=1, padx=5, pady=5)
def run_process(self):
"""后台处理流程"""
try:
# 初始化浏览器
self.log("初始化浏览器引擎...")
self.init_browser()
# 回款Code下拉框
payee_code_label = ttk.Label(root, text="code")
payee_code_label.grid(row=3, column=0, padx=5, pady=5, sticky="w")
# 获取参数
params = {
'country': self.country_var.get(),
'action': self.action_var.get(),
'file_name': self.file_entry.get()
}
payee_code_var = ttk.StringVar(value="VECET") # 默认值为 "VECET"
payee_code_combobox = ttk.Combobox(root, textvariable=payee_code_var, values=["VECET"])
payee_code_combobox.grid(row=3, column=1, padx=5, pady=5)
# 特殊参数处理
if params['action'] == "payment":
self.get_payee_code()
if not self.payee_code:
return
# 文件名输入框
file_name_label = ttk.Label(root, text="文件")
file_name_label.grid(row=4, column=0, padx=5, pady=5, sticky="w")
# 创建处理器实例
processor = self.create_processor(params)
file_name_var = ttk.StringVar(value="") # 默认值为空
file_name_entry = ttk.Entry(root, textvariable=file_name_var)
file_name_entry.grid(row=4, column=1, padx=5, pady=5)
# 切换域名
domain.domain_page(self.logger, self.page, self.country_var.get())
# 执行核心操作
self.log(f"开始执行 {params['action']} 操作...")
processor.run(params['file_name'])
processor.push_data_queue()
self.log("操作成功完成!")
# 文件选择函数
def select_file():
file = filedialog.askopenfilename(
title="选择文件",
filetypes=[("Excel 文件", "*.xls *.xlsx")]
)
if file: # 如果用户选择了文件
file_name_var.set(file) # 将文件路径写入输入框
except Exception as e:
self.log(f"发生错误:{str(e)}")
self.log(traceback.format_exc())
finally:
self.cleanup_resources()
def init_browser(self):
"""初始化浏览器配置"""
self.page = ChromiumPage()
self.page.set.load_mode.normal()
self.page.set.when_download_file_exists('overwrite')
download_path = os.path.join(os.getcwd(), "downloads")
file.make_dir(download_path)
self.page.set.download_path(download_path)
self.log(f"下载目录设置为:{download_path}")
# 绑定点击事件
file_name_entry.bind("<Button-1>", lambda event: select_file())
def create_processor(self, params):
"""创建功能处理器"""
# 根据国家编码获取店铺代码
country = params['country']
if country == "JP":
shop_code = "JP-VC"
elif country == "UK":
shop_code = "UK-VC"
elif country == "DE":
shop_code = "DE-VC"
elif country == "FR":
shop_code = "FR-VC"
elif country == "CA":
shop_code = "CA-VC"
else:
shop_code = "VECELO"
# 提交按钮
def on_submit():
# 禁用按钮
submit_button.config(state="disabled")
action = params['action']
if action == "payment":
return Payment(self.logger, self.page, country, self.payee_code, shop_code)
elif action == "payment_erp":
return PaymentPush(self.logger, country, shop_code)
elif action == "return":
return ReturnGoods(self.logger, self.page, country, shop_code)
elif action == "spa":
return Spa(self.logger, self.page, country, shop_code)
else:
raise ValueError(f"未知的功能类型:{action}")
# 获取所有下拉框和输入框的值
values = {
"国家": country_var.get(),
"店铺编码": shop_code_var.get(),
"功能": action_var.get(),
"回款Code": payee_code_var.get(),
"文件名": file_name_var.get()
}
# 显示结果
result = "\n".join([f"{key}: {value}" for key, value in values.items()])
messagebox.showinfo("提交结果", result)
def get_payee_code(self):
"""获取回款Code"""
self.after(0, self._show_payee_dialog)
while self.running and not self.payee_code:
time.sleep(0.1)
# 模拟日志输出
def log_simulation():
for i in range(1, 6):
time.sleep(1) # 模拟耗时操作
log_message = f"日志: 执行步骤 {i}\n"
log_console.insert(ttk.END, log_message)
log_console.see(ttk.END) # 自动滚动到最新日志
def _show_payee_dialog(self):
"""显示回款Code输入对话框"""
self.payee_code = simpledialog.askstring(
"回款信息",
"请输入回款Code(从URL参数获取):",
parent=self,
initialvalue="VECET"
)
if not self.payee_code:
self.log("用户取消回款Code输入")
self.running = False
# 启用按钮
submit_button.config(state="normal")
def log(self, message):
"""记录日志到队列"""
self.logger.info(message)
# timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# self.log_queue.put(f"[{timestamp}] {message}")
# 使用线程避免阻塞主界面
threading.Thread(target=log_simulation).start()
def process_log_queue(self):
"""处理日志队列"""
while not self.log_queue.empty():
msg = self.log_queue.get()
self.log_text.configure(state=ttk.NORMAL)
self.log_text.insert(ttk.END, msg + "\n")
self.log_text.configure(state=ttk.DISABLED)
self.log_text.see(ttk.END)
self.after(100, self.process_log_queue)
def clear_log(self):
"""清除日志"""
self.log_text.configure(state=ttk.NORMAL)
self.log_text.delete(1.0, ttk.END)
self.log_text.configure(state=ttk.DISABLED)
submit_button = ttk.Button(root, style="info", text="执行", width=25, command=on_submit)
submit_button.grid(row=5, column=0, columnspan=2, pady=5)
def cleanup_resources(self):
"""清理资源"""
if self.page:
try:
self.page.close()
self.log("浏览器资源已释放")
except Exception as e:
self.log(f"释放浏览器资源时出错:{str(e)}")
finally:
self.page = None
self.running = False
self.after(0, lambda: self.run_btn.config(state=ttk.NORMAL))
# 日志控制台
log_frame = ttk.Frame(root) # 创建一个框架用于日志框和描述
log_frame.grid(row=0, column=2, rowspan=6, padx=10, pady=5)
def quit_app(self):
"""安全退出程序"""
if messagebox.askokcancel("退出", "确认要退出程序吗?"):
self.cleanup_resources()
self.destroy()
# 日志框
log_console = scrolledtext.ScrolledText(log_frame, wrap=ttk.WORD, width=120, height=20)
log_console.pack(fill="both", expand=True, pady=10)
# 运行主循环
root.mainloop()
if __name__ == "__main__":
try:
log = GuiLog()
load_dotenv()
app = VCManagerGUI(log)
app.mainloop()
except KeyboardInterrupt:
exit(1)
......@@ -2,6 +2,7 @@
import os
from app.helper import file, domain, helper
from app.helper.logger import ConsoleLog
from app.vc.payment import Payment
from app.vc.payment_push import PaymentPush
from app.vc.return_goods import ReturnGoods
......@@ -29,15 +30,17 @@ if __name__ == '__main__':
if action == "":
raise Exception("请输入要执行的功能")
logger = ConsoleLog()
if action.lower() == "payment":
payee_code = helper.get_input_with_default("回款Code: [ 详情页url参数 payeeCode ]", "VECET")
object_instate = Payment(page, country, payee_code, shop_code)
object_instate = Payment(logger, page, country, payee_code, shop_code)
if action.lower() == "payment_erp":
object_instate = PaymentPush(country, shop_code)
object_instate = PaymentPush(logger, country, shop_code)
elif action.lower() == "return":
object_instate = ReturnGoods(page, country, shop_code)
object_instate = ReturnGoods(logger, page, country, shop_code)
elif action.lower() == "spa":
object_instate = Spa(page, country, shop_code)
object_instate = Spa(logger, page, country, shop_code)
else:
raise Exception("请输入正确的功能")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment