Commit b52560cb authored by 邱阿朋's avatar 邱阿朋

refactor(spa): 重构 SPA 报告处理逻辑

- 移除了未使用的 diff_spa.py 文件
- 新增了 extract_numeric_value 函数,用于提取字符串中的数字
- 修改了 large_items 和 small_items 的处理逻辑,增加了 VAT 计算
- 优化了总金额的计算方法,只计算有效数据行
- 删除了 requirements.txt 中未使用的依赖项
parent c26d3425
# coding: utf-8
import os
import re
import sys
import traceback
......@@ -17,3 +18,24 @@ def print_trace(title: str, err):
print(err_str)
for i in traceback.extract_tb(except_traceback):
print("函数{},文件:{},行:{}".format(i.name, i.filename, i.lineno))
def extract_numeric_value(value):
    """
    Extract a plain number from a value that may carry currency symbols.

    Args:
        value: Input value (a string, a number, None or NaN).

    Returns:
        float: The extracted number. 0.0 is returned when nothing numeric can
        be extracted (None, NaN, infinities, empty or malformed text such as
        "1.2.3").
    """
    # Floats are converted directly: str() may render them in scientific
    # notation ("1e-07"), which the digit filter below would corrupt ("107").
    if isinstance(value, float):
        number = float(value)
        # NaN is the only value unequal to itself; infinities are treated as
        # invalid input as well.
        if number != number or number in (float('inf'), float('-inf')):
            return 0.0
        return number

    str_value = str(value).strip()
    first_digit = re.search(r'\d', str_value)
    if first_digit is None:
        return 0.0

    # A minus sign ahead of the first digit ("-$5", "$-5", "-5") marks a
    # negative amount; the digit filter below would otherwise discard it.
    prefix = str_value[:first_digit.start()]
    is_negative = '-' in prefix or '\u2212' in prefix

    # Drop thousands separators (e.g. 1,000.00)
    str_value = str_value.replace(',', '')
    # Remove everything except digits and the decimal point
    numeric_str = re.sub(r'[^\d.]', '', str_value)
    try:
        number = float(numeric_str)
    except ValueError:
        return 0.0
    # "and number" keeps a zero result as 0.0 rather than -0.0
    return -number if is_negative and number else number
......@@ -2,17 +2,17 @@
# spa
import math
import os
import pandas as pd
from lxml import etree
from datetime import datetime
from urllib.parse import urlparse, parse_qs
import pandas as pd
from DrissionPage import ChromiumPage as Page
from DrissionPage.errors import ElementNotFoundError
from lxml import etree
from app.helper import domain, file, excel, api, helper
from app.logger.logger import Logger
from app.vc import rabbit
from app.vc.interface import AutoInterface
from app.helper import domain, file, excel, api
class Spa(AutoInterface):
......@@ -105,8 +105,9 @@ class Spa(AutoInterface):
except ValueError as e:
pass
def __process_large_items(self, item_list, relation_data, coop):
def __process_large_items(self, item_list, relation_data, coop, va_tax):
"""处理大数据列表 (item_list 长度 >= 10)"""
processed_items = []
for _, item in item_list.iterrows():
asin = item.get('Asin', None)
......@@ -116,6 +117,7 @@ class Spa(AutoInterface):
relation = relation_data.get(asin, {})
rebate = item.get("Rebate In Agreement Currency", None)
vendor_funding = item.get("Vendor Funding In Agreement Currency", None)
original_balance = rebate or vendor_funding
funding_type = coop.get('Funding Type', "")
if funding_type == "":
......@@ -130,12 +132,13 @@ class Spa(AutoInterface):
processed_item['ERP SKU'] = relation.get("erp_sku")
processed_item['Group Name'] = relation.get("name")
processed_item['Group Code'] = relation.get("code")
processed_item["Original balance"] = rebate or vendor_funding
processed_item["Original balance"] = original_balance
processed_item["vat"] = va_tax
processed_items.append(processed_item)
return processed_items
def __process_small_items(self, item_list, relation_data, coop):
def __process_small_items(self, item_list, relation_data, coop, va_tax):
"""处理小数据列表 (item_list 长度 < 10)"""
processed_items = []
for _, item in item_list.iterrows():
......@@ -148,6 +151,7 @@ class Spa(AutoInterface):
relation = relation_data.get(asin, {})
rebate = item.get("Rebate In Agreement Currency", None)
vendor_funding = item.get("Vendor Funding In Agreement Currency", None)
original_balance = rebate or vendor_funding
processed_item = coop.copy()
processed_item.pop("Original balance")
......@@ -168,7 +172,8 @@ class Spa(AutoInterface):
processed_item["ERP SKU"] = relation.get("erp_sku")
processed_item["Group Name"] = relation.get("name")
processed_item['Group Code'] = relation.get("code")
processed_item["Original balance"] = rebate or vendor_funding
processed_item["Original balance"] = original_balance
processed_item["vat"] = va_tax
processed_items.append(processed_item)
return processed_items
......@@ -203,6 +208,24 @@ class Spa(AutoInterface):
# for sheet_name, data in large_sheet_data.items():
# executor.submit(write_sheet, writer, data, sheet_name)
@staticmethod
def __calculate_sheets_total_amount(sheets):
    """
    Sum the rebate column over every sheet, counting only leading data rows.

    For each sheet, only the rows located before the first empty (NaN) cell
    of the "Rebate In Agreement Currency" column are summed. When the column
    contains no empty cell, the whole column is summed.

    Args:
        sheets: Mapping of sheet name to DataFrame; every frame is indexed
            with the "Rebate In Agreement Currency" column.

    Returns:
        The sum of the valid rebate values across all sheets (0 when there
        are no sheets or no valid rows).

    Raises:
        KeyError: If a sheet does not contain the rebate column.
    """
    total_amount = 0
    rebate_column = "Rebate In Agreement Currency"

    for df in sheets.values():
        rebates = df[rebate_column]
        is_empty = rebates.isna().to_numpy()
        # Locate the first empty row by position. Without the any() guard a
        # column with no empty cell would report position 0 and the whole
        # sheet would be skipped; working positionally also avoids label
        # arithmetic, which fails on non-integer indexes.
        if is_empty.any():
            cutoff = int(is_empty.argmax())
        else:
            cutoff = len(rebates)
        # Only the rows before the first empty row count towards the total
        total_amount = total_amount + rebates.iloc[:cutoff].sum()

    return total_amount
def push_data_queue(self):
rabbit.connection()
rabbit.connect(queue='spa_robot', routing_key='spa_robot', exchange='reports')
......@@ -281,22 +304,29 @@ class Spa(AutoInterface):
self.logger.warning(f"{invoice_id} 暂无报告信息")
continue
# 计算每个sheet总金额
total_amount = self.__calculate_sheets_total_amount(item_dict)
original_balance = coop.get("Original balance", 0.0)
original_balance = helper.extract_numeric_value(original_balance)
# 0 没有税 1 有税
va_tax = int(total_amount != original_balance)
if len(item_dict) > 1:
for i, value in enumerate(item_dict.values(), start=1):
invoice_id_num = f"{invoice_id}-{i}"
processed_items = self.__process_large_items(value, relation_data, coop)
processed_items = self.__process_large_items(value, relation_data, coop, va_tax)
if processed_items:
large_sheet_data[invoice_id_num] = processed_items
else:
item_list = item_dict.get('Accrual For Current Period')
# 如果是列表且长度 >= 10 则新增sheet
if len(item_list) >= 10:
processed_items = self.__process_large_items(item_list, relation_data, coop)
processed_items = self.__process_large_items(item_list, relation_data, coop, va_tax)
if processed_items:
large_sheet_data[invoice_id] = processed_items
else:
# 如果是较小的列表
processed_items = self.__process_small_items(item_list, relation_data, coop)
processed_items = self.__process_small_items(item_list, relation_data, coop, va_tax)
sheet_data.extend(processed_items)
# 保存数据到 Excel 文件
......
import os.path
import pandas as pd
import argparse
class InvoiceIDComparator:
    """Compare the invoice IDs (and sheet names) found in two Excel files."""

    def __init__(self, file_a, file_b, invoice_column_name):
        """
        Args:
            file_a: Path of the first Excel file.
            file_b: Path of the second Excel file.
            invoice_column_name: Name of the column holding the invoice IDs.
        """
        self.file_a = file_a
        self.file_b = file_b
        self.invoice_column_name = invoice_column_name

    def get_invoice_ids_from_excel(self, file_path):
        """
        Collect the invoice IDs of every sheet in an Excel file.

        Args:
            file_path: Path of the Excel file to read.

        Returns:
            set: The non-empty values of the invoice column across all
            sheets, plus the name of every sheet.
        """
        invoice_ids = set()  # a set removes duplicates
        # The context manager closes the workbook even if parsing fails
        with pd.ExcelFile(file_path) as excel_file:
            for sheet_name in excel_file.sheet_names:
                # Read the content of each sheet
                df = excel_file.parse(sheet_name)
                # Only read the column when it actually exists
                if self.invoice_column_name in df.columns:
                    invoice_ids.update(df[self.invoice_column_name].dropna().unique())
                # The sheet name itself is also treated as an ID
                invoice_ids.add(sheet_name)
        return invoice_ids

    def compare_invoice_ids(self):
        """
        Compare the invoice IDs of the two files and print the differences.

        Returns:
            tuple: ``(only_in_a, only_in_b)``, the IDs present in only one of
            the two files.
        """
        # Invoice IDs and sheet names of file A
        invoice_ids_a = self.get_invoice_ids_from_excel(self.file_a)
        # Invoice IDs and sheet names of file B
        invoice_ids_b = self.get_invoice_ids_from_excel(self.file_b)

        only_in_a = invoice_ids_a - invoice_ids_b
        only_in_b = invoice_ids_b - invoice_ids_a

        # Print the comparison result
        print("文件A中存在,但文件B中没有的 Invoice IDs:")
        print(only_in_a)
        print("\n文件B中存在,但文件A中没有的 Invoice IDs:")
        print(only_in_b)
        return only_in_a, only_in_b
def main(argv=None):
    """
    Parse the command line and compare the invoice IDs of two Excel files.

    Args:
        argv: Optional list of command-line arguments; when None, argparse
            reads them from ``sys.argv``.

    Raises:
        FileNotFoundError: If the original file or the result file is missing.
    """
    # Set up the command-line arguments
    parser = argparse.ArgumentParser(description="比较两个Excel文件中的Invoice ID差异")
    parser.add_argument('--original_file', default="ContraCogsInvoices.xls", help="原文件路径")
    parser.add_argument('--result_file', default="result.xls", help="结果文件路径")
    parser.add_argument('--invoice_column', default='Invoice ID', help="Invoice ID列的名称")

    # Parse the command-line arguments
    args = parser.parse_args(argv)

    # A missing input is a "not found" condition, not an "already exists" one
    if not os.path.exists(args.original_file):
        raise FileNotFoundError("源文件不存在")

    if not os.path.exists(args.result_file):
        raise FileNotFoundError("结果文件不存在")

    # Create the InvoiceIDComparator instance and run the comparison
    comparator = InvoiceIDComparator(args.original_file, args.result_file, args.invoice_column)
    comparator.compare_invoice_ids()
# Script entry point
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Exit with a non-zero status and the message on stderr so that
        # shells and schedulers can detect the failure; printing to stdout
        # alone would leave the exit status at 0.
        raise SystemExit(str(e))
\ No newline at end of file
......@@ -9,6 +9,4 @@ redis==5.0.8
pika==1.3.2
xmltodict==0.14.2
python-dotenv==1.0.1
paramiko==3.5.0
ttkbootstrap==1.10.1
ldap3==2.9.1
\ No newline at end of file
ttkbootstrap==1.10.1
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment