Commit 2740b6b3 authored by 邱阿朋's avatar 邱阿朋

队列消息

parent a23736f5
No preview for this file type
import pika
import json
class RabbitMQClient:
def __init__(self, host='localhost',port='5672', username='guest', password='guest'):
"""
初始化 RabbitMQ 客户端,创建连接和通道。
:param host: RabbitMQ 主机地址
:param username: RabbitMQ 账号
:param password: RabbitMQ 密码
"""
self.host = host
self.port = port
self.username = username
self.password = password
self.connection = None
self.channel = None
self.exchange = None
self.queue = None
self.routing_key = None
self._initialize_connection()
def _initialize_connection(self):
"""初始化连接和通道"""
try:
credentials = pika.PlainCredentials(self.username, self.password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=self.host,port=self.port, credentials=credentials
))
self.channel = self.connection.channel()
print(f"Initialized connection to RabbitMQ at {self.host}.")
except Exception as e:
print(f"Error initializing RabbitMQ connection: {e}")
raise
def connect(self, queue, exchange='', exchange_type='direct', routing_key=''):
"""
绑定队列和交换机
:param queue: 队列名称
:param exchange: 交换机名称
:param exchange_type: 交换机类型
:param routing_key: 路由键
"""
try:
# 声明交换机(如果有)
if exchange:
self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
# 声明队列
self.channel.queue_declare(queue=queue, durable=True)
# 绑定队列到交换机(如果有)
if exchange:
self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
self.queue = queue
self.exchange = exchange
self.routing_key = routing_key
print(f"Connected to queue '{queue}' with exchange '{exchange}' (type: {exchange_type}).")
except Exception as e:
print(f"Error connecting to queue and exchange: {e}")
raise
def send_message(self, message, routing_key=None):
"""
发送消息到绑定的交换机
:param message: 要发送的消息
:param routing_key: 路由键(如果不传,则使用绑定时的 routing_key)
"""
routing_key = routing_key or self.routing_key
try:
if isinstance(message, dict):
message = json.dumps(message)
self.channel.basic_publish(
exchange=self.exchange,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(delivery_mode=2)
)
# print(f"Sent message: {message}")
except Exception as e:
print(f"Error sending message: {e}")
def receive_message(self, callback):
"""
接收消息并处理
:param callback: 处理消息的回调函数
"""
try:
self.channel.basic_consume(queue=self.queue, on_message_callback=callback, auto_ack=True)
print("Waiting for messages...")
self.channel.start_consuming()
except Exception as e:
print(f"Error receiving message: {e}")
def close(self):
"""关闭连接"""
if self.connection and not self.connection.is_closed:
self.connection.close()
print("Connection closed.")
\ No newline at end of file
...@@ -8,8 +8,11 @@ from DrissionPage import ChromiumPage ...@@ -8,8 +8,11 @@ from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError from DrissionPage.errors import ElementNotFoundError
from helper import helper, excel, file, domain, logger, api from helper import helper, excel, file, domain, logger, api
from src.helper import rabbitmq
country = None country = None
shop_code = None
log = logger.ConsoleLog() log = logger.ConsoleLog()
page = ChromiumPage() page = ChromiumPage()
...@@ -104,6 +107,29 @@ def main(): ...@@ -104,6 +107,29 @@ def main():
excel.save_xls(new_list_data, new_file_name) excel.save_xls(new_list_data, new_file_name)
def push_data_queue(file_name):
rabbit = rabbitmq.RabbitMQClient(host='172.18.218.11', port=15672, username='test', password='khd123456')
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
data = pd.read_excel(file_name)
for _, item_row in data.iterrows():
push_data = {
'return_id': item_row.get('Return ID'),
'asin': item_row.get('ASIN'), # ASIN
'order_no': item_row.get('Purchase order'), # 订单号
'sku_quantity': item_row.get('Quantity'), # 退回数量
'sku_amount': item_row.get('Total amount'), # Total cost
'currency': data.get('Currency code'), # Currency code
'data_date': data.get('Return Date').strftime('%Y-%m-%d'), # Return Date
'erp_sku': item_row.get("erp_sku"), # ERP SKU # SKU1匹配
'shop_code': shop_code, # 店铺code
'supplier_code': data.get('Vendor code'), # 供应商编码
}
# 推送数据
rabbit.send_message(push_data)
if __name__ == '__main__': if __name__ == '__main__':
try: try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US") country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
......
...@@ -13,8 +13,11 @@ from DrissionPage.errors import ElementNotFoundError ...@@ -13,8 +13,11 @@ from DrissionPage.errors import ElementNotFoundError
from lxml import etree from lxml import etree
from helper import helper, excel, file, domain, logger, api from helper import helper, excel, file, domain, logger, api
from src.helper import rabbitmq
country = None country = None
shop_code = None
log = logger.ConsoleLog() log = logger.ConsoleLog()
page = ChromiumPage() page = ChromiumPage()
...@@ -296,6 +299,30 @@ def save_excel(sheet_data, large_sheet_data, new_file_name): ...@@ -296,6 +299,30 @@ def save_excel(sheet_data, large_sheet_data, new_file_name):
log.info(f"文件 {new_file_name} 保存完成,路径:{os.path.abspath(new_file_name)}") log.info(f"文件 {new_file_name} 保存完成,路径:{os.path.abspath(new_file_name)}")
def push_data_queue(file_name):
rabbit = rabbitmq.RabbitMQClient(host='172.18.218.11', port=15672, username='test', password='khd123456')
rabbit.connect(queue='return_robot', routing_key='return_robot', exchange='reports')
data = pd.read_excel(file_name)
for _, item_row in data.iterrows():
push_data = {
'ad_date': item_row.get('Order Date', ""), # spa费用数据日期
'erp_sku': item_row.get('ERP SKU', ""), # ERP SKU
'ad_amount': item_row.get('Order Date', ""), # spa费用金额
'ad_amount_currency': item_row.get('Agreement Currency', ""), # spa费用币制
'amount_type': item_row.get('Order Date', ""), # 费用类型
'group_name': item_row.get('Group Name', ""), # 组别 运营一组 运营二组
'group_code': item_row.get('Order Date', ""), # 组别 T1 T2
'asin': item_row.get('Asin', ""), # ASIN
'shop_code': shop_code, # 店铺code
'type': 2, # 1 sheet1 2 其他sheet
'parent_id': item_row.get('Invoice ID', ""), # sheet1 为Invoice ID 其他sheet为sheet名称
'order_no': item_row.get('Invoice ID', ""), # 订单号
}
# 推送数据
rabbit.send_message(push_data)
if __name__ == '__main__': if __name__ == '__main__':
try: try:
country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US") country = helper.get_input_with_default("国家(目前支持[DE,FR,JP,CA,UK,US])", "US")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment