Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
A
amazon_reports
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
common
amazon_reports
Commits
c7f4617e
Commit
c7f4617e
authored
Dec 17, 2024
by
邱阿朋
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
回款
parent
f947f988
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
138 additions
and
51 deletions
+138
-51
excel.py
src/helper/excel.py
+3
-2
file.py
src/helper/file.py
+25
-0
redisx.py
src/helper/redisx.py
+1
-1
payment.py
src/payment.py
+109
-48
No files found.
src/helper/excel.py
View file @
c7f4617e
...
...
@@ -70,6 +70,7 @@ def remove_last_comma(csv_file, skip_rows=2):
# 不添加换行符,待会写入时统一处理
cleaned_lines
.
append
(
cleaned_line
)
# 将处理后的数据
写入同一个
文件
# 将处理后的数据
逐行写回
文件
with
open
(
csv_file
,
'w'
,
encoding
=
'utf-8'
,
newline
=
''
)
as
cleaned_file
:
cleaned_file
.
write
(
'
\n
'
.
join
(
cleaned_lines
)
+
'
\n
'
)
# 使用 join 处理换行符
for
line
in
cleaned_lines
:
cleaned_file
.
write
(
line
+
'
\n
'
)
# 每一行单独写入,确保每行独立处理
src/helper/file.py
View file @
c7f4617e
# coding: utf-8
import
os
import
shutil
import
time
...
...
@@ -25,3 +26,27 @@ def make_dir(path):
return
False
return
True
def
move_file
(
src
,
dest
):
"""
将文件从 src 移动到 dest。
:param src: 源文件路径
:param dest: 目标文件路径
:return: None
"""
try
:
# 检查源文件是否存在
if
not
os
.
path
.
exists
(
src
):
raise
FileNotFoundError
(
f
"源文件 {src} 不存在"
)
# 检查目标文件夹是否存在,如果不存在则创建
dest_dir
=
os
.
path
.
dirname
(
dest
)
if
not
os
.
path
.
exists
(
dest_dir
):
os
.
makedirs
(
dest_dir
)
# 使用 shutil.move 来移动文件
shutil
.
move
(
src
,
dest
)
except
Exception
as
e
:
print
(
f
"移动文件时出错: {e}"
)
\ No newline at end of file
src/helper/redisx.py
View file @
c7f4617e
...
...
@@ -10,7 +10,7 @@ class RedisClient:
cls
.
_instance
.
_initialize
(
*
args
,
**
kwargs
)
return
cls
.
_instance
def
_initialize
(
self
,
host
=
'
39.108.185.244
'
,
port
=
6379
,
db
=
12
,
password
=
"khd2022!"
,
decode_responses
=
True
):
def
_initialize
(
self
,
host
=
'
47.119.182.76
'
,
port
=
6379
,
db
=
12
,
password
=
"khd2022!"
,
decode_responses
=
True
):
self
.
client
=
redis
.
StrictRedis
(
host
=
host
,
port
=
port
,
db
=
db
,
password
=
password
,
decode_responses
=
decode_responses
)
...
...
src/payment.py
View file @
c7f4617e
...
...
@@ -12,6 +12,7 @@ import pandas as pd
from
DrissionPage
import
ChromiumPage
from
DrissionPage.errors
import
ElementNotFoundError
from
lxml
import
etree
from
numpy.ma.core
import
append
from
helper
import
helper
,
excel
,
file
,
domain
,
logger
,
redisx
...
...
@@ -148,7 +149,7 @@ def get_po_code(index, po_id) -> dict:
# 获取 Vendor 内容
result
[
"vendor"
]
=
get_content
(
tree
,
2
,
2
)
# 正则表达式查找数字和%之间的内容
match
=
re
.
search
(
r'Payment
Terms
.*?(\d+
%
)'
,
po_table
)
match
=
re
.
search
(
r'Payment .*?(\d+
%
)'
,
po_table
)
if
match
:
result
[
"payment_terms"
]
=
match
.
group
(
1
)[:
-
1
]
# 去掉%
else
:
...
...
@@ -199,41 +200,86 @@ def price_extract_data(html_content):
return
data_list
def
click_get_price_data
():
while
True
:
try
:
# 点击争议价tab
page
.
ele
(
"#pd"
)
.
click
()
log
.
debug
(
"等待争议数据加载,5秒后获取表单数据"
)
page
.
wait
(
5
)
table_html
=
page
.
ele
(
"#priceDiscrepancyWithDMSGridForm"
,
timeout
=
5
)
.
html
# 抓取表单数据
price_data
=
price_extract_data
(
table_html
)
return
price_data
except
ElementNotFoundError
:
log
.
warning
(
"未获取到表数据"
)
break
def
line_items_data
(
html_content
):
# 使用正则表达式删除所有 HTML 注释
html_content
=
re
.
sub
(
r'<!--.*?-->'
,
''
,
html_content
)
# 使用 lxml 解析 HTML
tree
=
etree
.
HTML
(
html_content
)
# 提取所有行的数据
rows
=
tree
.
xpath
(
'//tr[contains(@class, "mt-row")]'
)
data_list
=
[]
for
row
in
rows
:
# 定义 data 字典,提取并去除多余字符
data
=
{
'PO'
:
row
.
xpath
(
'string(./td[@data-column="PO_NUMBER"]/span/span/a)'
)
.
strip
(),
'External ID'
:
row
.
xpath
(
'string(./td[@data-column="EXTERNAL_ID"])'
)
.
strip
(),
'ASIN'
:
row
.
xpath
(
'string(./td[@data-column="ASIN"]/span/span/a)'
)
.
strip
(),
'TITLE'
:
row
.
xpath
(
'string(./td[@data-column="DESCRIPTION"])'
)
.
strip
(),
'Model'
:
row
.
xpath
(
'string(./td[@data-column="MODEL_NUMBER"])'
)
.
strip
(),
'Freight Term'
:
row
.
xpath
(
'string(./td[@data-column="FREIGHT_TERM"])'
)
.
strip
(),
'Qty'
:
row
.
xpath
(
'string(./td[@data-column="QUANTITY"])'
)
.
strip
(),
'Unit Cost'
:
row
.
xpath
(
'string(./td[@data-column="UNIT_COST"])'
)
.
strip
(),
'Amount'
:
row
.
xpath
(
'string(./td[@data-column="TOTAL_AMOUNT"])'
)
.
strip
(),
'Shortage quantity'
:
row
.
xpath
(
'string(./td[@data-column="SHORTAGE_QUANTITY"])'
)
.
strip
(),
'Amount shortage'
:
row
.
xpath
(
'string(./td[@data-column="SHORTAGE_AMOUNT"])'
)
.
strip
(),
'Last received date'
:
row
.
xpath
(
'string(./td[@data-column="LAST_RECEIVED_DATE"])'
)
.
strip
(),
'ASIN received'
:
row
.
xpath
(
'string(./td[@data-column="RECEIVED_ASIN"])'
)
.
strip
(),
'Quantity received'
:
row
.
xpath
(
'string(./td[@data-column="RECEIVED_QUANTITY"])'
)
.
strip
(),
'Unit cost'
:
row
.
xpath
(
'string(./td[@data-column="RECEIVED_COST_PRICE"])'
)
.
strip
(),
'Amount received'
:
row
.
xpath
(
'string(./td[@data-column="RECEIVED_AMOUNT"])'
)
.
strip
(),
}
# 如果字段为空则设为空字符串
for
key
in
data
:
if
not
data
[
key
]:
data
[
key
]
=
""
# 将 None 转为 ""
data_list
.
append
(
data
)
return
data_list
def
calculate_unit_cost
(
data_list
):
"""计算差异金额单价并返回金额"""
unit_cost_float
=
Decimal
(
data_list
[
'INVOICE_COST'
])
-
Decimal
(
data_list
[
'INITIAL_RESEARCH_COST'
])
unit_cost
=
unit_cost_float
*
int
(
data_list
[
'QUANTITY'
])
return
unit_cost_float
,
float
(
f
"{unit_cost:.2f}"
)
def
handle_price_data
(
price_data_list
,
invoice_amount
):
result
=
{}
invoice_amount
=
abs
(
float
(
f
"{invoice_amount:.2f}"
))
"""处理争议数据"""
total_price_data_amount
=
0
# 计算总金额
for
data_list
in
price_data_list
:
# 计算差异金额单价
unit_cost_float
=
Decimal
(
data_list
[
'INVOICE_COST'
])
-
Decimal
(
data_list
[
'INITIAL_RESEARCH_COST'
])
unit_cost
=
unit_cost_float
*
int
(
data_list
[
'QUANTITY'
])
amount
=
float
(
f
"{unit_cost:.2f}"
)
if
amount
==
invoice_amount
:
result
=
data_list
result
[
'Quantity received'
]
=
data_list
[
'QUANTITY'
]
result
[
'UnitCost'
]
=
unit_cost_float
result
[
'Amount'
]
=
f
"${amount:.2f}"
if
data_list
[
'RESOLUTION_DECISION'
]
==
"Approved"
:
result
[
'Shortage quantity'
]
=
0
else
:
result
[
'Shortage quantity'
]
=
1
break
unit_cost_float
,
amount
=
calculate_unit_cost
(
data_list
)
total_price_data_amount
+=
amount
result
=
[]
invoice_amount
=
abs
(
float
(
f
"{invoice_amount:.2f}"
))
# 如果总金额等于列表金额则已回款
if
total_price_data_amount
==
invoice_amount
:
for
data_list
in
price_data_list
:
unit_cost_float
,
amount
=
calculate_unit_cost
(
data_list
)
data
=
data_list
.
copy
()
data
[
'Quantity received'
]
=
data_list
[
'QUANTITY'
]
data
[
'UnitCost'
]
=
unit_cost_float
data
[
'Amount'
]
=
f
"${amount:.2f}"
data
[
'Shortage quantity'
]
=
'0'
result
.
append
(
data
)
else
:
for
data_list
in
price_data_list
:
unit_cost_float
,
amount
=
calculate_unit_cost
(
data_list
)
if
amount
==
invoice_amount
:
data
=
data_list
.
copy
()
data
[
'Quantity received'
]
=
data_list
[
'QUANTITY'
]
data
[
'UnitCost'
]
=
unit_cost_float
data
[
'Amount'
]
=
f
"${amount:.2f}"
data
[
'Shortage quantity'
]
=
'0'
if
data_list
[
'RESOLUTION_DECISION'
]
==
"Approved"
else
'1'
result
.
append
(
data
)
break
return
result
...
...
@@ -244,7 +290,8 @@ def handle_data(detail_datum, vendor, deduction_points):
amount
=
float
(
amount
.
replace
(
'$'
,
''
)
.
replace
(
','
,
''
))
# 如果是0则回款完成
is_finished
=
"否"
if
detail_datum
.
get
(
'Shortage quantity'
,
-
1
)
==
0
:
shortage_quantity
=
detail_datum
.
get
(
'Shortage quantity'
,
-
1
)
if
shortage_quantity
==
'0'
:
is_finished
=
"是"
amount_after_deduction
=
amount
...
...
@@ -268,7 +315,7 @@ def main():
# list_data = list_data[25:]
# 获取当前日期和时间并格式化
current_datetime
=
datetime
.
now
()
.
strftime
(
'
%
Y
-
%
m-
%
d_
%
H-
%
M'
)
# 格式化为 'YYYY-MM-DD_HH-MM-SS'
current_datetime
=
datetime
.
now
()
.
strftime
(
'
%
Y
%
m
%
d
%
H
%
M'
)
# 格式化为 'YYYY-MM-DD_HH-MM-SS'
# 原文件名
file_name
=
"回款数据.xlsx"
# 拼接新的文件名
...
...
@@ -304,30 +351,44 @@ def main():
else
:
# 进入详情页
invoice_details
(
invoice_number
,
last_two
,
last_three
)
# 获取争议数据
price_data
=
click_get_price_data
()
# 点击争议价tab
page
.
ele
(
"#pd"
)
.
click
()
log
.
debug
(
"等待争议数据加载,10秒后获取表单数据"
)
page
.
wait
(
10
)
table_html
=
page
.
ele
(
"#priceDiscrepancyWithDMSGridForm"
,
timeout
=
5
)
.
html
# 抓取表单数据
price_data
=
price_extract_data
(
table_html
)
# 缓存数据
rdb
.
get_client
()
.
hset
(
cache_key
,
invoice_number
,
json
.
dumps
(
price_data
))
# 争议回款
handle_after_price_data
=
handle_price_data
(
price_data
,
invoice_amount
)
format_price_data
=
handle_data
(
handle_after_price_data
,
vendor
,
deduction_points
)
all_price_pay_data
.
append
(
pd
.
DataFrame
(
format_price_data
,
index
=
[
0
]))
price_data
=
handle_price_data
(
price_data
,
invoice_amount
)
price_pay_data
=
[]
for
detail_datum
in
price_data
:
# 争议回款数据
format_price_data
=
handle_data
(
detail_datum
,
vendor
,
deduction_points
)
# 将处理后的记录添加到临时列表
price_pay_data
.
append
(
format_price_data
)
# 添加到汇总列表
all_price_pay_data
.
append
(
pd
.
DataFrame
(
price_pay_data
))
else
:
file_name
=
f
"payment
\\
{invoice_number}.csv"
if
os
.
path
.
isfile
(
file_name
):
detail_data
=
pd
.
read_csv
(
file_name
)
cache_key
=
"item_data"
detail_data
=
rdb
.
get_client
()
.
hget
(
cache_key
,
invoice_number
)
if
detail_data
:
detail_data
=
json
.
loads
(
detail_data
)
else
:
# 进入详情页
invoice_details
(
invoice_number
,
last_two
,
last_three
)
# 下载excel文件并读取数据
detail_data
=
export_details_read_data
(
file_name
)
if
detail_data
is
None
:
log
.
error
(
"数据存在问题,请手动处理"
)
continue
page
.
wait
(
3
)
table_html
=
page
.
ele
(
"#invoiceLineItems"
,
timeout
=
5
)
.
html
# 抓取表单数据
detail_data
=
line_items_data
(
table_html
)
# 缓存数据
rdb
.
get_client
()
.
hset
(
cache_key
,
invoice_number
,
json
.
dumps
(
detail_data
))
# 初始化列表存储新字段数据
normal_pay_data
=
[]
for
index
,
detail_datum
in
detail_data
.
iterrows
()
:
for
detail_datum
in
detail_data
:
# 正常回款数据
success_data
=
handle_data
(
detail_datum
,
vendor
,
deduction_points
)
# 将处理后的记录添加到临时列表
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment