Commit dd001f0c authored by 邱阿朋

spa query, compare data

parent b581dade
import pandas as pd

def compare_po_and_amount(file1, file2):
    # Read the two Excel files
    df1 = pd.read_excel(file1)
    df2 = pd.read_excel(file2)
    # Make sure the 'PO' and 'Amount' columns exist in both sheets
    if 'PO' not in df1.columns or 'Amount' not in df1.columns:
        raise ValueError("File 1 is missing required columns: 'PO' or 'Amount'")
    if 'PO' not in df2.columns or 'Amount' not in df2.columns:
        raise ValueError("File 2 is missing required columns: 'PO' or 'Amount'")
    # Aggregate: group by 'PO' and sum 'Amount'
    df1_grouped = df1.groupby('PO', as_index=False)['Amount'].sum()
    df2_grouped = df2.groupby('PO', as_index=False)['Amount'].sum()
    # Merge the two frames on the 'PO' column for comparison
    merged_df = pd.merge(df1_grouped, df2_grouped, on='PO', how='outer', suffixes=('_file1', '_file2'))
    # Find the rows where the two 'Amount' columns differ
    diff_df = merged_df[merged_df['Amount_file1'] != merged_df['Amount_file2']]
    # Report the result
    if diff_df.empty:
        print("The PO amounts are identical in both files.")
    else:
        print("The following POs have different amounts:")
        print(diff_df)

# Usage example
if __name__ == "__main__":
    compare_po_and_amount('file1.xlsx', 'file2.xlsx')
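One caveat worth flagging: the exact != comparison above treats floating-point rounding noise as a difference, and because NaN != NaN, POs present in only one file are reported the same way as real mismatches. A tolerance-based variant is sketched below; the amounts_differ helper, its tolerance default, and the use of numpy.isclose are illustrative assumptions, not part of the committed script.

import numpy as np

def amounts_differ(merged_df, tolerance=0.01):
    # Flag a PO when both amounts exist but differ by more than the
    # tolerance, or when exactly one of the two amounts is missing.
    a = merged_df['Amount_file1']
    b = merged_df['Amount_file2']
    both_present = a.notna() & b.notna()
    gap_too_large = both_present & ~np.isclose(a, b, atol=tolerance)
    one_missing = a.isna() ^ b.isna()
    return merged_df[gap_too_large | one_missing]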
import pandas as pd
import argparse

class InvoiceIDComparator:
    def __init__(self, file_a, file_b, invoice_column_name):
        self.file_a = file_a
        self.file_b = file_b
        self.invoice_column_name = invoice_column_name

    def get_invoice_ids_from_excel(self, file_path):
        """Collect the Invoice IDs from every sheet of an Excel file."""
        excel_file = pd.ExcelFile(file_path)
        invoice_ids = set()  # a set removes duplicates
        for sheet_name in excel_file.sheet_names:
            # Read the content of each sheet
            df = excel_file.parse(sheet_name)
            # Only collect from the column if it exists in this sheet
            if self.invoice_column_name in df.columns:
                invoice_ids.update(df[self.invoice_column_name].dropna().unique())
            invoice_ids.add(sheet_name)  # the sheet name itself also goes into the set
        return invoice_ids

    def compare_invoice_ids(self):
        """Compare the Invoice IDs of the two Excel files."""
        # Invoice IDs and sheet names from file A
        invoice_ids_a = self.get_invoice_ids_from_excel(self.file_a)
        # Invoice IDs and sheet names from file B
        invoice_ids_b = self.get_invoice_ids_from_excel(self.file_b)
        only_in_a = invoice_ids_a - invoice_ids_b
        only_in_b = invoice_ids_b - invoice_ids_a
        # Print the comparison result
        print("Invoice IDs present in file A but missing from file B:")
        print(only_in_a)
        print("\nInvoice IDs present in file B but missing from file A:")
        print(only_in_b)

def main():
    # Set up the command-line arguments
    parser = argparse.ArgumentParser(description="Compare the Invoice ID differences between two Excel files")
    parser.add_argument('--original_file', help="path of the original file")
    parser.add_argument('--result_file', help="path of the result file")
    parser.add_argument('--invoice_column', default='Invoice ID', help="name of the Invoice ID column")
    # Parse the command-line arguments
    args = parser.parse_args()
    # Create an InvoiceIDComparator instance and run the comparison
    comparator = InvoiceIDComparator(args.original_file, args.result_file, args.invoice_column)
    comparator.compare_invoice_ids()

# Program entry point
if __name__ == "__main__":
    main()
\ No newline at end of file
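For reference, the comparator can also be driven directly from Python instead of the argparse entry point; the file names below are placeholders, not paths from this commit:

comparator = InvoiceIDComparator('original.xlsx', 'result.xlsx', 'Invoice ID')
comparator.compare_invoice_ids()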
@@ -32,7 +32,7 @@ page.set.download_path(download_path)
 def page_get(url):
     host = domain.switch_domain(country)
     full_url = host + url
-    page.get(full_url, timeout=5)
+    page.get(full_url, timeout=10)

 def export_list_read_data():
@@ -63,6 +63,47 @@ def get_report_table_html(invoice_id):
     page.refresh()

+def get_report_agreement_text(invoice_id):
+    # Close the report download window
+    page.ele("#return-to-invoice-overview").click()
+    # Click the tab
+    page.ele("#a-autoid-2-announce").click()
+    # Download the report
+    page.ele(f"#invoiceDownloads-{invoice_id}_3").click()
+    page.wait(5)
+    # Parse the report page content
+    tree = etree.HTML(page.html)
+    # Locate the section that contains the table
+    table = tree.xpath('//table[@width="90%"]')[0]  # take the first (and only) matching table element
+    # Get all table rows (tr), skipping the header
+    rows = table.xpath('.//tr[position()>1]')
+    # List that collects the data of every row
+    data_list = []
+    for row in rows:
+        # Build the data dict, extracting the cell text and stripping extra characters
+        data = {
+            'ASIN': row.xpath('string(./td[1])').strip(),
+            'UPC': row.xpath('string(./td[2])').strip(),
+            'Price Protected Quantity': row.xpath('string(./td[3])').strip(),
+            'Pending PO Codes': row.xpath('string(./td[4])').strip(),
+            'Description': row.xpath('string(./td[5])').strip(),
+            'Old Cost': row.xpath('string(./td[6])').strip().replace('$', ''),
+            'New Cost': row.xpath('string(./td[7])').strip().replace('$', ''),
+            'Delta': row.xpath('string(./td[8])').strip().replace('$', ''),
+            'Total by ASIN': row.xpath('string(./td[9])').strip().replace('$', '')
+        }
+        # Normalize empty fields to empty strings
+        for key in data:
+            if not data[key]:
+                data[key] = ""  # turn falsy values (e.g. None) into ""
+        # Append the processed dict to the list
+        data_list.append(data)
+    return data_list

 def export_item_read_data(invoice_id):
     file_name = f"spa\\{invoice_id}.xlsx"
     if os.path.isfile(file_name):
@@ -74,6 +115,10 @@ def export_item_read_data(invoice_id):
     tree = etree.HTML(report_table_html)
     # Extract all links
     links = tree.xpath('//table[@id="backup-report-table"]//a/@href')
+    if len(links) == 0:
+        data_list = get_report_agreement_text(invoice_id)
+        return pd.DataFrame(data_list)
+
     for link in links:
         # Parse the query parameters in the link
         parsed_url = urlparse(link)
@@ -82,7 +127,7 @@ def export_item_read_data(invoice_id):
         filename = query_params.get('fileName', ['file name not found'])[0]
         report_file_tmp_dir = f"spa\\{invoice_id}\\{filename}\\"
         host = domain.switch_domain(country)
-        page.download(host + link, report_file_tmp_dir, show_msg=False)
+        page.download(host + link, report_file_tmp_dir, show_msg=True)
         report_file = report_file_tmp_dir + "BackupReport.xls"
         file.wait_for_downloads(report_file, 120)
@@ -98,10 +143,15 @@ def export_item_read_data(invoice_id):
             if not header_is_normal:
                 continue
-            data = df[df['Asin'].notna()]
-            excel.save_xls(data, file_name)
-            shutil.rmtree(f"spa\\{invoice_id}")
-            return pd.read_excel(file_name)
+            # Create an ExcelFile object
+            excel_file = pd.ExcelFile(report_file)
+            # Get all the sheet names
+            sheet_names = excel_file.sheet_names
+            for sheet_name in sheet_names:
+                df = pd.read_excel(report_file, sheet_name=sheet_name)
+                data = df[df['Asin'].notna()]
+                excel.save_xls(data, file_name, sheet_name)
+            return pd.read_excel(file_name, sheet_name=None)
         except ValueError:
             pass
@@ -119,7 +169,6 @@ def main():
     sheet_data = []  # holds the small data
     large_sheet_data = {}  # holds the large data (split across sheets)
-    max_sheet_data = {}  # holds the extra-large data (more than 5000 rows)

     # Iterate over the cooperation list
     for index, coop in coop_list.iterrows():
         index += 1
@@ -136,20 +185,25 @@ def main():
            log.warning(f"{invoice_id} has no report information yet")
            continue

-        # Handle the data as small or large depending on the length of item_list
-        if len(item_list) >= 10:
+        # If it is a dict, iterate over its values
+        if isinstance(item_list, dict):
+            for i, value in enumerate(item_list.values(), start=1):
+                invoice_id_num = f"{invoice_id}-{i}"
+                processed_items = process_large_items(value, relation_data)
+                if processed_items:
+                    large_sheet_data[invoice_id_num] = processed_items
+        # If it is a list with length >= 10
+        elif len(item_list) >= 10:
             processed_items = process_large_items(item_list, relation_data)
             if processed_items:
-                if len(processed_items) > 5000:
-                    max_sheet_data[invoice_id] = processed_items
-                else:
-                    large_sheet_data[invoice_id] = processed_items
+                large_sheet_data[invoice_id] = processed_items
         else:
+            # If it is a smaller list
             processed_items = process_small_items(item_list, coop, relation_data)
             sheet_data.extend(processed_items)

     # Save the data to an Excel file
-    save_excel(sheet_data, large_sheet_data, max_sheet_data, new_file_name)
+    save_excel(sheet_data, large_sheet_data, new_file_name)

 def process_large_items(item_list, relation_data):
@@ -177,6 +231,8 @@ def process_small_items(item_list, coop, relation_data):
     processed_items = []
     for _, item in item_list.iterrows():
         asin = item.get('Asin', None)
+        if asin is None:
+            asin = item.get('ASIN', None)
         if not validate_asin(asin):
             continue
@@ -199,14 +255,15 @@ def validate_asin(asin):
     return asin and not (isinstance(asin, float) and math.isnan(asin))

-def save_excel(sheet_data, large_sheet_data, max_sheet_data, new_file_name):
-    """Save the data to an Excel file"""
-    # Define a sheet-writing helper
-    def write_sheet(writer, data, sheet_name):
-        log.info(f"Start writing {sheet_name}, {len(data)} rows in total")
-        df = pd.DataFrame(data)  # convert the data to a DataFrame
-        df.to_excel(writer, sheet_name=sheet_name, index=False)
+# Define a sheet-writing helper
+def write_sheet(writer, data, sheet_name):
+    log.info(f"Start writing {sheet_name}, {len(data)} rows in total")
+    df = pd.DataFrame(data)  # convert the data to a DataFrame
+    df.to_excel(writer, sheet_name=sheet_name, index=False)
+
+def save_excel(sheet_data, large_sheet_data, new_file_name):
+    """Save the data to an Excel file"""

     # Initialize the Excel writer
     with pd.ExcelWriter(new_file_name, engine="openpyxl") as writer:
         # Write the small data
@@ -221,13 +278,6 @@ def save_excel(sheet_data, large_sheet_data, max_sheet_data, new_file_name):
             for sheet_name, data in large_sheet_data.items():
                 executor.submit(write_sheet, writer, data, sheet_name)

-        # Write the extra-large data
-        if max_sheet_data:
-            log.info(f"Saving the extra-large data, {sum(len(data) for data in max_sheet_data.values())} rows in total")
-            with ThreadPoolExecutor() as executor:
-                for sheet_name, data in max_sheet_data.items():
-                    executor.submit(write_sheet, writer, data, sheet_name)

     log.info(f"File {new_file_name} saved, path: {os.path.abspath(new_file_name)}")