Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
amazon_reports
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
common
amazon_reports
Commits
e61bb4a6
Commit
e61bb4a6
authored
Oct 18, 2024
by
邱阿朋
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
回款数据
parent
7690d798
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
203 additions
and
39 deletions
+203
-39
excel.py
helper/excel.py
+27
-0
file.py
helper/file.py
+5
-5
invoices.py
invoices.py
+171
-34
No files found.
helper/excel.py
View file @
e61bb4a6
# coding: utf-8
import
re
import
pandas
as
pd
import
xlrd
from
openpyxl.reader.excel
import
load_workbook
...
...
@@ -46,3 +48,28 @@ def save_xls(data, output_file, sheet_name='Sheet1', adjusted=True):
ws
.
column_dimensions
[
column_letter
]
.
width
=
adjusted_width
wb
.
save
(
output_file
)
def remove_last_comma(csv_file, skip_rows=2):
    """Rewrite ``csv_file`` in place without its leading rows and trailing commas.

    The first ``skip_rows`` lines are discarded. Every remaining line goes
    through two regex clean-ups and then has trailing commas and newlines
    stripped. The cleaned lines are written back to the same path, joined
    with newlines and followed by a final newline.

    Note: the leading rows are removed from the file itself, so running this
    twice on the same file drops another ``skip_rows`` lines.

    :param csv_file: path of the CSV file to clean (read and rewritten as UTF-8)
    :param skip_rows: number of leading lines to discard
    """
    cleaned_lines = []

    with open(csv_file, 'r', encoding='utf-8') as source:
        for line_number, line in enumerate(source):
            # Skip the leading rows by counting instead of calling next(), so
            # a file shorter than ``skip_rows`` does not raise StopIteration.
            if line_number < skip_rows:
                continue

            # Replace a quote that directly follows "whitespace + digits"
            # with a space.
            cleaned_line = re.sub(r'(\s\d+)"', r'\1 ', line)
            # Turn "whitespace, comma, optional whitespace, quote" into a
            # closing quote, comma, space and opening quote.
            cleaned_line = re.sub(r'\s+,\s*"', r'", "', cleaned_line)
            # Drop trailing commas and the line break; line breaks are added
            # back uniformly when writing.
            cleaned_lines.append(cleaned_line.rstrip(',\n'))

    # Overwrite the same file with the cleaned content.
    with open(csv_file, 'w', encoding='utf-8', newline='') as cleaned_file:
        cleaned_file.write('\n'.join(cleaned_lines) + '\n')
helper/file.py
View file @
e61bb4a6
...
...
@@ -3,16 +3,16 @@ import os
import
time
def wait_for_downloads(file_name, timeout=60):
    """Wait until ``file_name`` exists as a regular file.

    The path is checked immediately and then about once per second until it
    appears or ``timeout`` seconds have elapsed.

    NOTE(review): only the existence of the path is checked; whether the
    browser has finished writing the file is not verified here.

    :param file_name: path of the file that is expected to appear
    :param timeout: maximum time to wait, in seconds
    :return: True if the file exists before the deadline, otherwise False
    """
    # A monotonic clock keeps the deadline stable if the system time changes.
    deadline = time.monotonic() + timeout
    while True:
        # Check before looking at the deadline so an already-present file is
        # reported even when ``timeout`` is zero.
        if os.path.isfile(file_name):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(1, remaining))
...
...
invoices.py
View file @
e61bb4a6
# coding: utf-8
# 回款明细
import
os
import
re
import
time
import
urllib.parse
import
warnings
import
pandas
as
pd
from
DrissionPage
import
ChromiumPage
from
DrissionPage.errors
import
PageDisconnectedError
,
ElementNotFoundError
from
lxml
import
etree
from
helper
import
helper
,
excel
from
helper
import
helper
,
excel
,
file
# Module-level browser page object; the functions in this file drive it via `page`.
page = ChromiumPage()
# Select the "normal" page load mode on that page object.
page.set.load_mode.normal()
...
...
@@ -27,13 +30,14 @@ warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
def export_list_read_data():
    """Load the remittance list and return the rows relevant for processing.

    If ``Payments.xlsx`` is not present yet, the remittance home page is
    opened, all entries are selected and the export link is clicked; the
    function then waits for the file to appear. The workbook is read with the
    first 22 rows skipped and filtered on its ``Description`` column.

    :return: DataFrame with the rows whose description matches the filter
    :raises FileNotFoundError: if the export does not appear before the wait
        times out
    """
    file_name = 'Payments.xlsx'
    if not os.path.isfile(file_name):
        page.get("https://vendorcentral.amazon.com/hz/vendor/members/remittance/home", timeout=3)
        page.ele("#remittance-home-select-all", timeout=2).click()
        page.ele("#remittance-home-export-link", timeout=2).click.to_download()
        # Fail with a clear message instead of letting read_excel stumble
        # over a file that never arrived.
        if not file.wait_for_downloads(file_name):
            raise FileNotFoundError(f"Download of {file_name} did not finish in time")

    df = pd.read_excel(file_name, skiprows=22)
    # Keep descriptions containing 'Price Claim', 'PCR', 'Missed Adjustment'
    # or 'Shortage Claim', or starting with 'XXXXXXXX/XXXX/'.
    pattern = r'Price Claim|PCR|Missed Adjustment|Shortage Claim|^[A-Z0-9]{8}/[A-Z0-9]{4}/'
    # Rows without a description (NaN) are treated as non-matching.
    return df[df['Description'].str.contains(pattern, na=False, regex=True)]
...
...
@@ -41,18 +45,23 @@ def export_list_read_data():
def
export_details_read_data
(
invoice_number
):
# 读取详情内容
file_name
=
f
"invoices
\\
{invoice_number}.csv"
if
not
os
.
path
.
isfile
(
file_name
):
try
:
params
=
{
"invoiceNumber"
:
invoice_number
,
"payeeCode"
:
"VECET"
,
"activeTab"
:
"lineItems"
,
"invoiceNumber"
:
invoice_number
,
}
# 将字典转换为 URL 查询参数
query_string
=
urllib
.
parse
.
urlencode
(
params
)
page
.
get
(
f
"https://vendorcentral.amazon.com/hz/vendor/members/inv-mgmt/invoice-details?"
+
query_string
)
try
:
page
.
ele
(
"#line-items-export-to-spreadsheet-announce"
)
.
click
.
to_download
(
rename
=
file_name
)
.
wait
()
if
not
os
.
path
.
isfile
(
file_name
):
page
.
ele
(
"#line-items-export-to-spreadsheet-announce"
,
timeout
=
2
)
.
click
.
to_download
(
rename
=
file_name
)
file
.
wait_for_downloads
(
file_name
)
excel
.
remove_last_comma
(
file_name
)
except
ElementNotFoundError
:
print
(
"导出按钮不存在刷新网页"
)
page
.
refresh
()
...
...
@@ -61,52 +70,180 @@ def export_details_read_data(invoice_number):
if
not
os
.
path
.
isfile
(
file_name
):
export_details_read_data
(
invoice_number
)
return
pd
.
read_csv
(
file_name
,
skiprows
=
2
,
engine
=
'python'
,
on_bad_lines
=
'skip'
)
return
pd
.
read_csv
(
file_name
)
def get_content(tree, row_index: int, cell_index: int) -> "str | None":
    """Return the first text node of the given row and cell, or None.

    The lookup selects the ``row_index``-th element with ``role="row"`` and,
    inside it, the ``cell_index``-th child with ``role="cell"`` (XPath
    positions, so counting starts at 1).

    :param tree: parsed document exposing an ``xpath`` method
    :param row_index: XPath position of the row
    :param cell_index: XPath position of the cell within the row
    :return: the first matching text node, or None when nothing matches
    """
    query = f'//*[@role="row"][{row_index}]/*[@role="cell"][{cell_index}]/text()'
    matches = tree.xpath(query)
    return matches[0] if matches else None
def get_po_code(index, po_id) -> dict:
    """Open the purchase-order page for ``po_id`` and read vendor and payment terms.

    :param index: caller-supplied index, copied into the result unchanged
    :param po_id: purchase-order id placed into the page URL
    :return: dict with the keys ``index``, ``po_id``, ``vendor`` (text of
        row 2 / cell 2 of the ``#po-header`` element, or None) and
        ``payment_terms`` (the digits in front of the first ``%`` that
        follows "Payment Terms", or None when there is no match)
    """
    order_url = f"https://vendorcentral.amazon.com/po/vendor/members/po-mgmt/order?poId={po_id}"
    page.get(order_url, timeout=3)
    header_html = page.ele("#po-header", timeout=2).html

    # Vendor: parse the header HTML with lxml and read row 2, cell 2.
    vendor = get_content(etree.HTML(header_html), 2, 2)

    # Payment terms: capture "<digits>%" after the label, then drop the "%".
    terms_match = re.search(r'Payment Terms.*?(\d+%)', header_html)
    payment_terms = terms_match.group(1)[:-1] if terms_match else None

    return {
        "index": index,
        "po_id": po_id,
        "vendor": vendor,
        "payment_terms": payment_terms,
    }
def price_extract_data(html_content):
    """Parse the table HTML into one dict per ``mt-row`` table row.

    Each dict has the keys PO_NUMBER, ASIN, EXTERNAL_ID, TITLE, QUANTITY,
    INVOICE_COST, PO_COST, INITIAL_RESEARCH_COST, RESOLUTION_DECISION and
    RESOLUTION_COST. A value is the stripped first text node of the matching
    ``data-column`` cell, or None when that cell has no text node.

    :param html_content: HTML string of the table
    :return: list of row dicts in document order
    """
    # These columns hold their text inside a nested link.
    link_columns = ('PO_NUMBER', 'ASIN', 'EXTERNAL_ID')
    # These columns hold their text directly in the cell.
    text_columns = (
        'TITLE',
        'QUANTITY',
        'INVOICE_COST',
        'PO_COST',
        'INITIAL_RESEARCH_COST',
        'RESOLUTION_DECISION',
        'RESOLUTION_COST',
    )

    column_queries = {
        column: f'./td[@data-column="{column}"]/span/span/a/text()'
        for column in link_columns
    }
    column_queries.update(
        (column, f'./td[@data-column="{column}"]/text()')
        for column in text_columns
    )

    def first_text(row, query):
        """Return the stripped first XPath hit for ``query``, or None if there is none."""
        hits = row.xpath(query)
        return hits[0].strip() if hits else None

    # Parse the HTML with lxml and walk over every data row.
    document = etree.HTML(html_content)
    return [
        {column: first_text(row, query) for column, query in column_queries.items()}
        for row in document.xpath('//tr[contains(@class, "mt-row")]')
    ]
def _read_price_data_once():
    """Make a single attempt at reading the price-dispute rows from the current page."""
    # HTML of the block that holds the amounts.
    page_html = page.ele(".a-column a-span4", timeout=2).html
    tree = etree.HTML(page_html)
    # Third span with class "a-color-base invoice-property-field" in that block.
    price_variance_amount = tree.xpath(
        '(//div[@class="a-column a-span4"]//span[@class="a-color-base invoice-property-field"])[3]/text()'
    )
    # Nothing to fetch when the value is missing or just a dash.
    if not price_variance_amount or price_variance_amount[0].strip() == "-":
        return None

    page.ele("#pd", timeout=2).click()
    print("等在加载数据,15秒后执行")
    # Fixed pause before the table is read.
    time.sleep(15)
    table_html = page.ele(".a-bordered a-horizontal-stripes mt-table", timeout=2).html
    return price_extract_data(table_html)


def click_get_price_data(max_retries=5):
    """Open the price-dispute table of the current page and parse its rows.

    When the third ``invoice-property-field`` value of the amounts block is
    present and not "-", ``#pd`` is clicked, the function waits 15 seconds and
    the ``mt-table`` HTML is parsed with ``price_extract_data``.

    If an element is missing, the page is refreshed and the attempt is
    repeated. The result of the successful retry is returned to the caller,
    and the number of retries is bounded.

    :param max_retries: how many refresh-and-retry rounds are allowed after
        the first attempt
    :return: list of row dicts from ``price_extract_data``, or None when the
        amount value is missing or "-"
    :raises ElementNotFoundError: if an element is still missing after the
        last retry
    """
    attempts = max(1, max_retries + 1)
    for attempt in range(attempts):
        try:
            return _read_price_data_once()
        except ElementNotFoundError:
            # Out of retries: surface the error instead of looping forever.
            if attempt == attempts - 1:
                raise
            page.refresh()
def handle_price_data(price_data, detail_data):
    """Build the dispute record for ``detail_data`` from the matching price row.

    The first entry of ``price_data`` whose ``ASIN`` equals
    ``detail_data['ASIN']`` is used. The result is a copy of ``detail_data``
    with ``Quantity received`` and ``Amount`` taken from that entry, and
    ``Shortage quantity`` set to 0 when the resolution decision is
    "Approved" and to 1 otherwise.

    :param price_data: iterable of price rows (mappings keyed like the rows
        from ``price_extract_data``); ``(index, row)`` pairs are accepted as
        well; may be None
    :param detail_data: mapping with at least an ``ASIN`` entry and a
        ``copy`` method; it is not modified
    :return: the new record, or None when ``price_data`` is None or holds no
        row with a matching ASIN
    """
    if price_data is None:
        return None

    for entry in price_data:
        # Accept both plain rows and (index, row) pairs.
        is_pair = isinstance(entry, tuple) and len(entry) == 2
        price = entry[1] if is_pair else entry

        if price['ASIN'] != detail_data['ASIN']:
            continue

        result = detail_data.copy()
        # price_extract_data names the column 'QUANTITY'; 'Quantity' is kept
        # as a fallback for rows keyed the other way.
        quantity_key = 'QUANTITY' if 'QUANTITY' in price else 'Quantity'
        result['Quantity received'] = price[quantity_key]
        result['Amount'] = price['RESOLUTION_COST']
        result['Shortage quantity'] = 0 if price['RESOLUTION_DECISION'] == "Approved" else 1
        return result

    return None
def _parse_amount(value):
    """Convert a currency cell such as '$1,234.50' (or a number) to float; missing values become 0.0."""
    if value is None:
        return 0.0
    if isinstance(value, str):
        cleaned = value.replace('$', '').replace(',', '').strip()
        return float(cleaned) if cleaned else 0.0
    number = float(value)
    # NaN is the only float that differs from itself; treat it as missing.
    return 0.0 if number != number else number


def handle_data(detail_datum, vendor, deduction_points):
    """Return a copy of ``detail_datum`` extended with the payment fields.

    Added fields:
    - ``IsFinished``: "是" when ``Shortage quantity`` equals 0, otherwise "否"
    - ``DeductionPoints``: ``deduction_points`` followed by a percent sign
    - ``Code``: the given ``vendor``
    - ``AmountAfterDeduction``: the amount reduced by ``deduction_points`` percent

    :param detail_datum: mapping with ``get`` and ``copy`` (for example a
        dict); may be None
    :param vendor: value stored under ``Code``
    :param deduction_points: percentage to deduct from the amount
    :return: the extended copy, or None when ``detail_datum`` is None
    """
    # A missing record (for example no matching dispute row) has nothing to extend.
    if detail_datum is None:
        return None

    # Accept '$1,234.50'-style strings as well as numeric or missing cells.
    amount = _parse_amount(detail_datum.get('Amount', '$0.00'))

    # A shortage quantity of 0 means the payment is complete.
    is_finished = "是" if detail_datum.get('Shortage quantity', -1) == 0 else "否"

    amount_after_deduction = amount - (amount * (deduction_points / 100))

    # Work on a copy so the caller's row stays untouched.
    record = detail_datum.copy()
    record["IsFinished"] = is_finished
    record["DeductionPoints"] = f"{deduction_points}%"
    record["Code"] = vendor
    record["AmountAfterDeduction"] = amount_after_deduction
    return record
def
main
():
list_data
=
export_list_read_data
()
excel
.
save_xls
(
list_data
,
"回款数据.xlsx"
,
"Remittance payments"
)
print
(
f
"共计:{len(list_data)} 订单"
)
all_normal_pay_data
=
[]
all_price_pay_data
=
[]
i
=
0
for
_
,
data
in
list_data
.
iterrows
():
i
+=
1
invoice_number
=
data
.
get
(
"Invoice Number"
)
description
=
data
.
get
(
"Description"
)
if
"Price"
in
description
or
"PCR"
in
description
or
"Missed"
in
description
or
"Shortage"
in
description
:
# 获取前8位
# 取订单前8位后面的没用
invoice_number
=
invoice_number
[:
8
]
print
(
invoice_number
)
# invoice_search_page = page.new_tab(f"https://vendorcentral.amazon.com/hz/vendor/members/inv-mgmt/invoice-po-search?searchByNumberToken={invoice_number}")
# invoice_search_page.close()
# 获取当前订单的Payee和优惠比例
vendor_payment_terms
=
get_po_code
(
i
,
invoice_number
)
time
.
sleep
(
1
)
vendor
=
vendor_payment_terms
[
'vendor'
]
deduction_points
=
int
(
vendor_payment_terms
[
'payment_terms'
])
# 下载excel文件并读取数据
detail_data
=
export_details_read_data
(
invoice_number
)
# 获取争议数据
price_data
=
click_get_price_data
()
# 初始化列表存储新字段数据
normal_pay_data
=
[]
price_pay_data
=
[]
for
index
,
detail_datum
in
detail_data
.
iterrows
():
amount
=
detail_datum
.
get
(
'Amount'
,
0
)
# 使用默认值 0 防止 None
code
=
"VECET"
deduction_points
=
1
# 正常回款数据
success_data
=
handle_data
(
detail_datum
,
vendor
,
deduction_points
)
# 计算扣除后的金额
amount_after_deduction
=
amount
-
(
amount
*
deduction_points
/
100
)
# 复制原始行数据,避免直接修改
new_record
=
detail_datum
.
copy
()
new_record
[
"IsFinished"
]
=
"是"
new_record
[
"DeductionPoints"
]
=
f
"{deduction_points}
%
"
# 拼接百分号
new_record
[
"Code"
]
=
code
new_record
[
"AmountAfterDeduction"
]
=
amount_after_deduction
# 争议回款
price_data
=
handle_price_data
(
price_data
,
detail_datum
)
price_data
=
handle_data
(
price_data
,
vendor
,
deduction_points
)
# 将处理后的记录添加到临时列表
normal_pay_data
.
append
(
new_record
)
normal_pay_data
.
append
(
success_data
)
# 将处理后的记录添加到临时列表
price_pay_data
.
append
(
new_record
)
price_pay_data
.
append
(
price_data
)
# 添加到汇总列表
all_normal_pay_data
.
append
(
pd
.
DataFrame
(
normal_pay_data
))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment