Commit 131c9a08 authored by 邱阿朋

爬虫处理

parent ccca3f9a
...@@ -7,4 +7,5 @@ ContraCogsInvoices.xls ...@@ -7,4 +7,5 @@ ContraCogsInvoices.xls
Payments.xlsx Payments.xlsx
returns returns
invoices invoices
coop coop
\ No newline at end of file *.xlsx
\ No newline at end of file
...@@ -4,7 +4,7 @@ import os ...@@ -4,7 +4,7 @@ import os
import pandas as pd import pandas as pd
from DrissionPage import ChromiumPage from DrissionPage import ChromiumPage
from DrissionPage.errors import PageDisconnectedError from DrissionPage.errors import PageDisconnectedError, ElementNotFoundError
from helper import helper from helper import helper
...@@ -20,31 +20,40 @@ helper.make_dir(download_path) ...@@ -20,31 +20,40 @@ helper.make_dir(download_path)
page.set.download_path(download_path) page.set.download_path(download_path)
def export_list(invoice_id, max_retries=3):
    """Download the co-op report for one invoice unless it is already on disk.

    The report is saved as ``coop\\<invoice_id>.csv``. If that file already
    exists nothing is done. Otherwise the co-op search page for the invoice
    is opened, the action tab is clicked and the download link is clicked.

    Args:
        invoice_id: Invoice identifier used in the search URL, the element
            id of the download link and the target file name.
        max_retries: Number of additional attempts made after the first one
            when a page element cannot be found.

    Raises:
        ElementNotFoundError: If the element is still missing after
            ``1 + max_retries`` attempts. (The previous implementation
            retried through unbounded recursion.)
    """
    file_name = f"coop\\{invoice_id}.csv"
    if os.path.isfile(file_name):
        # Already downloaded on a previous run; skip the browser round-trip.
        return

    for attempt in range(max_retries + 1):
        try:
            page.get(f"https://vendorcentral.amazon.com/hz/vendor/members/coop?searchText={invoice_id}")
            # Open the action tab.
            page.ele("#a-autoid-2-announce").click()
            # Download the report and wait on the returned download mission.
            page.ele(f"#invoiceDownloads-{invoice_id}_1").click.to_download(rename=file_name).wait()
            return
        except ElementNotFoundError:
            if attempt == max_retries:
                # Give up instead of looping forever on a page that never
                # shows the export button.
                raise
            print("导出按钮不存在刷新网页")
            page.refresh()
def main(): def main():
page.get("https://vendorcentral.amazon.com/hz/vendor/members/coop?ref_=vc_xx_subNav") page.get("https://vendorcentral.amazon.com/hz/vendor/members/coop?ref_=vc_xx_subNav")
# 全选 file_name = "ContraCogsInvoices.xls"
page.ele("#select-all").click() if not os.path.isfile(file_name):
# 点击选项卡 # 全选
page.ele("#cc-invoice-actions-dropdown").click() page.ele("#select-all").click()
# 点击下载报表 # 点击选项卡
page.ele("#cc-invoice-actions-dropdown_2").click.to_download() page.ele("#cc-invoice-actions-dropdown").click()
page.download.wait() # 点击下载报表
page.ele("#cc-invoice-actions-dropdown_2").click.to_download().wait()
file_name = "ContraCogsInvoices.xls" file_name = "ContraCogsInvoices.xls"
coop_data = pd.read_excel(file_name, engine='xlrd') coop_data = pd.read_excel(file_name, engine='xlrd')
for _, data in coop_data.iterrows(): for _, data in coop_data.iterrows():
# 根据回款id搜索下载报表 # 根据回款id搜索下载报表
invoice_id = data.get("Invoice ID") invoice_id = data.get("Invoice ID")
page.get(f"https://vendorcentral.amazon.com/hz/vendor/members/coop?searchText={invoice_id}") print(invoice_id)
# 点击选项卡 export_list(invoice_id)
page.ele("#a-autoid-2-announce").click()
# 下载报表
file_name = f"coop\\{invoice_id}.csv"
page.ele(f"#invoiceDownloads-{invoice_id}_1").click.to_download(rename=file_name)
# invoice_data = pd.read_csv(file_name)
# for _, invoice, in invoice_data.iterrows():
# print(f"{invoice.to_dict()}")
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -11,28 +11,38 @@ def open_xls(file_path): ...@@ -11,28 +11,38 @@ def open_xls(file_path):
return workbook.sheet_by_index(0) # 选择第一个工作表 return workbook.sheet_by_index(0) # 选择第一个工作表
def save_xls(data, output_file): def save_xls(data, output_file, sheet_name='Sheet1', adjusted=True):
df = pd.DataFrame(data) try:
# 将 DataFrame 写入 Excel 文件 # 如果文件已存在,则追加新的 sheet
df.to_excel(output_file, index=False) # index=False 表示不写入行索引 with pd.ExcelWriter(output_file, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
df = pd.DataFrame(data)
df.to_excel(writer, index=False, sheet_name=sheet_name)
except FileNotFoundError:
# 如果文件不存在,创建一个新的文件并写入
with pd.ExcelWriter(output_file, engine='openpyxl', mode='w') as writer:
df = pd.DataFrame(data)
df.to_excel(writer, index=False, sheet_name=sheet_name)
# 使用 openpyxl 重新加载工作簿 # 使用 openpyxl 重新加载工作簿
wb = load_workbook(output_file) wb = load_workbook(output_file)
ws = wb.active # 获取活动工作表 ws = wb[sheet_name]
if not adjusted:
return
# 自动调整列宽 # 自动调整列宽
for column in ws.columns: for column in ws.columns:
max_length = 0 max_length = 0
# 获取列字母
column_letter = column[0].column_letter column_letter = column[0].column_letter
for cell in column: for cell in column:
try: try:
if len(str(cell.value)) > max_length: # 判断 cell.value 是否为 None,并尝试转换为字符串计算长度
if cell.value is not None and len(str(cell.value)) > max_length:
max_length = len(str(cell.value)) max_length = len(str(cell.value))
except: except (TypeError, AttributeError) as e:
pass print(f"Error processing cell {cell.coordinate}: {e}")
# 增加一些宽度以美观 continue
adjusted_width = (max_length + 2) adjusted_width = max_length + 3
ws.column_dimensions[column_letter].width = adjusted_width ws.column_dimensions[column_letter].width = adjusted_width
# 保存调整后的工作簿
wb.save(output_file) wb.save(output_file)
# coding: utf-8 # coding: utf-8
# 回款明细 # 回款明细
import os import os
import time
import urllib.parse import urllib.parse
import warnings import warnings
import pandas as pd import pandas as pd
from DrissionPage import ChromiumPage from DrissionPage import ChromiumPage
from DrissionPage.errors import PageDisconnectedError from DrissionPage.errors import PageDisconnectedError, ElementNotFoundError
from helper import helper from helper import helper, excel
page = ChromiumPage() page = ChromiumPage()
page.set.load_mode.normal() page.set.load_mode.normal()
...@@ -25,24 +24,12 @@ page.set.download_path(download_path) ...@@ -25,24 +24,12 @@ page.set.download_path(download_path)
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl") warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
# 对过滤后的数据,进一步处理 Description 列 def export_list_filter_data():
def process_description(description):
    """Extract the first 8 characters of the last '/'-separated segment.

    Args:
        description: Value of a ``Description`` cell. Non-string values
            (for example ``None`` or a float NaN for an empty cell) are
            treated as "no match".

    Returns:
        The first 8 characters of the text after the last ``/`` when that
        segment is at least 8 characters long, otherwise ``None``.

    Note:
        Only the segment length is checked; the characters are not verified
        to be digits.
    """
    if not isinstance(description, str):
        # Missing cells are not strings and have no .split(); treat as no match.
        return None
    # Only the text after the final '/' matters.
    last_segment = description.rsplit('/', 1)[-1]
    if len(last_segment) >= 8:
        return last_segment[:8]
    return None
def download_filter_data():
file_name = 'Payments.xlsx' file_name = 'Payments.xlsx'
if not os.path.isfile(file_name): if not os.path.isfile(file_name):
page.get(f"https://vendorcentral.amazon.com/hz/vendor/members/remittance/home") page.get(f"https://vendorcentral.amazon.com/hz/vendor/members/remittance/home")
page.ele("#remittance-home-select-all").click() page.ele("#remittance-home-select-all").click()
page.ele("#remittance-home-export-link").click.to_download() page.ele("#remittance-home-export-link").click.to_download().wait()
page.download.wait()
df = pd.read_excel('Payments.xlsx', skiprows=22) df = pd.read_excel('Payments.xlsx', skiprows=22)
# 定义正则表达式模式,匹配包含 'Price' 或 'PCR' 或 'XXXXXXXX/XXXX/' 的描述 # 定义正则表达式模式,匹配包含 'Price' 或 'PCR' 或 'XXXXXXXX/XXXX/' 的描述
...@@ -51,8 +38,38 @@ def download_filter_data(): ...@@ -51,8 +38,38 @@ def download_filter_data():
return df[df['Description'].str.contains(pattern, na=False, regex=True)] return df[df['Description'].str.contains(pattern, na=False, regex=True)]
def export_details_read_data(invoice_number, max_retries=3):
    """Return the line items of one invoice, downloading the CSV if needed.

    The export is cached as ``invoices\\<invoice_number>.csv``. While that
    file does not exist, the invoice-details page is opened and the
    "export to spreadsheet" button is clicked; if the button is missing the
    page is refreshed and the attempt is repeated.

    Args:
        invoice_number: Invoice number used in the details URL and in the
            cached file name.
        max_retries: Number of additional download attempts after the first.

    Returns:
        A ``pandas.DataFrame`` read from the CSV with the first two lines
        skipped and malformed lines dropped.

    Raises:
        FileNotFoundError: If the file is still missing after
            ``1 + max_retries`` attempts. (The previous implementation
            retried through unbounded recursion.)
    """
    file_name = f"invoices\\{invoice_number}.csv"
    attempts = 0
    while not os.path.isfile(file_name):
        if attempts > max_retries:
            raise FileNotFoundError(
                f"{file_name} was not downloaded after {attempts} attempts")
        attempts += 1

        params = {
            "invoiceNumber": invoice_number,
            "payeeCode": "VECET",
            "activeTab": "lineItems",
        }
        # Turn the dict into a URL query string.
        query_string = urllib.parse.urlencode(params)
        page.get(
            "https://vendorcentral.amazon.com/hz/vendor/members/inv-mgmt/invoice-details?" + query_string)
        try:
            page.ele("#line-items-export-to-spreadsheet-announce").click.to_download(rename=file_name).wait()
        except ElementNotFoundError:
            print("导出按钮不存在刷新网页")
            page.refresh()

    return pd.read_csv(file_name, skiprows=2, engine='python', on_bad_lines='skip')
def main(): def main():
list_data = download_filter_data() list_data = export_list_filter_data()
excel.save_xls(list_data, "回款数据.xlsx", "Remittance payments")
all_normal_pay_data = []
all_price_pay_data = []
for _, data in list_data.iterrows(): for _, data in list_data.iterrows():
invoice_number = data.get("Invoice Number") invoice_number = data.get("Invoice Number")
description = data.get("Description") description = data.get("Description")
...@@ -62,26 +79,45 @@ def main(): ...@@ -62,26 +79,45 @@ def main():
print(invoice_number) print(invoice_number)
page.get( # invoice_search_page = page.new_tab(f"https://vendorcentral.amazon.com/hz/vendor/members/inv-mgmt/invoice-po-search?searchByNumberToken={invoice_number}")
f"https://vendorcentral.amazon.com/hz/vendor/members/inv-mgmt/invoice-po-search?searchByNumberToken={invoice_number}") # invoice_search_page.close()
params = { # 下载excel文件并读取数据
"invoiceNumber": invoice_number, detail_data = export_details_read_data(invoice_number)
"payeeCode": "VECET",
"activeTab": "lineItems", # 初始化列表存储新字段数据
} normal_pay_data = []
# 将字典转换为 URL 查询参数 price_pay_data = []
query_string = urllib.parse.urlencode(params)
page.get(f"https://vendorcentral.amazon.com/hz/vendor/members/inv-mgmt/invoice-details?" + query_string) for index, detail_datum in detail_data.iterrows():
amount = detail_datum.get('Amount', 0) # 使用默认值 0 防止 None
# 读取详情内容 code = "VECET"
file_name = f"invoices\\{invoice_number}.csv" deduction_points = 1
page.ele("#line-items-export-to-spreadsheet-announce").click.to_download(rename=file_name)
time.sleep(3) # 计算扣除后的金额
amount_after_deduction = amount - (amount * deduction_points / 100)
detail_data = pd.read_csv(file_name, skiprows=2, engine='python', on_bad_lines='skip') # 复制原始行数据,避免直接修改
# for _, detail, in detail_data.iterrows(): new_record = detail_datum.copy()
# print(f"{detail.to_dict()}") new_record["IsFinished"] = "是"
new_record["DeductionPoints"] = f"{deduction_points}%" # 拼接百分号
new_record["Code"] = code
new_record["AmountAfterDeduction"] = amount_after_deduction
# 将处理后的记录添加到临时列表
normal_pay_data.append(new_record)
# 将处理后的记录添加到临时列表
price_pay_data.append(new_record)
# 添加到汇总列表
all_normal_pay_data.append(pd.DataFrame(normal_pay_data))
all_price_pay_data.append(pd.DataFrame(price_pay_data))
# 将所有数据合并为一个 DataFrame
normal_pay_summary = pd.concat(all_normal_pay_data, ignore_index=True)
price_pay_summary = pd.concat(all_price_pay_data, ignore_index=True)
excel.save_xls(normal_pay_summary, "回款数据.xlsx", "正常回款导出明细")
excel.save_xls(price_pay_summary, "回款数据.xlsx", "Price导出明细")
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -46,6 +46,40 @@ def open_url(url): ...@@ -46,6 +46,40 @@ def open_url(url):
# todo 识别图形码 # todo 识别图形码
def asin_sku_relations():
    """Build an ASIN -> SKU lookup from ``relations.xlsx``.

    Each row of the sheet contributes one entry keyed by its ``ASIN`` column
    with the ``SKU`` column as value; a later row with the same ASIN
    overwrites an earlier one.

    Returns:
        dict mapping ASIN values to SKU values.
    """
    sheet = pd.read_excel('relations.xlsx')
    # Convert each row to a plain dict first, then pick the two columns.
    records = (record.to_dict() for _, record in sheet.iterrows())
    return {fields['ASIN']: fields['SKU'] for fields in records}
def export_list():
    """Export the returns summary from Vendor Central and load it.

    Opens the returns page, clicks the download button, waits on the object
    returned by ``to_download()`` and then reads ``Return_Summary.xls``
    with the xlrd engine.

    Returns:
        A ``pandas.DataFrame`` with the contents of ``Return_Summary.xls``.
    """
    # Visit the returns overview page.
    returns_url = "https://vendorcentral.amazon.com/hz/vendor/members/returns?ref_=vc_xx_subNav"
    open_url(returns_url)

    # Trigger the export and wait on the resulting download.
    download_button = page.ele("#file-download-button")
    mission = download_button.click.to_download()
    mission.wait()

    summary = pd.read_excel('Return_Summary.xls', engine='xlrd')
    return summary
def export_item(return_id):
    """Load the item details of one return, downloading them if not cached.

    The details are cached as ``returns\\<return_id>.xls``. When that file is
    absent, the return's detail page is opened and its download button is
    clicked, saving the export under that name.

    Args:
        return_id: Identifier used in the detail URL and the file name.

    Returns:
        A ``pandas.DataFrame`` read from the cached ``.xls`` file via xlrd.
    """
    target_dir = "returns"
    helper.make_dir(target_dir)
    xls_path = f"{target_dir}\\{return_id}.xls"

    already_cached = os.path.isfile(xls_path)
    if not already_cached:
        # Open the return's detail page and export its line items.
        detail_url = f"https://vendorcentral.amazon.com/katalmonsapp/vendor/members/returns/{return_id}"
        open_url(detail_url)
        mission = page.ele("#file-download-button").click.to_download(rename=xls_path)
        mission.wait()

    # Read the returned-item details.
    return pd.read_excel(xls_path, engine='xlrd')
def main(): def main():
# 读取asin和sku映射关系 # 读取asin和sku映射关系
relations_dict = asin_sku_relations() relations_dict = asin_sku_relations()
...@@ -84,42 +118,6 @@ def main(): ...@@ -84,42 +118,6 @@ def main():
page.close() page.close()
def asin_sku_relations():
    """Build an ASIN -> SKU lookup from ``relations.xlsx``.

    Each row of the sheet contributes one entry keyed by its ``ASIN`` column
    with the ``SKU`` column as value; a later row with the same ASIN
    overwrites an earlier one.

    Returns:
        dict mapping ASIN values to SKU values.
    """
    sheet = pd.read_excel('relations.xlsx')
    # Convert each row to a plain dict first, then pick the two columns.
    records = (record.to_dict() for _, record in sheet.iterrows())
    return {fields['ASIN']: fields['SKU'] for fields in records}
def export_list():
    """Export the returns summary from Vendor Central and load it.

    Opens the returns page, clicks the download button and waits on the
    download mission returned by ``to_download()`` before reading
    ``Return_Summary.xls`` with the xlrd engine.

    Returns:
        A ``pandas.DataFrame`` with the contents of ``Return_Summary.xls``.
    """
    # Visit the returns overview page.
    open_url("https://vendorcentral.amazon.com/hz/vendor/members/returns?ref_=vc_xx_subNav")
    # Export the returns list. Wait on the mission created by this click
    # rather than on page.download, so the file is complete before it is read.
    page.ele("#file-download-button").click.to_download().wait()
    return pd.read_excel('Return_Summary.xls', engine='xlrd')
def export_item(return_id):
    """Load the item details of one return, downloading them if not cached.

    The details are cached as ``returns\\<return_id>.xls``. When that file is
    absent, the return's detail page is opened and its download button is
    clicked, saving the export under that name.

    Args:
        return_id: Identifier used in the detail URL and the file name.

    Returns:
        A ``pandas.DataFrame`` read from the cached ``.xls`` file via xlrd.
    """
    returns_dir = "returns"
    helper.make_dir(returns_dir)
    file_name = f"{returns_dir}\\{return_id}.xls"
    if not os.path.isfile(file_name):
        # Open the return's detail page and download its line items.
        open_url(f"https://vendorcentral.amazon.com/katalmonsapp/vendor/members/returns/{return_id}")
        # Wait on the mission created by this click rather than on
        # page.download, so the file is complete before it is read below.
        page.ele("#file-download-button").click.to_download(rename=file_name).wait()

    # Read the returned-item details.
    return pd.read_excel(file_name, engine='xlrd')
if __name__ == '__main__': if __name__ == '__main__':
try: try:
email = helper.get_input_with_default("请输入账户", "us-cs001@khdtek.com") email = helper.get_input_with_default("请输入账户", "us-cs001@khdtek.com")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment