Commit 0b546372 authored by 邱阿朋

build: update environment configuration and build scripts

- Remove sensitive information from the .env file
- Delete the .gitignore file in preparation for reconfiguration
- Remove the existing source files
- Update the build.bat script to match the new project structure
parent 8dc775b3
亚马逊-财务报表 (Amazon financial reports)
# coding: utf-8
import re
import pandas as pd
from openpyxl.reader.excel import load_workbook
def save_xls(data, output_file, sheet_name='Sheet1', adjusted=False):
try:
# If the file already exists, append a new sheet
@@ -39,29 +38,3 @@ def save_xls(data, output_file, sheet_name='Sheet1', adjusted=False):
ws.column_dimensions[column_letter].width = adjusted_width
wb.save(output_file)
def remove_last_comma(csv_file, skip_rows=2):
# List that holds the cleaned rows
cleaned_lines = []
# Read the original CSV and strip the trailing commas
with open(csv_file, 'r', encoding='utf-8') as file:
# Skip the requested number of leading rows
for _ in range(skip_rows):
next(file)
for line in file:
# Drop the quote that follows a space + digits
cleaned_line = re.sub(r'(\s\d+)"', r'\1 ', line)
# Turn whitespace before a comma-quote into a closing quote
cleaned_line = re.sub(r'\s+,\s*"', r'", "', cleaned_line)
# Strip the trailing comma and newline
cleaned_line = cleaned_line.rstrip(',\n')
# Newlines are added uniformly when writing back
cleaned_lines.append(cleaned_line)
# Write the cleaned rows back to the file
with open(csv_file, 'w', encoding='utf-8', newline='') as cleaned_file:
for line in cleaned_lines:
cleaned_file.write(line + '\n') # one row per line
@@ -4,17 +4,8 @@ import sys
import traceback
def make_dir(path):
# Create the download directory if it does not exist
if not os.path.exists(path):
os.makedirs(path)
return False
return True
def get_input_with_default(prompt, default):
user_input = input(f"{prompt}(默认为 '{default}'):")
user_input = input(f"{prompt}; ( 默认'{default}'):")
return user_input.upper() or default
......
# coding: utf-8
import os
import redis
......
# coding: utf-8
from app.helper import logger
log = logger.ConsoleLog()
# coding: utf-8
from abc import ABC, abstractmethod
# Define an interface
class AutoInterface(ABC):
@abstractmethod
def run(self, file_name: str):
pass
# coding: utf-8
# Remittance payments
import os
from datetime import datetime
import pandas as pd
from DrissionPage.errors import ElementNotFoundError
from app.helper import domain, file, rabbitmq, api, excel
from app.vc import log
from app.vc.interface import AutoInterface
from DrissionPage import ChromiumPage as Page
class Payment(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
self.page = page
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
def __page_get(self, url: str):
host = domain.switch_domain(self.country)
full_url = host + url
self.page.get(full_url, timeout=5)
def __export_item_read_data(self, return_id: str):
file_name = f"return_goods\\{return_id}.xls"
if not os.path.isfile(file_name):
while True:
try:
# 打开退回详情下载明细
self.__page_get(f"katalmonsapp/vendor/members/returns/{return_id}")
self.page.ele("#file-download-button").click.to_download(rename=file_name)
file.wait_for_downloads(file_name)
break
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
self.page.refresh()
# 读取回退商品详情
return pd.read_excel(file_name)
def __push_data_queue(self, file_name: str):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
data = pd.read_excel(file_name, keep_default_na=False, na_values=[])
for _, item_row in data.iterrows():
push_data = {
'return_id': item_row.get('Return ID', ''),
'asin': item_row.get('ASIN', ''), # ASIN
'order_no': item_row.get('Purchase order', ''), # 订单号
'sku_quantity': item_row.get('Quantity', 0), # 退回数量
'sku_amount': item_row.get('Total cost', 0), # Total cost
'currency': item_row.get('Currency code', ''), # Currency code
'data_date': str(item_row.get('Return Date', '')), # Return Date
'erp_sku': item_row.get("SKU", ''), # ERP SKU # SKU1匹配
'shop_code': self.shop_code, # 店铺code
'supplier_code': item_row.get('Vendor code', ''), # 供应商编码
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
}
# 推送数据
rabbit.send_message(push_data)
def run(self, file_name: str):
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
# 读取sku映射关系
relations_dict = api.sku_relations(self.country)
list_data = pd.read_excel(file_name)
# 下载并读取list数据
log.info(f"共计:{len(list_data)} 订单")
new_list_data = []
i = 0
for _, data in list_data.iterrows():
i += 1
return_id = data.get('Return ID')
log.info({"index": i, "return_id": return_id})
# 下载退货详情表格读取数据
item_data = self.__export_item_read_data(return_id)
# 按 'Purchase order' 和 'ASIN' 分组,并对 'Quantity' 和 Total amount 进行求和
item_data_result = item_data.groupby(['Purchase order', 'ASIN', 'Reason'], as_index=False).agg({
'Quantity': 'sum',
'Total amount': 'sum',
})
for _, item_row in item_data_result.iterrows():
relation = relations_dict.get(item_row.get('ASIN'), {})
erp_sku = relation.get('erp_sku', "")
data_dict = data.to_dict()
data_dict.update({
'Return Date': data_dict['Return Date'].strftime('%m/%d/%Y'),
'Return ID': str(data_dict['Return ID']),
'PO': item_row.get('Purchase order', ""),
'ASIN': item_row.get('ASIN', ""),
'SKU': erp_sku,
'Quantity': item_row.get('Quantity', 0),
# Replace the return quantity and amount with the values from the detail report
'Return quantity': item_row.get('Quantity', 0), # replaced return quantity
'Reason': item_row.get('Reason', ""),
'Total cost': item_row.get('Total amount', 0), # replaced amount
'Group Name': relation.get("name", ""),
'Group Code': relation.get("code", ""),
})
# 追加数据
new_list_data.append(data_dict)
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
# 原文件名
file_name = "退货明细.xlsx"
# 拼接新的文件名
new_file_name = f"{current_datetime}_{self.country}_{file_name}"
excel.save_xls(new_list_data, new_file_name)
# 推送消息
self.__push_data_queue(new_file_name)
# coding: utf-8
# Remittance payment details
import pandas as pd
from DrissionPage import ChromiumPage as Page
from app.helper import rabbitmq
from app.vc import log
from app.vc.interface import AutoInterface
class PaymentPush(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
self.page = page
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
@staticmethod
def __read_data(file_name):
df = pd.read_excel(file_name, header=None)
# 定位标题行
pay_title = df[df[0].str.contains('Remittance payments', case=False, na=False)].index[0]
inv_title = df[df[0].str.contains('Invoices', case=False, na=False)].index[0]
# 定位表头起始行(跳过标题后的空行)
pay_header = df.loc[pay_title + 1:].notna().any(axis=1).idxmax()
inv_header = df.loc[inv_title + 1:].notna().any(axis=1).idxmax()
# 计算第一个表格的结束位置(第二个标题前的空行)
empty_lines = df.index[df.isnull().all(axis=1)].tolist()
separator = max([x for x in empty_lines if pay_header < x < inv_title], default=inv_title - 1)
# Read and clean both tables
pay_rows = separator - pay_header - 1
payments = pd.read_excel(file_name, header=pay_header, nrows=pay_rows).dropna(how='all')
invoices = pd.read_excel(file_name, header=inv_header).dropna(how='all')
return [payments, invoices]
def run(self, file_name: str):
payments, invoices = self.__read_data(file_name)
# 将 'Payment Number' 列设置为索引
payments.set_index('Payment Number', inplace=True)
# 转换为字典,orient='index' 表示以索引为键
payments_map = payments.to_dict(orient='index')
log.info(f"共计:{len(invoices)} 订单")
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='refund_robot', routing_key='refund_robot', exchange='reports')
i = 0
for _, data in invoices.iterrows():
i += 1
payment_number = data.get("Payment Number")
payment_date = payments_map.get(payment_number, {}).get('Payment Date', '')
platform_payable_amount = data.get('Invoice Amount', '')
if self.country == 'FR' or self.country == 'UK':
platform_payable_amount = data.get('Net Amount Paid', '')
push_data = {
'payment_number': data.get('Payment Number', ''), # 订单id
'order_date': str(data.get('Invoice Date', '')), # 发票时间
'payment_date': str(payment_date),
'order_no': data.get('Invoice Number', 0), # 订单号
'payment_type': data.get('Description', ''), # Description
'platform_payable_amount': platform_payable_amount, # 平台应付金额
'fee_amount': data.get("Terms Discount Taken", ''), # 手续费
'actual_payment': data.get('Amount Paid', ''), # 实际支付金额
'currency': data.get('Invoice Currency', ''), # 货币
'shop_code': self.shop_code, # 店铺code
}
# 推送数据
rabbit.send_message(push_data)
# coding: utf-8
# Returns
import os
from datetime import datetime
import pandas as pd
from DrissionPage.errors import ElementNotFoundError
from app.helper import domain, api, excel, rabbitmq, file
from app.vc import log
from app.vc.interface import AutoInterface
from DrissionPage import ChromiumPage as Page
class ReturnGoods(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
self.page = page
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
def __page_get(self, url):
host = domain.switch_domain(self.country)
full_url = host + url
self.page.get(full_url, timeout=5)
def __export_item_read_data(self, return_id: str):
file_name = f"return_goods\\{return_id}.xls"
if not os.path.isfile(file_name):
while True:
try:
# 打开退回详情下载明细
self.__page_get(f"katalmonsapp/vendor/members/returns/{return_id}")
self.page.ele("#file-download-button").click.to_download(rename=file_name)
file.wait_for_downloads(file_name)
break
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
self.page.refresh()
# 读取回退商品详情
return pd.read_excel(file_name)
def __push_data_queue(self, file_name):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
data = pd.read_excel(file_name, keep_default_na=False, na_values=[])
for _, item_row in data.iterrows():
push_data = {
'return_id': item_row.get('Return ID', ''),
'asin': item_row.get('ASIN', ''), # ASIN
'order_no': item_row.get('Purchase order', ''), # 订单号
'sku_quantity': item_row.get('Quantity', 0), # 退回数量
'sku_amount': item_row.get('Total cost', 0), # Total cost
'currency': item_row.get('Currency code', ''), # Currency code
'data_date': str(item_row.get('Return Date', '')), # Return Date
'erp_sku': item_row.get("SKU", ''), # ERP SKU # SKU1匹配
'shop_code': self.shop_code, # 店铺code
'supplier_code': item_row.get('Vendor code', ''), # 供应商编码
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
}
# 推送数据
rabbit.send_message(push_data)
def run(self, file_name: str):
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
# 读取sku映射关系
relations_dict = api.sku_relations(self.country)
# 读取list数据
list_data = pd.read_excel(file_name)
log.info(f"共计:{len(list_data)} 订单")
new_list_data = []
i = 0
for _, data in list_data.iterrows():
i += 1
return_id = data.get('Return ID')
log.info({"index": i, "return_id": return_id})
# 下载退货详情表格读取数据
item_data = self.__export_item_read_data(return_id)
# 按 'Purchase order' 和 'ASIN' 分组,并对 'Quantity' 和 Total amount 进行求和
item_data_result = item_data.groupby(['Purchase order', 'ASIN', 'Reason'], as_index=False).agg({
'Quantity': 'sum',
'Total amount': 'sum',
})
for _, item_row in item_data_result.iterrows():
relation = relations_dict.get(item_row.get('ASIN'), {})
erp_sku = relation.get('erp_sku', "")
data_dict = data.to_dict()
data_dict.update({
'Return Date': data_dict['Return Date'].strftime('%m/%d/%Y'),
'Return ID': str(data_dict['Return ID']),
'PO': item_row.get('Purchase order', ""),
'ASIN': item_row.get('ASIN', ""),
'SKU': erp_sku,
'Quantity': item_row.get('Quantity', 0),
# Replace the return quantity and amount with the values from the detail report
'Return quantity': item_row.get('Quantity', 0), # replaced return quantity
'Reason': item_row.get('Reason', ""),
'Total cost': item_row.get('Total amount', 0), # replaced amount
'Group Name': relation.get("name", ""),
'Group Code': relation.get("code", ""),
})
# 追加数据
new_list_data.append(data_dict)
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
# 原文件名
file_name = "退货明细.xlsx"
# 拼接新的文件名
new_file_name = f"{current_datetime}_{self.country}_{file_name}"
excel.save_xls(new_list_data, new_file_name)
# 推送消息
self.__push_data_queue(new_file_name)
# coding: utf-8
# spa
import math
import os
import pandas as pd
from lxml import etree
from datetime import datetime
from urllib.parse import urlparse, parse_qs
from DrissionPage import ChromiumPage as Page
from DrissionPage.errors import ElementNotFoundError
from app.vc import log
from app.vc.interface import AutoInterface
from app.helper import domain, file, excel, rabbitmq, api
class Spa(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
self.page = page
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
def __page_get(self, url):
host = domain.switch_domain(self.country)
full_url = host + url
self.page.get(full_url, timeout=10)
def __get_report_table_html(self, invoice_id):
while True:
try:
self.__page_get(f"hz/vendor/members/coop?searchText={invoice_id}")
# 点击选项卡
self.page.ele("#a-autoid-2-announce").click()
# 下载报表
self.page.ele(f"#invoiceDownloads-{invoice_id}_2").click()
self.page.wait(1)
# 获取报表表单内容
report_table_html = self.page.ele("#backup-report-table").html
if report_table_html is None or report_table_html == "":
log.warning("表单内容为空,刷新网页")
self.page.refresh()
continue
return report_table_html
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
self.page.refresh()
def __export_item_read_data(self, invoice_id):
file_name = f"spa\\{invoice_id}.xlsx"
if os.path.isfile(file_name):
df = pd.read_excel(file_name, sheet_name=None)
return df
# 获取报表表单内容
report_table_html = self.__get_report_table_html(invoice_id)
tree = etree.HTML(report_table_html)
# 提取所有链接
links = tree.xpath('//table[@id="backup-report-table"]//a/@href')
if len(links) == 0:
# data_list = get_report_agreement_text(invoice_id)
# return {"Accrual For Current Period": pd.DataFrame(data_list)}
return None
for link in links:
# 解析链接中的查询参数
parsed_url = urlparse(link)
query_params = parse_qs(parsed_url.query)
# 提取 filename 参数
filename = query_params.get('fileName', ['未找到文件名'])[0]
report_file_tmp_dir = os.getcwd() + f"\\spa\\{invoice_id}\\{filename}\\"
host = domain.switch_domain(self.country)
report_file = report_file_tmp_dir + "BackupReport.xls"
while True:
self.page.download(host + link, report_file_tmp_dir, show_msg=False)
is_down = file.wait_for_downloads(report_file, 60)
if is_down: break
log.warning(f"下载 {invoice_id} 失败,重新下载")
try:
df = pd.read_excel(report_file)
# 获取表头
headers = df.columns.tolist()
# 要检查的列名
column_names_to_check = ["Rebate In Agreement Currency", "Vendor Funding In Agreement Currency"]
# 判断头文件是否满足读取条件,不满足删除文件夹
header_is_normal = any(column in headers for column in column_names_to_check)
if not header_is_normal:
continue
# 创建 ExcelFile 对象
excel_file = pd.ExcelFile(report_file)
# 获取所有工作表名称
sheet_names = excel_file.sheet_names
for sheet_name in sheet_names:
df = pd.read_excel(report_file, sheet_name=sheet_name)
data = df[df['Asin'].notna()]
excel.save_xls(data, file_name, sheet_name)
return pd.read_excel(file_name, sheet_name=None)
except ValueError:
pass
def __process_large_items(self, item_list, relation_data, coop):
"""处理大数据列表 (item_list 长度 >= 10)"""
processed_items = []
for _, item in item_list.iterrows():
asin = item.get('Asin', None)
if not self.__validate_asin(asin):
continue
relation = relation_data.get(asin, {})
rebate = item.get("Rebate In Agreement Currency", None)
vendor_funding = item.get("Vendor Funding In Agreement Currency", None)
processed_item = item.copy()
processed_item.pop("Title")
processed_item.pop("Asin")
processed_item["Invoice date"] = coop.get("Invoice date")
processed_item['Funding Type'] = coop.get("Funding Type")
processed_item['Asin'] = asin
processed_item['ERP SKU'] = relation.get("erp_sku")
processed_item['Group Name'] = relation.get("name")
processed_item['Group Code'] = relation.get("code")
processed_item["Original balance"] = rebate or vendor_funding
processed_items.append(processed_item)
return processed_items
def __process_small_items(self, item_list, relation_data, coop):
"""处理小数据列表 (item_list 长度 < 10)"""
processed_items = []
for _, item in item_list.iterrows():
asin = item.get('Asin', None)
if asin is None:
asin = item.get('ASIN', None)
if not self.__validate_asin(asin):
continue
relation = relation_data.get(asin, {})
rebate = item.get("Rebate In Agreement Currency", None)
vendor_funding = item.get("Vendor Funding In Agreement Currency", None)
processed_item = coop.copy()
processed_item.pop("Agreement title")
processed_item.pop("Original balance")
processed_item.pop("Invoice date")
processed_item.pop("Funding Type")
processed_item["Invoice date"] = coop.get("Invoice date")
processed_item['Funding Type'] = coop.get("Funding Type")
processed_item["Order Date"] = item.get("Order Date")
processed_item['Purchase Order'] = relation.get("Purchase Order")
processed_item["Agreement Currency"] = item.get("Agreement Currency")
processed_item["Asin"] = asin
processed_item["ERP SKU"] = relation.get("erp_sku")
processed_item["Group Name"] = relation.get("name")
processed_item['Group Code'] = relation.get("code")
processed_item["Original balance"] = rebate or vendor_funding
processed_items.append(processed_item)
return processed_items
@staticmethod
def __validate_asin(asin):
"""验证 ASIN 是否有效"""
return asin and not (isinstance(asin, float) and math.isnan(asin))
@staticmethod
def __write_sheet(writer, data, sheet_name):
if not isinstance(sheet_name, str):
sheet_name = str(sheet_name)
log.info(f"开始写入 {sheet_name}, 共计 {len(data)} 条")
df = pd.DataFrame(data) # 将数据转换为 DataFrame
df.to_excel(writer, sheet_name=sheet_name, index=False)
def __save_excel(self, sheet_data, large_sheet_data, new_file_name):
"""保存数据到 Excel 文件"""
# 初始化 Excel 写入器
with pd.ExcelWriter(new_file_name) as writer:
# 写入小数据
if sheet_data:
log.info(f"保存小数据,共计 {len(sheet_data)} 条")
self.__write_sheet(writer, sheet_data, "Sheet1")
# Write the large data (one sheet per invoice)
if large_sheet_data:
log.info(f"保存大数据,共计 {sum(len(data) for data in large_sheet_data.values())} 条")
for sheet_name, data in large_sheet_data.items():
self.__write_sheet(writer, data, sheet_name)
# with ThreadPoolExecutor() as executor:
# for sheet_name, data in large_sheet_data.items():
# executor.submit(write_sheet, writer, data, sheet_name)
log.info(f"文件 {new_file_name} 保存完成,路径:{os.path.abspath(new_file_name)}")
def __push_data_queue(self, file_name):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='spa_robot', routing_key='spa_robot', exchange='reports')
log.info("开始读取数据....")
data_dict = pd.read_excel(file_name, sheet_name=None, keep_default_na=False, na_values=[])
log.info("开始推送消息....")
for sheet_name, values in data_dict.items():
for _, item_row in values.iterrows():
if sheet_name == "Sheet1":
parent_id = item_row.get('Invoice ID', "")
else:
parent_id = sheet_name
push_data = {
'ad_date': item_row.get('Invoice date', ""), # spa费用数据日期
'erp_sku': item_row.get('ERP SKU', ""), # ERP SKU
'ad_amount': item_row.get('Original balance', ""), # spa费用金额
'ad_amount_currency': item_row.get('Agreement Currency', ""), # spa费用币制
'funding_type': item_row.get('Funding Type', ""), # 资金类型
'transaction_type': item_row.get('Transaction Type', ""), # 交易类型
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
'asin': item_row.get('Asin', ""), # ASIN
'shop_code': self.shop_code, # 店铺code
'type': 2, # 1 sheet1 2 其他sheet
'parent_id': parent_id, # sheet1 为Invoice ID 其他sheet为sheet名称
'order_no': item_row.get('Purchase Order', ""), # 订单号
}
# 推送数据
rabbit.send_message(push_data)
def run(self, file_name: str):
# 获取数据
relation_data = api.sku_relations(self.country) # 获取 ASIN 与 SKU 的对应关系数据
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
# 读取数据列表
coop_list = pd.read_excel(file_name)
log.info(f"共计: {len(coop_list)} 条数据")
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
file_name = "spa.xlsx"
new_file_name = f"{current_datetime}_{self.country}_{file_name}"
sheet_data = [] # 用于保存小数据
large_sheet_data = {} # 保存大数据(需要分 Sheet)
# 遍历合作列表
for index, coop in coop_list.iterrows():
index += 1
invoice_id = coop.get("Invoice ID") # 获取发票 ID
log.info({"index": index, "invoice_id": invoice_id})
if not invoice_id:
log.warning(f"缺少 Invoice ID,跳过第 {index} 条数据")
continue
# 获取当前发票的 item 列表
item_dict = self.__export_item_read_data(invoice_id)
if item_dict is None:
sheet_data.append(coop)
log.warning(f"{invoice_id} 暂无报告信息")
continue
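# The per-invoice backup workbook may contain several sheets; each one is written to its own output sheet named <invoice id>-<n>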
if len(item_dict) > 1:
for i, value in enumerate(item_dict.values(), start=1):
invoice_id_num = f"{invoice_id}-{i}"
processed_items = self.__process_large_items(value, relation_data, coop)
if processed_items:
large_sheet_data[invoice_id_num] = processed_items
else:
item_list = item_dict.get('Accrual For Current Period')
# 如果是列表且长度 >= 10 则新增sheet
if len(item_list) >= 10:
processed_items = self.__process_large_items(item_list, relation_data, coop)
if processed_items:
large_sheet_data[invoice_id] = processed_items
else:
# 如果是较小的列表
processed_items = self.__process_small_items(item_list, relation_data, coop)
sheet_data.extend(processed_items)
# 保存数据到 Excel 文件
self.__save_excel(sheet_data, large_sheet_data, new_file_name)
# 推送消息
self.__push_data_queue(new_file_name)
pip.exe install -i https://mirrors.cloud.tencent.com/pypi/simple -r requirements.txt
pyinstaller -F -n payment.exe .\src\payment.py
pyinstaller -F -n payment_erp.exe .\src\payment_erp.py
pyinstaller -F -n advertise_erp.exe .\src\advertise_erp.py
pyinstaller -F -n return_goods.exe .\src\return_goods.py
pyinstaller -F -n spa_search.exe .\src\spa_search.py
pyinstaller -F -n easy_storage.exe .\src\easy_storage.py
pyinstaller -F -n diff_spa.exe .\cmd\diff_spa.py
pyinstaller -F -n amazon_vc.exe main.py
rd /s /q build
del *.spec
import os.path
import pandas as pd
import argparse
class InvoiceIDComparator:
def __init__(self, file_a, file_b, invoice_column_name):
self.file_a = file_a
self.file_b = file_b
self.invoice_column_name = invoice_column_name
def get_invoice_ids_from_excel(self, file_path):
"""从Excel文件中获取所有sheet的Invoice ID"""
excel_file = pd.ExcelFile(file_path)
invoice_ids = set() # 使用集合去重
for sheet_name in excel_file.sheet_names:
# 读取每个sheet的内容
df = excel_file.parse(sheet_name)
# 确保指定的列存在
if self.invoice_column_name in df.columns:
invoice_ids.update(df[self.invoice_column_name].dropna().unique())
invoice_ids.add(sheet_name) # 将sheet名也加入到集合中
return invoice_ids
def compare_invoice_ids(self):
"""比较两个Excel文件中的Invoice ID"""
# 获取文件A中的Invoice ID和所有sheet名称
invoice_ids_a = self.get_invoice_ids_from_excel(self.file_a)
# 获取文件B中的Invoice ID和所有sheet名称
invoice_ids_b = self.get_invoice_ids_from_excel(self.file_b)
only_in_a = invoice_ids_a - invoice_ids_b
only_in_b = invoice_ids_b - invoice_ids_a
# 输出比较结果
print("文件A中存在,但文件B中没有的 Invoice IDs:")
print(only_in_a)
print("\n文件B中存在,但文件A中没有的 Invoice IDs:")
print(only_in_b)
def main():
# 设置命令行参数
parser = argparse.ArgumentParser(description="比较两个Excel文件中的Invoice ID差异")
parser.add_argument('--original_file', default="ContraCogsInvoices.xls", help="原文件路径")
parser.add_argument('--result_file', default="result.xls", help="结果文件路径")
parser.add_argument('--invoice_column', default='Invoice ID', help="Invoice ID列的名称")
# 解析命令行参数
args = parser.parse_args()
if not os.path.exists(args.original_file):
raise FileNotFoundError("源文件不存在")
if not os.path.exists(args.result_file):
raise FileNotFoundError("结果文件不存在")
# 创建 InvoiceIDComparator 实例并进行比较
comparator = InvoiceIDComparator(args.original_file, args.result_file, args.invoice_column)
comparator.compare_invoice_ids()
# 程序入口
if __name__ == "__main__":
try:
main()
except Exception as e:
print(e)
# coding: utf-8
import json
from datetime import datetime
import requests
import xmltodict
import pandas as pd
class YcClient:
......
# coding: utf-8
import os
from app.helper import file, domain, helper
from app.vc.payment import Payment
from app.vc.return_goods import ReturnGoods
from app.vc.spa import Spa
from DrissionPage import ChromiumPage
from dotenv import load_dotenv
if __name__ == '__main__':
load_dotenv()
page = ChromiumPage()
page.set.load_mode.normal()
page.set.when_download_file_exists('overwrite')
# 下载目录
download_path = os.getcwd()
# 检查下载目录是否存在,如果不存在则创建
file.make_dir(download_path)
# 设置下载路径,确保在打开浏览器前设置
page.set.download_path(download_path)
try:
country = helper.get_input_with_default("国家: [ DE, FR, JP, CA, UK, US ]", "US")
shop_code = helper.get_input_with_default("店铺编码: [ DE-VC, FR-VC, JP-VC, CA-VC, UK-VC, VECELO ]", "VECELO")
payee_code = helper.get_input_with_default("回款Code: [ 详情页url参数 payeeCode ]", "VECET")
action = helper.get_input_with_default("功能:[ spa, return, payment ]", "")
file_name = helper.get_input_with_default("文件名 : [ 例如: ContraCogsInvoices.xls ]", "")
if action.lower() == "payment":
object_instate = Payment(page, country, payee_code, shop_code)
elif action.lower() == "return":
object_instate = ReturnGoods(page, country, payee_code, shop_code)
elif action.lower() == "spa":
object_instate = Spa(page, country, payee_code, shop_code)
else:
raise Exception("请输入正确的功能")
if file_name == "":
raise Exception("请输入文件名")
domain.domain_page(page, country)
object_instate.run(file_name)
except KeyboardInterrupt:
pass
except Exception as e:
helper.print_trace("main", e)
finally:
page.close()
# coding: utf-8
# Advertising fees
import os
import warnings
import pandas as pd
from dotenv import load_dotenv
from helper import helper, logger, redisx, rabbitmq
from helper import api
# 忽略 openpyxl 样式警告
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
country = None
payeeCode = None
shop_code = None
load_dotenv()
log = logger.ConsoleLog()
rdb = redisx.RedisClient()
def main():
# 读取sku映射关系
relations_dict = api.sku_relations(country)
file_name = "Sponsored_Products_Advertised_product_report.xlsx"
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
result = pd.read_excel(file_name, keep_default_na=False, na_values=[])
log.info(f"共计:{len(result)} 订单")
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='advertising_robot', routing_key='advertising_robot', exchange='reports')
for _, data in result.iterrows():
relation = relations_dict.get(data.get('Advertised ASIN',""), {})
push_data = {
'ad_date': str(data.get("End Date", "")), # 日期
'erp_sku': relation.get('erp_sku', ""), # erp_sku
'ad_amount': data.get("Spend", ""), # 金额
'ad_amount_currency': data.get("Currency", ""), # 币种
'operator_name': "", # 运营名
'group_code': relation.get("code", ""),
'asin': data.get("Advertised ASIN", ""), # asin
'shop_code': shop_code, # 店铺code
}
rabbit.send_message(push_data)
if __name__ == '__main__':
try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
shop_code = helper.get_input_with_default("店铺编码(DE-VC,FR-VC,JP-VC,CA-VC,UK-VC,VECELO])", "VECELO")
main()
except KeyboardInterrupt:
pass
except Exception as e:
log.error(e)
helper.print_trace("main", e)
# coding: utf-8
# Remittance payment details
import json
import os
import re
import urllib.parse
import warnings
from datetime import datetime
from decimal import Decimal
import pandas as pd
from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError
from dotenv import load_dotenv
from lxml import etree
from helper import helper, excel, file, domain, logger, redisx, rabbitmq
country = None
payeeCode = None
shop_code = None
load_dotenv()
log = logger.ConsoleLog()
rdb = redisx.RedisClient()
page = ChromiumPage()
page.set.load_mode.normal()
page.set.when_download_file_exists('overwrite')
# 下载目录
download_path = os.getcwd()
# 检查下载目录是否存在,如果不存在则创建
helper.make_dir(download_path)
# 设置下载路径,确保在打开浏览器前设置
page.set.download_path(download_path)
# 忽略 openpyxl 样式警告
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
def page_get(url):
host = domain.switch_domain(country)
full_url = host + url
page.get(full_url, timeout=5)
def export_list_read_data():
new_file_name = 'new_Payments.xlsx'
if os.path.isfile(new_file_name):
df = pd.read_excel(new_file_name)
return df
file_name = 'Payments.xlsx'
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
all_df = pd.read_excel(file_name, header=None)
# 找到所有空行的索引,这里假设完全空的行为表头之间的分界线
empty_rows = all_df[all_df.isnull().all(axis=1)].index.tolist()
# 定位表头与数据的分隔
first_header_start = empty_rows[1] + 1 # 第一个表头开始的行
second_header_start = empty_rows[2] + 4 # 第二个表头开始的行
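# NOTE: the +1 / +4 / -7 offsets assume the fixed layout of the exported Payments.xlsx (title row, blank rows, then column headers); adjust them if the export format changes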
first_df = pd.read_excel(file_name, skiprows=first_header_start, nrows=second_header_start - 7)
second_df = pd.read_excel(file_name, skiprows=second_header_start)
# Regex matching descriptions that contain 'Price Claim', 'PCR', 'Missed Adjustment', 'Shortage Claim', or an 'XXXXXXXX/XXXX/'-style prefix
pattern = r'Price Claim|PCR|Missed Adjustment|Shortage Claim|^[A-Z0-9]{8}/[A-Z0-9]{4}/'
# 过滤符合条件的行
filtered_second_df = second_df[second_df['Description'].str.contains(pattern, na=False, regex=True)]
merged_df = pd.merge(filtered_second_df, first_df[['Payment Number', 'Payment Date']], on='Payment Number',
how='left')
excel.save_xls(merged_df, new_file_name, "Remittance payments")
return merged_df
def invoice_details(invoice_number, last_two, last_three):
if len(invoice_number) > 8:
# A long invoice number carries a claim suffix; normalize it so the details page URL resolves
if last_two in ["MA", "PC"]:
invoice_number = invoice_number[:-2] # drop the last two characters
if last_three in ["PCR"]:
invoice_number = invoice_number[:-3] # drop the last three characters
if last_three in ["+SC", "SC-"]:
invoice_number = invoice_number[:-3] # drop the last three characters
invoice_number = invoice_number + 'SCR'
if last_two == "SC":
invoice_number = invoice_number + 'R'
params = {
"invoiceNumber": invoice_number,
"payeeCode": payeeCode,
"activeTab": "lineItems",
}
# 将字典转换为 URL 查询参数
query_string = urllib.parse.urlencode(params)
full_url = f"hz/vendor/members/inv-mgmt/invoice-details?" + query_string
page_get(full_url)
def export_details_read_data(file_name):
count = 0
while True:
try:
page.ele("#line-items-export-to-spreadsheet-announce").click.to_download(rename=file_name)
file.wait_for_downloads(file_name)
excel.remove_last_comma(file_name)
break
except ElementNotFoundError:
if count == 3: return None
count += 1
log.warning("导出按钮不存在刷新网页")
page.refresh()
return pd.read_csv(file_name)
def get_content(tree, row_index: int, cell_index: int) -> str:
"""获取指定行和列的内容,如果没有找到,则返回 None。"""
content = tree.xpath(f'//*[@role="row"][{row_index}]/*[@role="cell"][{cell_index}]/text()')
return content[0] if content else None
def get_po_code(index, po_id) -> dict:
result = {
"index": index,
"po_id": po_id
}
po_id = po_id[:8]
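# Only the first 8 characters identify the PO; any trailing claim marker is dropped before lookup and caching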
cache_key = "payment"
payment_cache = rdb.get_client().hget(cache_key, po_id)
if payment_cache:
cache_value = json.loads(payment_cache)
result["vendor"] = cache_value['vendor']
result["payment_terms"] = cache_value['payment_terms']
return result
page_get(f"po/vendor/members/po-mgmt/order?poId={po_id}")
po_table = page.ele("#po-header", timeout=5).html
# 使用 lxml 解析 HTML
tree = etree.HTML(po_table)
# 获取 Vendor 内容
result["vendor"] = get_content(tree, 2, 2)
# 正则表达式查找数字和%之间的内容
match = re.search(r'Payment .*?(\d+%)', po_table)
if match:
result["payment_terms"] = match.group(1)[:-1] # 去掉%
else:
result["payment_terms"] = 0
cache_value = result.copy()
del cache_value['index']
rdb.get_client().hset(cache_key, po_id, json.dumps(cache_value))
return result
def price_extract_data(html_content):
# 使用正则表达式删除所有 HTML 注释
html_content = re.sub(r'<!--.*?-->', '', html_content)
# 使用 lxml 解析 HTML
tree = etree.HTML(html_content)
# 提取所有行的数据
rows = tree.xpath('//tr[contains(@class, "mt-row")]')
data_list = []
for row in rows:
# 定义 data 字典,提取并去除多余字符
data = {
'PO_NUMBER': row.xpath('string(./td[@data-column="PO_NUMBER"]/span/span/a)').strip(),
'ASIN': row.xpath('string(./td[@data-column="ASIN"]/span/span/a)').strip(),
'EXTERNAL_ID': row.xpath('string(./td[@data-column="EXTERNAL_ID"]/span/span/a)').strip(),
'TITLE': row.xpath('string(./td[@data-column="TITLE"])').strip(),
'QUANTITY': row.xpath('string(./td[@data-column="QUANTITY"])').strip(),
'INVOICE_COST': row.xpath('string(./td[@data-column="INVOICE_COST"])').strip().replace('$', ''),
'PO_COST': row.xpath('string(./td[@data-column="PO_COST"])').strip().replace('$', ''),
'INITIAL_RESEARCH_COST': row.xpath('string(./td[@data-column="INITIAL_RESEARCH_COST"])').strip().replace(
'$', ''),
'RESOLUTION_DECISION': row.xpath('string(./td[@data-column="RESOLUTION_DECISION"])').strip(),
'RESOLUTION_COST': row.xpath('string(./td[@data-column="RESOLUTION_COST"])').strip().replace('$', '')
}
# 如果字段为空则设为空字符串
for key in data:
if not data[key]:
data[key] = "" # 将 None 转为 ""
data_list.append(data)
return data_list
def line_items_data(html_content):
# 使用正则表达式删除所有 HTML 注释
html_content = re.sub(r'<!--.*?-->', '', html_content)
# 使用 lxml 解析 HTML
tree = etree.HTML(html_content)
# 提取所有行的数据
rows = tree.xpath('//tr[contains(@class, "mt-row")]')
data_list = []
for row in rows:
# 定义 data 字典,提取并去除多余字符
data = {
'PO': row.xpath('string(./td[@data-column="PO_NUMBER"]/span/span/a)').strip(),
'External ID': row.xpath('string(./td[@data-column="EXTERNAL_ID"])').strip(),
'ASIN': row.xpath('string(./td[@data-column="ASIN"]/span/span/a)').strip(),
'TITLE': row.xpath('string(./td[@data-column="DESCRIPTION"])').strip(),
'Model': row.xpath('string(./td[@data-column="MODEL_NUMBER"])').strip(),
'Freight Term': row.xpath('string(./td[@data-column="FREIGHT_TERM"])').strip(),
'Qty': row.xpath('string(./td[@data-column="QUANTITY"])').strip(),
'Unit Cost': row.xpath('string(./td[@data-column="UNIT_COST"])').strip(),
'Amount': row.xpath('string(./td[@data-column="TOTAL_AMOUNT"])').strip(),
'Shortage quantity': row.xpath('string(./td[@data-column="SHORTAGE_QUANTITY"])').strip(),
'Amount shortage': row.xpath('string(./td[@data-column="SHORTAGE_AMOUNT"])').strip(),
'Last received date': row.xpath('string(./td[@data-column="LAST_RECEIVED_DATE"])').strip(),
'ASIN received': row.xpath('string(./td[@data-column="RECEIVED_ASIN"])').strip(),
'Quantity received': row.xpath('string(./td[@data-column="RECEIVED_QUANTITY"])').strip(),
'Unit cost': row.xpath('string(./td[@data-column="RECEIVED_COST_PRICE"])').strip(),
'Amount received': row.xpath('string(./td[@data-column="RECEIVED_AMOUNT"])').strip(),
}
# 如果字段为空则设为空字符串
for key in data:
if not data[key]:
data[key] = "" # 将 None 转为 ""
data_list.append(data)
return data_list
def calculate_unit_cost(data_list):
"""计算差异金额单价并返回金额"""
unit_cost_float = Decimal(data_list['INVOICE_COST']) - Decimal(data_list['INITIAL_RESEARCH_COST'])
unit_cost = unit_cost_float * int(data_list['QUANTITY'])
return unit_cost_float, float(f"{unit_cost:.2f}")
def handle_price_data(price_data_list, invoice_amount):
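# Reconcile the dispute rows against the invoice amount: if the summed claim amounts equal the invoice amount, every row is treated as paid; otherwise only the single row whose amount matches is kept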
total_price_data_amount = 0
# 计算总金额
for data_list in price_data_list:
unit_cost_float, amount = calculate_unit_cost(data_list)
total_price_data_amount += amount
result = []
invoice_amount = abs(float(f"{invoice_amount:.2f}"))
# 如果总金额等于列表金额则已回款
if total_price_data_amount == invoice_amount:
for data_list in price_data_list:
unit_cost_float, amount = calculate_unit_cost(data_list)
data = data_list.copy()
data['Quantity received'] = data_list['QUANTITY']
data['UnitCost'] = unit_cost_float
data['Amount'] = f"${amount:.2f}"
data['Shortage quantity'] = '0'
result.append(data)
else:
for data_list in price_data_list:
unit_cost_float, amount = calculate_unit_cost(data_list)
if amount == invoice_amount:
data = data_list.copy()
data['Quantity received'] = data_list['QUANTITY']
data['UnitCost'] = unit_cost_float
data['Amount'] = f"${amount:.2f}"
data['Shortage quantity'] = '0' if data_list['RESOLUTION_DECISION'] == "Approved" else '1'
result.append(data)
break
return result
def handle_data(detail_datum, vendor, deduction_points):
"""处理正常数据"""
amount = detail_datum.get('Amount', '$0.00') # 默认值设为 '$0.00' 以避免错误
amount = float(amount.replace('$', '').replace(',', ''))
# 如果是0则回款完成
is_finished = "否"
shortage_quantity = detail_datum.get('Shortage quantity', -1)
if shortage_quantity == '0':
is_finished = "是"
amount_after_deduction = amount
if deduction_points > 0:
# 计算扣除后的金额
amount_after_deduction = amount - (amount * (deduction_points / 100))
# 复制原始行数据,避免直接修改
record = detail_datum.copy()
record.update({"Amount": amount})
record["IsFinished"] = is_finished
record["DeductionPoints"] = f"{deduction_points}%" # 拼接百分号
record["Code"] = vendor
record["AmountAfterDeduction"] = amount_after_deduction
return record
def main():
list_data = export_list_read_data()
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
# 原文件名
file_name = "回款数据.xlsx"
# 拼接新的文件名
new_file_name = f"{current_datetime}_{country}_{file_name}"
excel.save_xls(list_data, new_file_name, "Remittance payments")
log.info(f"共计:{len(list_data)} 订单")
all_normal_pay_data = []
all_price_pay_data = []
i = 0
for _, data in list_data.iterrows():
i += 1
invoice_number = data.get("Invoice Number")
invoice_amount = data.get("Invoice Amount")
# 获取当前订单的Payee和优惠比例
vendor_payment_terms = get_po_code(i, invoice_number)
log.info(vendor_payment_terms)
vendor = vendor_payment_terms['vendor']
deduction_points = int(vendor_payment_terms['payment_terms'])
# 处理单号主要为了进入详情页
last_two = invoice_number[-2:] # 取后两位
last_three = invoice_number[-3:] # 取后三位
# 判断是否为争议订单
if len(invoice_number) > 8 and (last_three == "PCR" or last_two == "PC"):
cache_key = "price_data"
price_data = rdb.get_client().hget(cache_key, invoice_number)
if price_data:
price_data = json.loads(price_data)
else:
# 进入详情页
invoice_details(invoice_number, last_two, last_three)
# 点击争议价tab
page.ele("#pd").click()
log.debug("等待争议数据加载,10秒后获取表单数据")
page.wait(10)
table_html = page.ele("#priceDiscrepancyWithDMSGridForm", timeout=5).html
# 抓取表单数据
price_data = price_extract_data(table_html)
# 缓存数据
rdb.get_client().hset(cache_key, invoice_number, json.dumps(price_data))
# 争议回款
price_data = handle_price_data(price_data, invoice_amount)
price_pay_data = []
for detail_datum in price_data:
# 争议回款数据
format_price_data = handle_data(detail_datum, vendor, deduction_points)
# 将处理后的记录添加到临时列表
price_pay_data.append(format_price_data)
# 添加到汇总列表
all_price_pay_data.append(pd.DataFrame(price_pay_data))
else:
cache_key = "item_data"
detail_data = rdb.get_client().hget(cache_key, invoice_number)
if detail_data:
detail_data = json.loads(detail_data)
else:
# 进入详情页
invoice_details(invoice_number, last_two, last_three)
page.wait(3)
table_html = page.ele("#invoiceLineItems", timeout=5).html
# 抓取表单数据
detail_data = line_items_data(table_html)
# 缓存数据
rdb.get_client().hset(cache_key, invoice_number, json.dumps(detail_data))
# 初始化列表存储新字段数据
normal_pay_data = []
for detail_datum in detail_data:
# 正常回款数据
success_data = handle_data(detail_datum, vendor, deduction_points)
# 将处理后的记录添加到临时列表
normal_pay_data.append(success_data)
# 添加到汇总列表
all_normal_pay_data.append(pd.DataFrame(normal_pay_data))
if all_normal_pay_data:
# 将所有数据合并为一个 DataFrame
normal_pay_summary = pd.concat(all_normal_pay_data, ignore_index=True)
excel.save_xls(normal_pay_summary, new_file_name, "正常回款导出明细")
if all_price_pay_data:
price_pay_summary = pd.concat(all_price_pay_data, ignore_index=True)
excel.save_xls(price_pay_summary, new_file_name, "Price导出明细")
# 推送消息
push_data_queue(new_file_name)
def push_data_queue(file_name):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='refund_robot', routing_key='refund_robot', exchange='reports')
data = pd.read_excel(file_name, keep_default_na=False, na_values=[])
for _, item_row in data.iterrows():
push_data = {
'payment_number': item_row.get('Payment Number', ''), # 订单id
'order_date': str(item_row.get('Invoice Date', '')), # 发票时间
'payment_date': str(item_row.get('Payment Date', '')), # 支付时间
'order_no': item_row.get('Invoice Number', 0), # 订单号
'payment_type': item_row.get('Description', ''), # Description
'platform_payable_amount': item_row.get('Invoice Amount', ''), # 平台应付金额
'fee_amount': item_row.get("Terms Discount Taken", ''), # 手续费
'actual_payment': item_row.get('Amount Paid', ''), # 实际支付金额
'currency': item_row.get('Invoice Currency', ''), # 货币
'shop_code': shop_code, # 店铺code
}
# 推送数据
rabbit.send_message(push_data)
if __name__ == '__main__':
try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
payeeCode = helper.get_input_with_default("payeeCode", "VECET")
shop_code = helper.get_input_with_default("店铺编码(DE-VC,FR-VC,JP-VC,CA-VC,UK-VC,VECELO])", "VECELO")
domain.domain_page(page, country)
main()
page.close()
except KeyboardInterrupt:
pass
except Exception as e:
log.error(e)
helper.print_trace("main", e)
# coding: utf-8
# Remittance payment details
import os
import warnings
import pandas as pd
from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError
from dotenv import load_dotenv
from helper import helper, file, domain, logger, redisx, rabbitmq
country = None
shop_code = None
load_dotenv()
log = logger.ConsoleLog()
rdb = redisx.RedisClient()
page = ChromiumPage()
page.set.load_mode.normal()
page.set.when_download_file_exists('overwrite')
# 下载目录
download_path = os.getcwd()
# 检查下载目录是否存在,如果不存在则创建
helper.make_dir(download_path)
# 设置下载路径,确保在打开浏览器前设置
page.set.download_path(download_path)
# 忽略 openpyxl 样式警告
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
def page_get(url):
host = domain.switch_domain(country)
full_url = host + url
page.get(full_url, timeout=5)
def export_list_read_data():
file_name = 'Payments.xlsx'
# try:
# while True:
# page_get(f"hz/vendor/members/remittance/home")
#
# # 选择日期下拉框
# page.ele("#date-range-option-wrap").click()
# page.wait(1)
# # 点击第一个选项
# page.ele("#date-range-option_0").click()
# page.wait(1)
# # 点击搜索按钮
# page.ele("#remittanceSearchForm-submit-aui-button").click()
# page.wait(2)
#
# page.ele("#remittance-home-select-all").click()
# page.ele("#remittance-home-export-link").click.to_download()
# is_down = file.wait_for_downloads(file_name, 30)
# if is_down: break
# log.warning(f"下载失败,重新下载")
# except ElementNotFoundError:
# log.error("页面加载失败,刷新重新加载")
# page.refresh()
df = pd.read_excel(file_name, header=None)
# 定位标题行
pay_title = df[df[0].str.contains('Remittance payments', case=False, na=False)].index[0]
inv_title = df[df[0].str.contains('Invoices', case=False, na=False)].index[0]
# 定位表头起始行(跳过标题后的空行)
pay_header = df.loc[pay_title + 1:].notna().any(axis=1).idxmax()
inv_header = df.loc[inv_title + 1:].notna().any(axis=1).idxmax()
# 计算第一个表格的结束位置(第二个标题前的空行)
empty_lines = df.index[df.isnull().all(axis=1)].tolist()
separator = max([x for x in empty_lines if pay_header < x < inv_title], default=inv_title - 1)
# Read and clean both tables
pay_rows = separator - pay_header - 1
payments = pd.read_excel(file_name, header=pay_header, nrows=pay_rows).dropna(how='all')
invoices = pd.read_excel(file_name, header=inv_header).dropna(how='all')
return [payments, invoices]
def main():
payments, invoices = export_list_read_data()
# 将 'Payment Number' 列设置为索引
payments.set_index('Payment Number', inplace=True)
# 转换为字典,orient='index' 表示以索引为键
payments_map = payments.to_dict(orient='index')
log.info(f"共计:{len(invoices)} 订单")
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='refund_robot', routing_key='refund_robot', exchange='reports')
i = 0
for _, data in invoices.iterrows():
i += 1
payment_number = data.get("Payment Number")
payment_date = payments_map.get(payment_number, {}).get('Payment Date', '')
platform_payable_amount = data.get('Invoice Amount', '')
if country == 'FR' or country == 'UK':
platform_payable_amount = data.get('Net Amount Paid', '')
push_data = {
'payment_number': data.get('Payment Number', ''), # 订单id
'order_date': str(data.get('Invoice Date', '')), # 发票时间
'payment_date': str(payment_date),
'order_no': data.get('Invoice Number', 0), # 订单号
'payment_type': data.get('Description', ''), # Description
'platform_payable_amount': platform_payable_amount, # 平台应付金额
'fee_amount': data.get("Terms Discount Taken", ''), # 手续费
'actual_payment': data.get('Amount Paid', ''), # 实际支付金额
'currency': data.get('Invoice Currency', ''), # 货币
'shop_code': shop_code, # 店铺code
}
# 推送数据
rabbit.send_message(push_data)
if __name__ == '__main__':
try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
shop_code = helper.get_input_with_default("店铺编码(DE-VC,FR-VC,JP-VC,CA-VC,UK-VC,VECELO])", "VECELO")
domain.domain_page(page, country)
main()
except KeyboardInterrupt:
pass
except Exception as e:
log.error(e)
helper.print_trace("main", e)
# coding: utf-8
# Export return records
import os
from datetime import datetime
import pandas as pd
from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError
from dotenv import load_dotenv
from helper import helper, excel, file, domain, logger, api, rabbitmq
country = None
shop_code = None
load_dotenv()
log = logger.ConsoleLog()
page = ChromiumPage()
page.set.load_mode.normal()
page.set.when_download_file_exists('overwrite')
# 下载目录
download_path = os.getcwd()
# 检查下载目录是否存在,如果不存在则创建
helper.make_dir(download_path)
# 设置下载路径,确保在打开浏览器前设置
page.set.download_path(download_path)
def page_get(url):
host = domain.switch_domain(country)
full_url = host + url
page.get(full_url, timeout=5)
def export_list_read_data():
file_name = "Return_Summary.xls"
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
return pd.read_excel(file_name)
def export_item_read_data(return_id):
file_name = f"{country}_return_goods\\{return_id}.xls"
if not os.path.isfile(file_name):
while True:
try:
# 打开退回详情下载明细
page_get(f"katalmonsapp/vendor/members/returns/{return_id}")
page.ele("#file-download-button").click.to_download(rename=file_name)
file.wait_for_downloads(file_name)
break
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
page.refresh()
# 读取回退商品详情
return pd.read_excel(file_name)
def main():
# 读取sku映射关系
relations_dict = api.sku_relations(country)
# 下载并读取list数据
list_data = export_list_read_data()
log.info(f"共计:{len(list_data)} 订单")
new_list_data = []
i = 0
for _, data in list_data.iterrows():
i += 1
return_id = data.get('Return ID')
log.info({"index": i, "return_id": return_id})
# 下载退货详情表格读取数据
item_data = export_item_read_data(return_id)
# 按 'Purchase order' 和 'ASIN' 分组,并对 'Quantity' 和 Total amount 进行求和
item_data_result = item_data.groupby(['Purchase order', 'ASIN', 'Reason'], as_index=False).agg({
'Quantity': 'sum',
'Total amount': 'sum',
})
for _, item_row in item_data_result.iterrows():
relation = relations_dict.get(item_row.get('ASIN'), {})
erp_sku = relation.get('erp_sku', "")
data_dict = data.to_dict()
data_dict.update({
'Return Date': data_dict['Return Date'].strftime('%m/%d/%Y'),
'Return ID': str(data_dict['Return ID']),
'PO': item_row.get('Purchase order', ""),
'ASIN': item_row.get('ASIN', ""),
'SKU': erp_sku,
'Quantity': item_row.get('Quantity', 0),
# Replace the return quantity and amount with the values from the detail report
'Return quantity': item_row.get('Quantity', 0), # replaced return quantity
'Reason': item_row.get('Reason', ""),
'Total cost': item_row.get('Total amount', 0), # replaced amount
'Group Name': relation.get("name", ""),
'Group Code': relation.get("code", ""),
})
# 追加数据
new_list_data.append(data_dict)
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
# 原文件名
file_name = "退货明细.xlsx"
# 拼接新的文件名
new_file_name = f"{current_datetime}_{country}_{file_name}"
excel.save_xls(new_list_data, new_file_name)
# 推送消息
push_data_queue(new_file_name)
def push_data_queue(file_name):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
data = pd.read_excel(file_name, keep_default_na=False, na_values=[])
for _, item_row in data.iterrows():
push_data = {
'return_id': item_row.get('Return ID', ''),
'asin': item_row.get('ASIN', ''), # ASIN
'order_no': item_row.get('Purchase order', ''), # 订单号
'sku_quantity': item_row.get('Quantity', 0), # 退回数量
'sku_amount': item_row.get('Total cost', 0), # Total cost
'currency': item_row.get('Currency code', ''), # Currency code
'data_date': str(item_row.get('Return Date', '')), # Return Date
'erp_sku': item_row.get("SKU", ''), # ERP SKU # SKU1匹配
'shop_code': shop_code, # 店铺code
'supplier_code': item_row.get('Vendor code', ''), # 供应商编码
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
}
# 推送数据
rabbit.send_message(push_data)
if __name__ == '__main__':
try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
shop_code = helper.get_input_with_default("店铺编码(DE-VC,FR-VC,JP-VC,CA-VC,UK-VC,VECELO])", "VECELO")
domain.domain_page(page, country)
main()
page.close()
except KeyboardInterrupt:
pass
except Exception as e:
log.error(e)
helper.print_trace("main", e)
# coding: utf-8
# SPA lookup
import math
import os
from datetime import datetime
from urllib.parse import urlparse, parse_qs
import pandas as pd
from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError
from dotenv import load_dotenv
from lxml import etree
from helper import helper, excel, file, domain, logger, api, rabbitmq
country = None
shop_code = None
load_dotenv()
log = logger.ConsoleLog()
page = ChromiumPage()
page.set.load_mode.normal()
page.set.when_download_file_exists('overwrite')
# 下载目录
download_path = os.getcwd()
# 检查下载目录是否存在,如果不存在则创建
helper.make_dir(download_path)
# 设置下载路径,确保在打开浏览器前设置
page.set.download_path(download_path)
def page_get(url):
host = domain.switch_domain(country)
full_url = host + url
page.get(full_url, timeout=10)
def export_list_read_data():
file_name = "spa-de.xls"
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
return pd.read_excel(file_name)
def get_report_table_html(invoice_id):
while True:
try:
page_get(f"hz/vendor/members/coop?searchText={invoice_id}")
# 点击选项卡
page.ele("#a-autoid-2-announce").click()
# 下载报表
page.ele(f"#invoiceDownloads-{invoice_id}_2").click()
page.wait(1)
# 获取报表表单内容
report_table_html = page.ele("#backup-report-table").html
if report_table_html is None or report_table_html == "":
log.warning("表单内容为空,刷新网页")
page.refresh()
continue
return report_table_html
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
page.refresh()
def get_report_agreement_text(invoice_id):
page.refresh()
while True:
try:
# 点击选项卡
page.ele("#a-autoid-2-announce").click()
# 下载报表
page.ele(f"#invoiceDownloads-{invoice_id}_3").click()
page.wait(3)
break
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
page.refresh()
# 获取报表表单内容
tree = etree.HTML(page.html)
# 找到包含表格的部分, # 获取第一个(也是唯一一个)匹配的表格元素
table = tree.xpath('//table[@width="90%"]')
if len(table) == 0:
return []
table = table[0]
# 获取所有表格行(tr),跳过表头
rows = table.xpath('.//tr[position()>1]')
# 定义一个列表,用于存储所有行数据
data_list = []
for row in rows:
# 定义 data 字典,提取并去除多余字符
data = {
'ASIN': row.xpath('string(./td[1])').strip(),
'UPC': row.xpath('string(./td[2])').strip(),
'Price Protected Quantity': row.xpath('string(./td[3])').strip(),
'Pending PO Codes': row.xpath('string(./td[4])').strip(),
'Description': row.xpath('string(./td[5])').strip(),
'Old Cost': row.xpath('string(./td[6])').strip().replace('$', ''),
'New Cost': row.xpath('string(./td[7])').strip().replace('$', ''),
'Delta': row.xpath('string(./td[8])').strip().replace('$', ''),
'Total by ASIN': row.xpath('string(./td[9])').strip().replace('$', '')
}
# 如果字段为空则设为空字符串
for key in data:
if not data[key]:
data[key] = "" # 将 None 转为 ""
# 将处理后的数据字典添加到列表
data_list.append(data)
return data_list
def export_item_read_data(invoice_id):
file_name = f"{country}_spa\\{invoice_id}.xlsx"
if os.path.isfile(file_name):
df = pd.read_excel(file_name, sheet_name=None)
return df
# 获取报表表单内容
report_table_html = get_report_table_html(invoice_id)
tree = etree.HTML(report_table_html)
# 提取所有链接
links = tree.xpath('//table[@id="backup-report-table"]//a/@href')
if len(links) == 0:
data_list = get_report_agreement_text(invoice_id)
return {"Accrual For Current Period": pd.DataFrame(data_list)}
for link in links:
# 解析链接中的查询参数
parsed_url = urlparse(link)
query_params = parse_qs(parsed_url.query)
# 提取 filename 参数
filename = query_params.get('fileName', ['未找到文件名'])[0]
report_file_tmp_dir = os.getcwd() + f"\\spa\\{invoice_id}\\{filename}\\"
host = domain.switch_domain(country)
report_file = report_file_tmp_dir + "BackupReport.xls"
while True:
page.download(host + link, report_file_tmp_dir, show_msg=False)
is_down = file.wait_for_downloads(report_file, 60)
if is_down: break
log.warning(f"下载 {invoice_id} 失败,重新下载")
try:
df = pd.read_excel(report_file)
# 获取表头
headers = df.columns.tolist()
# 要检查的列名
column_names_to_check = ["Rebate In Agreement Currency", "Vendor Funding In Agreement Currency"]
# 判断头文件是否满足读取条件,不满足删除文件夹
header_is_normal = any(column in headers for column in column_names_to_check)
if not header_is_normal:
continue
# 创建 ExcelFile 对象
excel_file = pd.ExcelFile(report_file)
# 获取所有工作表名称
sheet_names = excel_file.sheet_names
for sheet_name in sheet_names:
df = pd.read_excel(report_file, sheet_name=sheet_name)
data = df[df['Asin'].notna()]
excel.save_xls(data, file_name, sheet_name)
return pd.read_excel(file_name, sheet_name=None)
except ValueError:
pass
def main():
# 获取数据
relation_data = api.sku_relations(country) # 获取 ASIN 与 SKU 的对应关系数据
coop_list = export_list_read_data() # 获取合作数据列表
log.info(f"共计: {len(coop_list)} 条数据")
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
file_name = "spa.xlsx"
new_file_name = f"{current_datetime}_{country}_{file_name}"
sheet_data = [] # 用于保存小数据
large_sheet_data = {} # 保存大数据(需要分 Sheet)
# 遍历合作列表
for index, coop in coop_list.iterrows():
index += 1
invoice_id = coop.get("Invoice ID") # 获取发票 ID
log.info({"index": index, "invoice_id": invoice_id})
if not invoice_id:
log.warning(f"缺少 Invoice ID,跳过第 {index} 条数据")
continue
# 获取当前发票的 item 列表
item_dict = export_item_read_data(invoice_id)
if item_dict is None:
log.warning(f"{invoice_id} 暂无报告信息")
continue
if len(item_dict) > 1:
for i, value in enumerate(item_dict.values(), start=1):
invoice_id_num = f"{invoice_id}-{i}"
processed_items = process_large_items(value, relation_data, coop)
if processed_items:
large_sheet_data[invoice_id_num] = processed_items
else:
item_list = item_dict.get('Accrual For Current Period')
# 如果是列表且长度 >= 10 则新增sheet
if len(item_list) >= 10:
processed_items = process_large_items(item_list, relation_data, coop)
if processed_items:
large_sheet_data[invoice_id] = processed_items
else:
# 如果是较小的列表
processed_items = process_small_items(item_list, relation_data, coop)
sheet_data.extend(processed_items)
# 保存数据到 Excel 文件
save_excel(sheet_data, large_sheet_data, new_file_name)
# 推送消息
push_data_queue(new_file_name)
def process_large_items(item_list, relation_data, coop):
"""处理大数据列表 (item_list 长度 >= 10)"""
processed_items = []
for _, item in item_list.iterrows():
asin = item.get('Asin', None)
if not validate_asin(asin):
continue
relation = relation_data.get(asin, {})
rebate = item.get("Rebate In Agreement Currency", None)
vendor_funding = item.get("Vendor Funding In Agreement Currency", None)
processed_item = item.copy()
processed_item.pop("Title")
processed_item.pop("Asin")
processed_item["Invoice date"] = coop.get("Invoice date")
processed_item['Funding Type'] = coop.get("Funding Type")
processed_item['Asin'] = asin
processed_item['ERP SKU'] = relation.get("erp_sku")
processed_item['Group Name'] = relation.get("name")
processed_item['Group Code'] = relation.get("code")
processed_item["Original balance"] = rebate or vendor_funding
processed_items.append(processed_item)
return processed_items
def process_small_items(item_list, relation_data, coop):
"""处理小数据列表 (item_list 长度 < 10)"""
processed_items = []
for _, item in item_list.iterrows():
asin = item.get('Asin', None)
if asin is None:
asin = item.get('ASIN', None)
if not validate_asin(asin):
continue
relation = relation_data.get(asin, {})
rebate = item.get("Rebate In Agreement Currency", None)
vendor_funding = item.get("Vendor Funding In Agreement Currency", None)
processed_item = coop.copy()
processed_item.pop("Agreement title")
processed_item.pop("Original balance")
processed_item.pop("Invoice date")
processed_item.pop("Funding Type")
processed_item["Invoice date"] = coop.get("Invoice date")
processed_item['Funding Type'] = coop.get("Funding Type")
processed_item["Order Date"] = item.get("Order Date")
processed_item['Purchase Order'] = relation.get("Purchase Order")
processed_item["Agreement Currency"] = item.get("Agreement Currency")
processed_item["Asin"] = asin
processed_item["ERP SKU"] = relation.get("erp_sku")
processed_item["Group Name"] = relation.get("name")
processed_item['Group Code'] = relation.get("code")
processed_item["Original balance"] = rebate or vendor_funding
processed_items.append(processed_item)
return processed_items
def validate_asin(asin):
"""验证 ASIN 是否有效"""
return asin and not (isinstance(asin, float) and math.isnan(asin))
# 创建一个写入函数
def write_sheet(writer, data, sheet_name):
if not isinstance(sheet_name, str):
sheet_name = str(sheet_name)
log.info(f"开始写入 {sheet_name}, 共计 {len(data)} 条")
df = pd.DataFrame(data) # 将数据转换为 DataFrame
df.to_excel(writer, sheet_name=sheet_name, index=False)
def save_excel(sheet_data, large_sheet_data, new_file_name):
"""保存数据到 Excel 文件"""
# 初始化 Excel 写入器
with pd.ExcelWriter(new_file_name) as writer:
# 写入小数据
if sheet_data:
log.info(f"保存小数据,共计 {len(sheet_data)} 条")
write_sheet(writer, sheet_data, "Sheet1")
# 写入大数据(使用多线程并行写入不同表)
if large_sheet_data:
log.info(f"保存大数据,共计 {sum(len(data) for data in large_sheet_data.values())} 条")
for sheet_name, data in large_sheet_data.items():
write_sheet(writer, data, sheet_name)
# with ThreadPoolExecutor() as executor:
# for sheet_name, data in large_sheet_data.items():
# executor.submit(write_sheet, writer, data, sheet_name)
log.info(f"文件 {new_file_name} 保存完成,路径:{os.path.abspath(new_file_name)}")
def push_data_queue(file_name):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='spa_robot', routing_key='spa_robot', exchange='reports')
log.info("开始读取数据....")
data_dict = pd.read_excel(file_name, sheet_name=None, keep_default_na=False, na_values=[])
log.info("开始推送消息....")
for sheet_name, values in data_dict.items():
for _, item_row in values.iterrows():
if sheet_name == "Sheet1":
parent_id = item_row.get('Invoice ID', "")
else:
parent_id = sheet_name
push_data = {
'ad_date': item_row.get('Invoice date', ""), # spa费用数据日期
'erp_sku': item_row.get('ERP SKU', ""), # ERP SKU
'ad_amount': item_row.get('Original balance', ""), # spa费用金额
'ad_amount_currency': item_row.get('Agreement Currency', ""), # spa费用币制
'funding_type': item_row.get('Funding Type', ""), # 资金类型
'transaction_type': item_row.get('Transaction Type', ""), # 交易类型
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
'asin': item_row.get('Asin', ""), # ASIN
'shop_code': shop_code, # 店铺code
'type': 2, # 1 sheet1 2 其他sheet
'parent_id': parent_id, # sheet1 为Invoice ID 其他sheet为sheet名称
'order_no': item_row.get('Purchase Order', ""), # 订单号
}
# 推送数据
rabbit.send_message(push_data)
def test(file_name, result_file_name):
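# Sanity-check helper: aggregate Original balance per Invoice ID in the source export and compare it, sorted, against the per-sheet totals of the generated result workbook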
log.info("开始读取数据....")
invoices = pd.read_excel(file_name)
invoices['Original balance'] = invoices['Original balance'].replace({r'\$': '', ',': '', '€': ''}, regex=True).astype(float)
# 通过 Invoice ID 去重,并累加 Original balance
invoices = invoices.groupby('Invoice ID', as_index=False)['Original balance'].sum()
# 根据 Original balance 从小到大排序
sorted_invoices = invoices.sort_values(by='Original balance')
# 保存排序后的数据到 origin.xls
sorted_invoices.to_excel('origin.xlsx', index=False)
log.info("开始读取数据....")
data_dict = pd.read_excel(result_file_name, sheet_name=None, keep_default_na=False, na_values=[])
sheet_data = []
for sheet_name, values in data_dict.items():
sheet_total = 0
for _, item_row in values.iterrows():
balance = item_row.get('Original balance', "")
if isinstance(balance, str):
balance = balance.replace('$', '').replace(',', '').replace('€', '')
if sheet_name == 'Sheet1':
invoice_id = item_row.get('Invoice ID', "")
temp_sheet_data = {"sheet": invoice_id, "total": float(balance)}
sheet_data.append(temp_sheet_data)
else:
sheet_total = sheet_total + float(balance)
temp_sheet_data = {"sheet": sheet_name, "total": sheet_total}
sheet_data.append(temp_sheet_data)
log.info(f"sheet:{sheet_name},共计:{sheet_total}")
df = pd.DataFrame(sheet_data)
# 根据 total 从小到大排序
df = df.sort_values(by='total')
# 保存排序后的数据到 result.xls
df.to_excel('result.xlsx', index=False)
if __name__ == '__main__':
try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
shop_code = helper.get_input_with_default("店铺编码(DE-VC,FR-VC,JP-VC,CA-VC,UK-VC,VECELO])", "VECELO")
domain.domain_page(page, country)
main()
page.close()
except KeyboardInterrupt:
pass
except Exception as e:
log.error(e)
helper.print_trace("main", e)