Commit bcecc30e authored by 邱阿朋's avatar 邱阿朋

添加YcClient类及商品管理功能

parent 1b191ee6
......@@ -5,4 +5,6 @@ pandas==2.0.3
openpyxl==3.1.5
pyinstaller==6.11.0
requests==2.32.3
redis==5.0.8
\ No newline at end of file
redis==5.0.8
pika==1.3.2
xmltodict==0.14.2
\ No newline at end of file
# coding: utf-8
import json
import requests
import xmltodict
class YcClient:
@classmethod
def __init__(cls, app_key, app_token):
cls.app_key = app_key
cls.app_token = app_token
cls.base_url = "http://8.210.223.221/default/svc/web-service"
@classmethod
def call_service(cls, service, params_json):
"""
调用 SOAP 服务的封装函数。
:param params_json: 请求的数据内容,Python 字典
:param service: 要调用的接口方法
:return: 响应内容
"""
# 构造 SOAP 请求的 XML 数据
payload = f"""<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ns1="http://www.example.org/Ec/">
<SOAP-ENV:Body>
<ns1:callService>
<appToken>{cls.app_token}</appToken>
<appKey>{cls.app_key}</appKey>
<service>{service}</service>
<paramsJson>{json.dumps(params_json)}</paramsJson>
</ns1:callService>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""
# 发送请求
response = requests.post(cls.base_url, data=payload, headers={"Content-Type": "text/xml;charset=UTF-8"})
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"HTTP 错误: {response.status_code}, 内容: {response.text}")
# 将 XML 转换为字典
response_dict = xmltodict.parse(response.text)
resp = (response_dict.get("SOAP-ENV:Envelope", {}).
get("SOAP-ENV:Body", {}).
get("ns1:callServiceResponse", {}).
get('response', {}))
json_res = resp
# 如果 resp 是字符串,则解析为 JSON 字典
if isinstance(resp, str):
json_res = json.loads(resp)
if json_res.get("ask", "Failure") != "Success":
raise Exception(json_res.get("message", "未知错误"))
return json_res.get("data", {})
@classmethod
def get_products(cls, sku: str) -> dict:
"""
获取商品列表
:param sku: 商品编码
"""
params_json = {
"pageSize": 10,
"page": 1,
"product_sku": sku,
}
return cls.call_service("getProductList", params_json)
@classmethod
def create_asn(cls):
"""
创建入库单
"""
params_json = {
"reference_no": "dfdfd1399866764",
"warehouse_code": "HRBW",
"items": [
{
"product_sku": "EA140509201610",
"quantity": 10,
"box_no": 1,
}
]
}
return cls.call_service("createAsn", params_json)
if __name__ == '__main__':
try:
# 配置公共参数
key = "c906cc46bda8cea593c0b6b20eadb1f2"
token = "2832175c4ee5c6efd79e129e919f2dfd"
client = YcClient(key, token)
resp = client.get_products("KHD-CC-RF05-BLK")
for item in resp:
print(item.get('product_title'))
except Exception as e:
print(e)
finally:
print("程序结束")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment