Commit a46559b1 authored by 邱阿朋's avatar 邱阿朋

refactor(rabbitmq): 优化 RabbitMQ 客户端并更新数据处理逻辑

- 修复 RabbitMQ 客户端初始化和连接逻辑
- 优化队列声明和绑定过程
- 更新 return_goods.py 中的数据处理逻辑
- 添加组别信息到推送数据中- 优化日期处理,确保正确格式化
parent a8ee314b
......@@ -3,7 +3,7 @@ import json
class RabbitMQClient:
def __init__(self, host='localhost',port='5672', username='guest', password='guest'):
def __init__(self, host='localhost', port='5672', username='guest', password='guest'):
"""
初始化 RabbitMQ 客户端,创建连接和通道。
:param host: RabbitMQ 主机地址
......@@ -30,7 +30,7 @@ class RabbitMQClient:
try:
credentials = pika.PlainCredentials(self.username, self.password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=self.host,port=self.port, credentials=credentials
host=self.host, port=self.port, credentials=credentials
))
self.channel = self.connection.channel()
print(f"Initialized connection to RabbitMQ at {self.host}.")
......@@ -52,8 +52,8 @@ class RabbitMQClient:
self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
# 死信交换机和路由键配置
dead_letter_exchange = "dead_letter_exchange" # 死信交换机名称
dead_letter_routing_key = "dead_letter" # 死信路由键
dead_letter_exchange = queue + "_dead_letter_exchange" # 死信交换机名称
dead_letter_routing_key = queue + "_dead_letter" # 死信路由键
# 队列声明的参数
queue_arguments = {
......@@ -62,7 +62,7 @@ class RabbitMQClient:
}
# 声明队列
self.channel.queue_declare(queue=queue, durable=True,arguments=queue_arguments)
self.channel.queue_declare(queue=queue, durable=True, arguments=queue_arguments)
# 绑定队列到交换机(如果有)
if exchange:
......@@ -115,4 +115,4 @@ class RabbitMQClient:
"""关闭连接"""
if self.connection and not self.connection.is_closed:
self.connection.close()
print("Connection closed.")
\ No newline at end of file
print("Connection closed.")
......@@ -81,18 +81,20 @@ def main():
for _, item_row in item_data_result.iterrows():
relation = relations_dict.get(item_row.get('ASIN'))
erp_sku = relation.get('erp_sku')
erp_sku = relation.get('erp_sku', "")
data_dict = data.to_dict()
data_dict.update({
'Return Date': data_dict['Return Date'].strftime('%m/%d/%Y'),
'Return ID': str(data_dict['Return ID']),
'PO': item_row.get('Purchase order'),
'ASIN': item_row.get('ASIN'),
'SKU': erp_sku if erp_sku is not None else "",
'Quantity': item_row.get('Quantity'),
'PO': item_row.get('Purchase order', ""),
'ASIN': item_row.get('ASIN', ""),
'SKU': erp_sku,
'Quantity': item_row.get('Quantity', 0),
# 替换回会数量和金额为详情里面的值
'Return quantity': item_row.get('Quantity'), # 替换回会数量
'Total cost': item_row.get('Total amount') # 替换金额
'Return quantity': item_row.get('Quantity', 0), # 替换回会数量
'Total cost': item_row.get('Total amount', 0), # 替换金额
'Group Name': relation.get("name", ""),
'Group Code': relation.get("code", ""),
})
# 追加数据
new_list_data.append(data_dict)
......@@ -110,10 +112,16 @@ def main():
def push_data_queue(file_name):
rabbit = rabbitmq.RabbitMQClient(host='47.107.31.4', port=15672, username='khd_rabbitmq', password='KHDrq2024%,,')
rabbit = rabbitmq.RabbitMQClient(host='172.18.218.11', port=5672, username='test', password='khd123456')
# rabbit = rabbitmq.RabbitMQClient(host='47.107.31.4', port=5672, username='khd_rabbitmq', password='KHDrq2024%,,')
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
data = pd.read_excel(file_name)
# 使用 format='%m/%d/%Y' 解析日期
data['Return Date'] = pd.to_datetime(data['Return Date'], format='%m/%d/%Y')
# 将日期格式化为 'YYYY-MM-DD' 字符串,去掉时间部分
data['Return Date'].dt.strftime('%Y-%m-%d')
for _, item_row in data.iterrows():
push_data = {
'return_id': item_row.get('Return ID', ''),
......@@ -121,11 +129,13 @@ def push_data_queue(file_name):
'order_no': item_row.get('Purchase order', ''), # 订单号
'sku_quantity': item_row.get('Quantity', 0), # 退回数量
'sku_amount': item_row.get('Total amount', 0), # Total cost
'currency': data.get('Currency code', ''), # Currency code
'data_date': data.get('Return Date').strftime('%Y-%m-%d'), # Return Date
'currency': item_row.get('Currency code', ''), # Currency code
'data_date': str(item_row.get('Return Date','')), # Return Date
'erp_sku': item_row.get("erp_sku", ''), # ERP SKU # SKU1匹配
'shop_code': shop_code, # 店铺code
'supplier_code': data.get('Vendor code', ''), # 供应商编码
'supplier_code': item_row.get('Vendor code', ''), # 供应商编码
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Group Code', ""), # 组别 T1 T2
}
# 推送数据
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment