Commit 4d3149db authored by 邱阿朋's avatar 邱阿朋

feat(vc): 实现回款功能并优化相关模块

- 新增回款功能模块,包括处理正常回款和争议回款
- 添加 PO Code 查询和缓存逻辑
- 实现回款数据导出和推送
- 优化导入信息读取和处理
-调整 ReturnGoods 和 Spa 类的初始化参数
parent a9ef054e
......@@ -38,3 +38,28 @@ def save_xls(data, output_file, sheet_name='Sheet1', adjusted=False):
ws.column_dimensions[column_letter].width = adjusted_width
wb.save(output_file)
def remove_last_comma(csv_file, skip_rows=2):
# 创建一个空列表用于存储处理后的行
cleaned_lines = []
# 读取原始 CSV 文件并处理行末的逗号
with open(csv_file, 'r', encoding='utf-8') as file:
# 跳过指定数量的行
for _ in range(skip_rows):
next(file) # 跳过每一行
for line in file:
# 使用正则表达式替换 空格 + 数字 + 引号
cleaned_line = re.sub(r'(\s\d+)"', r'\1 ', line) # 去掉空格 + 数字后面的引号
# 使用正则表达式替换每个逗号前的空格为引号
cleaned_line = re.sub(r'\s+,\s*"', r'", "', cleaned_line)
# 去掉末尾的逗号和换行符
cleaned_line = cleaned_line.rstrip(',\n')
# 不添加换行符,待会写入时统一处理
cleaned_lines.append(cleaned_line)
# 将处理后的数据逐行写回文件
with open(csv_file, 'w', encoding='utf-8', newline='') as cleaned_file:
for line in cleaned_lines:
cleaned_file.write(line + '\n') # 每一行单独写入,确保每行独立处理
# coding: utf-8
from app.helper import logger
from app.helper import logger, redisx, rabbitmq
log = logger.ConsoleLog()
rdb = redisx.RedisClient()
rabbit = rabbitmq.RabbitMQClient()
# coding: utf-8
# 回款
import json
import os
import re
import urllib.parse
from datetime import datetime
from decimal import Decimal
import pandas as pd
from DrissionPage.errors import ElementNotFoundError
from lxml import etree
from app.helper import domain, file, rabbitmq, api, excel
from app.vc import log
from app.helper import domain, excel, file
from app.vc import log, rdb, rabbit
from app.vc.interface import AutoInterface
from DrissionPage import ChromiumPage as Page
......@@ -22,101 +27,387 @@ class Payment(AutoInterface):
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
# 原文件名
file_name = "退货明细.xlsx"
file_name = "回款数据.xlsx"
# 拼接新的文件名
self.result_file_name = f"{current_datetime}_{self.country}_{file_name}"
def __page_get(self, url: str):
def __page_get(self, url):
host = domain.switch_domain(self.country)
full_url = host + url
self.page.get(full_url, timeout=5)
def __export_item_read_data(self, return_id: str):
file_name = f"return_goods\\{return_id}.xls"
@staticmethod
def __export_list_read_data():
new_file_name = 'new_Payments.xlsx'
if os.path.isfile(new_file_name):
df = pd.read_excel(new_file_name)
return df
file_name = 'Payments.xlsx'
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
all_df = pd.read_excel(file_name, header=None)
# 找到所有空行的索引,这里假设完全空的行为表头之间的分界线
empty_rows = all_df[all_df.isnull().all(axis=1)].index.tolist()
# 定位表头与数据的分隔
first_header_start = empty_rows[1] + 1 # 第一个表头开始的行
second_header_start = empty_rows[2] + 4 # 第二个表头开始的行
first_df = pd.read_excel(file_name, skiprows=first_header_start, nrows=second_header_start - 7)
second_df = pd.read_excel(file_name, skiprows=second_header_start)
# 定义正则表达式模式,匹配包含 'Price' 或 'PCR' 或 'XXXXXXXX/XXXX/' 的描述
pattern = r'Price Claim|PCR|Missed Adjustment|Shortage Claim|^[A-Z0-9]{8}/[A-Z0-9]{4}/'
# 过滤符合条件的行
filtered_second_df = second_df[second_df['Description'].str.contains(pattern, na=False, regex=True)]
merged_df = pd.merge(filtered_second_df, first_df[['Payment Number', 'Payment Date']], on='Payment Number',
how='left')
excel.save_xls(merged_df, new_file_name, "Remittance payments")
return merged_df
def __invoice_details(self, invoice_number, last_two, last_three):
if len(invoice_number) > 8:
# 检查后两位是否在测试列表中
if last_two in ["MA", "PC"]:
invoice_number = invoice_number[:-2] # 去掉后两位
if last_three in ["PCR"]:
invoice_number = invoice_number[:-3] # 去掉最后三位
if last_three in ["+SC", "SC-"]:
invoice_number = invoice_number[:-3] # 去掉最后三位
invoice_number = invoice_number + 'SCR'
if last_two == "SC":
invoice_number = invoice_number + 'R'
params = {
"invoiceNumber": invoice_number,
"payeeCode": self.payeeCode,
"activeTab": "lineItems",
}
# 将字典转换为 URL 查询参数
query_string = urllib.parse.urlencode(params)
full_url = f"hz/vendor/members/inv-mgmt/invoice-details?" + query_string
self.__page_get(full_url)
def __export_details_read_data(self, file_name):
count = 0
while True:
try:
# 打开退回详情下载明细
self.__page_get(f"katalmonsapp/vendor/members/returns/{return_id}")
self.page.ele("#file-download-button").click.to_download(rename=file_name)
self.page.ele("#line-items-export-to-spreadsheet-announce").click.to_download(rename=file_name)
file.wait_for_downloads(file_name)
excel.remove_last_comma(file_name)
break
except ElementNotFoundError:
log.warning("元素未找到,刷新网页")
if count == 3: return None
count += 1
log.warning("导出按钮不存在刷新网页")
self.page.refresh()
# 读取回退商品详情
return pd.read_excel(file_name)
return pd.read_csv(file_name)
def push_data_queue(self):
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
@staticmethod
def __get_content(tree, row_index: int, cell_index: int) -> str:
"""获取指定行和列的内容,如果没有找到,则返回 None。"""
content = tree.xpath(f'//*[@role="row"][{row_index}]/*[@role="cell"][{cell_index}]/text()')
return content[0] if content else None
data = pd.read_excel(self.result_file_name, keep_default_na=False, na_values=[])
def __get_po_code(self, index, po_id) -> dict:
result = {
"index": index,
"po_id": po_id
}
for _, item_row in data.iterrows():
push_data = {
'return_id': item_row.get('Return ID', ''),
'asin': item_row.get('ASIN', ''), # ASIN
'order_no': item_row.get('Purchase order', ''), # 订单号
'sku_quantity': item_row.get('Quantity', 0), # 退回数量
'sku_amount': item_row.get('Total cost', 0), # Total cost
'currency': item_row.get('Currency code', ''), # Currency code
'data_date': str(item_row.get('Return Date', '')), # Return Date
'erp_sku': item_row.get("SKU", ''), # ERP SKU # SKU1匹配
'shop_code': self.shop_code, # 店铺code
'supplier_code': item_row.get('Vendor code', ''), # 供应商编码
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
po_id = po_id[:8]
cache_key = "payment"
payment_cache = rdb.get_client().hget(cache_key, po_id)
if payment_cache:
cache_value = json.loads(payment_cache)
result["vendor"] = cache_value['vendor']
result["payment_terms"] = cache_value['payment_terms']
return result
self.__page_get(f"po/vendor/members/po-mgmt/order?poId={po_id}")
po_table = self.page.ele("#po-header", timeout=5).html
# 使用 lxml 解析 HTML
tree = etree.HTML(po_table)
# 获取 Vendor 内容
result["vendor"] = self.__get_content(tree, 2, 2)
# 正则表达式查找数字和%之间的内容
match = re.search(r'Payment .*?(\d+%)', po_table)
if match:
result["payment_terms"] = match.group(1)[:-1] # 去掉%
else:
result["payment_terms"] = 0
cache_value = result.copy()
del cache_value['index']
rdb.get_client().hset(cache_key, po_id, json.dumps(cache_value))
return result
@staticmethod
def __price_extract_data(html_content):
# 使用正则表达式删除所有 HTML 注释
html_content = re.sub(r'<!--.*?-->', '', html_content)
# 使用 lxml 解析 HTML
tree = etree.HTML(html_content)
# 提取所有行的数据
rows = tree.xpath('//tr[contains(@class, "mt-row")]')
data_list = []
for row in rows:
# 定义 data 字典,提取并去除多余字符
data = {
'PO_NUMBER': row.xpath('string(./td[@data-column="PO_NUMBER"]/span/span/a)').strip(),
'ASIN': row.xpath('string(./td[@data-column="ASIN"]/span/span/a)').strip(),
'EXTERNAL_ID': row.xpath('string(./td[@data-column="EXTERNAL_ID"]/span/span/a)').strip(),
'TITLE': row.xpath('string(./td[@data-column="TITLE"])').strip(),
'QUANTITY': row.xpath('string(./td[@data-column="QUANTITY"])').strip(),
'INVOICE_COST': row.xpath('string(./td[@data-column="INVOICE_COST"])').strip().replace('$', ''),
'PO_COST': row.xpath('string(./td[@data-column="PO_COST"])').strip().replace('$', ''),
'INITIAL_RESEARCH_COST': row.xpath(
'string(./td[@data-column="INITIAL_RESEARCH_COST"])').strip().replace(
'$', ''),
'RESOLUTION_DECISION': row.xpath('string(./td[@data-column="RESOLUTION_DECISION"])').strip(),
'RESOLUTION_COST': row.xpath('string(./td[@data-column="RESOLUTION_COST"])').strip().replace('$', '')
}
# 推送数据
rabbit.send_message(push_data)
def run(self, file_name: str):
if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},文件不存在")
# 读取sku映射关系
relations_dict = api.sku_relations(self.country)
# 如果字段为空则设为空字符串
data = {k: "" if not v else v for k, v in data.items()}
data_list.append(data)
return data_list
@staticmethod
def __line_items_data(html_content):
# 使用正则表达式删除所有 HTML 注释
html_content = re.sub(r'<!--.*?-->', '', html_content)
# 使用 lxml 解析 HTML
tree = etree.HTML(html_content)
# 提取所有行的数据
rows = tree.xpath('//tr[contains(@class, "mt-row")]')
data_list = []
list_data = pd.read_excel(file_name)
# 下载并读取list数据
for row in rows:
# 定义 data 字典,提取并去除多余字符
data = {
'PO': row.xpath('string(./td[@data-column="PO_NUMBER"]/span/span/a)').strip(),
'External ID': row.xpath('string(./td[@data-column="EXTERNAL_ID"])').strip(),
'ASIN': row.xpath('string(./td[@data-column="ASIN"]/span/span/a)').strip(),
'TITLE': row.xpath('string(./td[@data-column="DESCRIPTION"])').strip(),
'Model': row.xpath('string(./td[@data-column="MODEL_NUMBER"])').strip(),
'Freight Term': row.xpath('string(./td[@data-column="FREIGHT_TERM"])').strip(),
'Qty': row.xpath('string(./td[@data-column="QUANTITY"])').strip(),
'Unit Cost': row.xpath('string(./td[@data-column="UNIT_COST"])').strip(),
'Amount': row.xpath('string(./td[@data-column="TOTAL_AMOUNT"])').strip(),
'Shortage quantity': row.xpath('string(./td[@data-column="SHORTAGE_QUANTITY"])').strip(),
'Amount shortage': row.xpath('string(./td[@data-column="SHORTAGE_AMOUNT"])').strip(),
'Last received date': row.xpath('string(./td[@data-column="LAST_RECEIVED_DATE"])').strip(),
'ASIN received': row.xpath('string(./td[@data-column="RECEIVED_ASIN"])').strip(),
'Quantity received': row.xpath('string(./td[@data-column="RECEIVED_QUANTITY"])').strip(),
'Unit cost': row.xpath('string(./td[@data-column="RECEIVED_COST_PRICE"])').strip(),
'Amount received': row.xpath('string(./td[@data-column="RECEIVED_AMOUNT"])').strip(),
}
# 如果字段为空则设为空字符串
data = {k: "" if not v else v for k, v in data.items()}
data_list.append(data)
return data_list
@staticmethod
def __calculate_unit_cost(data_list):
"""计算差异金额单价并返回金额"""
unit_cost_float = Decimal(data_list['INVOICE_COST']) - Decimal(data_list['INITIAL_RESEARCH_COST'])
unit_cost = unit_cost_float * int(data_list['QUANTITY'])
return unit_cost_float, float(f"{unit_cost:.2f}")
def __handle_price_data(self, price_data_list, invoice_amount):
total_price_data_amount = 0
# 计算总金额
for data_list in price_data_list:
unit_cost_float, amount = self.__calculate_unit_cost(data_list)
total_price_data_amount += amount
result = []
invoice_amount = abs(float(f"{invoice_amount:.2f}"))
# 如果总金额等于列表金额则已回款
if total_price_data_amount == invoice_amount:
for data_list in price_data_list:
unit_cost_float, amount = self.__calculate_unit_cost(data_list)
data = data_list.copy()
data['Quantity received'] = data_list['QUANTITY']
data['UnitCost'] = unit_cost_float
data['Amount'] = f"${amount:.2f}"
data['Shortage quantity'] = '0'
result.append(data)
else:
for data_list in price_data_list:
unit_cost_float, amount = self.__calculate_unit_cost(data_list)
if amount == invoice_amount:
data = data_list.copy()
data['Quantity received'] = data_list['QUANTITY']
data['UnitCost'] = unit_cost_float
data['Amount'] = f"${amount:.2f}"
data['Shortage quantity'] = '0' if data_list['RESOLUTION_DECISION'] == "Approved" else '1'
result.append(data)
break
return result
@staticmethod
def __handle_data(detail_datum, vendor, deduction_points):
"""处理正常数据"""
amount = detail_datum.get('Amount', '$0.00') # 默认值设为 '$0.00' 以避免错误
amount = float(amount.replace('$', '').replace(',', ''))
# 如果是0则回款完成
is_finished = "否"
shortage_quantity = detail_datum.get('Shortage quantity', -1)
if shortage_quantity == '0':
is_finished = "是"
amount_after_deduction = amount
if deduction_points > 0:
# 计算扣除后的金额
amount_after_deduction = amount - (amount * (deduction_points / 100))
# 复制原始行数据,避免直接修改
record = detail_datum.copy()
record.update({"Amount": amount})
record["IsFinished"] = is_finished
record["DeductionPoints"] = f"{deduction_points}%" # 拼接百分号
record["Code"] = vendor
record["AmountAfterDeduction"] = amount_after_deduction
return record
def run(self, file_name: str):
list_data = self.__export_list_read_data()
# 获取当前日期和时间并格式化
current_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M')
# 原文件名
file_name = "回款数据.xlsx"
# 拼接新的文件名
new_file_name = f"{current_datetime}_{self.country}_{file_name}"
excel.save_xls(list_data, new_file_name, "Remittance payments")
log.info(f"共计:{len(list_data)} 订单")
new_list_data = []
all_normal_pay_data = []
all_price_pay_data = []
i = 0
for _, data in list_data.iterrows():
i += 1
return_id = data.get('Return ID')
log.info({"index": i, "return_id": return_id})
# 下载退货详情表格读取数据
item_data = self.__export_item_read_data(return_id)
# 按 'Purchase order' 和 'ASIN' 分组,并对 'Quantity' 和 Total amount 进行求和
item_data_result = item_data.groupby(['Purchase order', 'ASIN', 'Reason'], as_index=False).agg({
'Quantity': 'sum',
'Total amount': 'sum',
})
for _, item_row in item_data_result.iterrows():
relation = relations_dict.get(item_row.get('ASIN'))
erp_sku = relation.get('erp_sku', "")
data_dict = data.to_dict()
data_dict.update({
'Return Date': data_dict['Return Date'].strftime('%m/%d/%Y'),
'Return ID': str(data_dict['Return ID']),
'PO': item_row.get('Purchase order', ""),
'ASIN': item_row.get('ASIN', ""),
'SKU': erp_sku,
'Quantity': item_row.get('Quantity', 0),
# 替换回会数量和金额为详情里面的值
'Return quantity': item_row.get('Quantity', 0), # 替换回会数量
'Reason': item_row.get('Reason', ""),
'Total cost': item_row.get('Total amount', 0), # 替换金额
'Group Name': relation.get("name", ""),
'Group Code': relation.get("code", ""),
})
# 追加数据
new_list_data.append(data_dict)
excel.save_xls(new_list_data, self.result_file_name)
invoice_number = data.get("Invoice Number")
invoice_amount = data.get("Invoice Amount")
# 获取当前订单的Payee和优惠比例
vendor_payment_terms = self.__get_po_code(i, invoice_number)
log.info(vendor_payment_terms)
vendor = vendor_payment_terms['vendor']
deduction_points = int(vendor_payment_terms['payment_terms'])
# 处理单号主要为了进入详情页
last_two = invoice_number[-2:] # 取后两位
last_three = invoice_number[-3:] # 取后三位
# 判断是否为争议订单
if len(invoice_number) > 8 and (last_three == "PCR" or last_two == "PC"):
cache_key = "price_data"
price_data = rdb.get_client().hget(cache_key, invoice_number)
if price_data:
price_data = json.loads(price_data)
else:
# 进入详情页
self.__invoice_details(invoice_number, last_two, last_three)
# 点击争议价tab
self.page.ele("#pd").click()
log.debug("等待争议数据加载,10秒后获取表单数据")
self.page.wait(10)
table_html = self.page.ele("#priceDiscrepancyWithDMSGridForm", timeout=5).html
# 抓取表单数据
price_data = self.__price_extract_data(table_html)
# 缓存数据
rdb.get_client().hset(cache_key, invoice_number, json.dumps(price_data))
# 争议回款
price_data = self.__handle_price_data(price_data, invoice_amount)
price_pay_data = []
for detail_datum in price_data:
# 争议回款数据
format_price_data = self.__handle_data(detail_datum, vendor, deduction_points)
# 将处理后的记录添加到临时列表
price_pay_data.append(format_price_data)
# 添加到汇总列表
all_price_pay_data.append(pd.DataFrame(price_pay_data))
else:
cache_key = "item_data"
detail_data = rdb.get_client().hget(cache_key, invoice_number)
if detail_data:
detail_data = json.loads(detail_data)
else:
# 进入详情页
self.__invoice_details(invoice_number, last_two, last_three)
self.page.wait(3)
table_html = self.page.ele("#invoiceLineItems", timeout=5).html
# 抓取表单数据
detail_data = self.__line_items_data(table_html)
# 缓存数据
rdb.get_client().hset(cache_key, invoice_number, json.dumps(detail_data))
# 初始化列表存储新字段数据
normal_pay_data = []
for detail_datum in detail_data:
# 正常回款数据
success_data = self.__handle_data(detail_datum, vendor, deduction_points)
# 将处理后的记录添加到临时列表
normal_pay_data.append(success_data)
# 添加到汇总列表
all_normal_pay_data.append(pd.DataFrame(normal_pay_data))
if all_normal_pay_data:
# 将所有数据合并为一个 DataFrame
normal_pay_summary = pd.concat(all_normal_pay_data, ignore_index=True)
excel.save_xls(normal_pay_summary, new_file_name, "正常回款导出明细")
if all_price_pay_data:
price_pay_summary = pd.concat(all_price_pay_data, ignore_index=True)
excel.save_xls(price_pay_summary, new_file_name, "Price导出明细")
def push_data_queue(self):
rabbit.connect(queue='refund_robot', routing_key='refund_robot', exchange='reports')
data = pd.read_excel(self.result_file_name, keep_default_na=False, na_values=[])
for _, item_row in data.iterrows():
push_data = {
'payment_number': item_row.get('Payment Number', ''), # 订单id
'order_date': str(item_row.get('Invoice Date', '')), # 发票时间
'payment_date': str(item_row.get('Payment Date', '')), # 支付时间
'order_no': item_row.get('Invoice Number', 0), # 订单号
'payment_type': item_row.get('Description', ''), # Description
'platform_payable_amount': item_row.get('Invoice Amount', ''), # 平台应付金额
'fee_amount': item_row.get("Terms Discount Taken", ''), # 手续费
'actual_payment': item_row.get('Amount Paid', ''), # 实际支付金额
'currency': item_row.get('Invoice Currency', ''), # 货币
'shop_code': self.shop_code, # 店铺code
}
# 推送数据
rabbit.send_message(push_data)
......@@ -2,19 +2,14 @@
# 回款明细
import pandas as pd
from DrissionPage import ChromiumPage as Page
from app.helper import rabbitmq
from app.vc import log
from app.vc import log, rabbit
from app.vc.interface import AutoInterface
class PaymentPush(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
self.page = page
def __init__(self, country: str, shop_code: str):
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
@staticmethod
......@@ -54,7 +49,6 @@ class PaymentPush(AutoInterface):
log.info(f"共计:{len(invoices)} 订单")
rabbit = rabbitmq.RabbitMQClient()
rabbit.connect(queue='refund_robot', routing_key='refund_robot', exchange='reports')
i = 0
......
......@@ -13,10 +13,9 @@ from DrissionPage import ChromiumPage as Page
class ReturnGoods(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
def __init__(self, page: Page, country: str,shop_code: str):
self.page = page
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
# 获取当前日期和时间并格式化
......
......@@ -14,10 +14,9 @@ from app.helper import domain, file, excel, rabbitmq, api
class Spa(AutoInterface):
def __init__(self, page: Page, country: str, payee_code: str, shop_code: str):
def __init__(self, page: Page, country: str, shop_code: str):
self.page = page
self.country = country
self.payeeCode = payee_code
self.shop_code = shop_code
# 获取当前日期和时间并格式化
......
......@@ -3,6 +3,7 @@ import os
from app.helper import file, domain, helper
from app.vc.payment import Payment
from app.vc.payment_push import PaymentPush
from app.vc.return_goods import ReturnGoods
from app.vc.spa import Spa
from DrissionPage import ChromiumPage
......@@ -24,29 +25,32 @@ if __name__ == '__main__':
try:
country = helper.get_input_with_default("国家: [ DE, FR, JP, CA, UK, US ]", "US")
shop_code = helper.get_input_with_default("店铺编码: [ DE-VC, FR-VC, JP-VC, CA-VC, UK-VC, VECELO ]", "VECELO")
payee_code = helper.get_input_with_default("回款Code: [ 详情页url参数 payeeCode ]", "VECET")
action = helper.get_input_with_default("功能:[ spa, return, payment ]", "")
file_name = helper.get_input_with_default("文件名 : [ 例如: ContraCogsInvoices.xls ]", "")
action = helper.get_input_with_default("功能:[ spa, return, payment, payment_erp ]", "")
if action == "":
raise Exception("请输入要执行的功能")
if action.lower() == "payment":
payee_code = helper.get_input_with_default("回款Code: [ 详情页url参数 payeeCode ]", "VECET")
object_instate = Payment(page, country, payee_code, shop_code)
if action.lower() == "payment_erp":
object_instate = PaymentPush(country, shop_code)
elif action.lower() == "return":
object_instate = ReturnGoods(page, country, payee_code, shop_code)
object_instate = ReturnGoods(page, country, shop_code)
elif action.lower() == "spa":
object_instate = Spa(page, country, payee_code, shop_code)
object_instate = Spa(page, country, shop_code)
else:
raise Exception("请输入正确的功能")
file_name = helper.get_input_with_default("文件名 : [ 例如: ContraCogsInvoices.xls ]", "")
if file_name == "":
raise Exception("请输入文件名")
# 切换域名
domain.domain_page(page, country)
# 执行功能
object_instate.run(file_name)
# 推送数据到队列中
object_instate.push_data_queue()
except KeyboardInterrupt:
pass
except Exception as e:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment