Commit a3048422 authored by 邱阿朋's avatar 邱阿朋

删除无用代码

parent 0922aecc
# coding: utf-8
import configparser
import os
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from models.models import Base
# Shared configparser instance; populated by init() from config.ini.
config = configparser.ConfigParser()
# SQLAlchemy session factory; set by init_mysql(), stays None until init() runs.
db_session = None
def init():
    """Load config.ini into the shared parser and initialise the database.

    Raises:
        FileNotFoundError: if config.ini is missing from the working directory.
    """
    config_file = 'config.ini'
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"配置文件 {config_file} 不存在.")
    # Read explicitly as UTF-8: the file contains non-ASCII values, and the
    # default locale encoding on Windows would mis-decode them.
    config.read(config_file, encoding='utf-8')
    init_mysql()
def init_mysql():
    """Build the MySQL engine from the [database] config section, verify
    connectivity, create missing tables, and publish a session factory
    through the module-level `db_session`.

    Raises:
        ConnectionError: if engine creation or the probe query fails.
    """
    global db_session
    try:
        host = config.get('database', 'host')
        port = config.get('database', 'port')
        database = config.get('database', 'database')
        username = config.get('database', 'username')
        password = config.get('database', 'password')
        engine = create_engine(
            url=f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}",
            pool_recycle=3600,  # recycle pooled connections hourly (MySQL wait_timeout)
            echo=True)
        # Probe the connection so a bad config fails fast here, not on first use.
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
        Base.metadata.create_all(engine)
        db_session = sessionmaker(bind=engine)
        print("数据库连接成功,Session 初始化完成。")
    except Exception as e:
        # The original raised FileNotFoundError, which mislabels a
        # connectivity problem; ConnectionError names the actual failure
        # and `from e` preserves the root cause for debugging.
        raise ConnectionError(f"数据库连接失败: {e}") from e
import re
import time
import ddddocr
from DrissionPage import Chromium
from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError, PageDisconnectedError
from lxml import etree
import tasks
from bootstrap import init
# Running total of sponsored items scraped in this session.
data_count = 0
# Load config.ini and connect the database before any scraping starts.
init()
def get_total_pages(page):
    """Read the result-page count from the disabled pagination item.

    Returns 1 when the text is missing, empty, or non-numeric (single-page
    results render no pagination) instead of raising — the original's
    `else 1` fallback intended this but int() could still crash.
    """
    total_pages = page.ele('.s-pagination-item s-pagination-disabled').text
    try:
        return int(total_pages.strip())
    except (AttributeError, TypeError, ValueError):
        return 1
def search_goods(page, keywords):
    """Run an Amazon search for `keywords` and scrape every result page.

    :param page: DrissionPage page already on amazon.com
    :param keywords: search terms typed into the search box
    """
    # Type the keywords and submit the search form.
    page.ele('#twotabsearchtextbox').input(keywords)
    page.ele('#nav-search-submit-button').click()
    # Wait until the tab title reflects the query, i.e. results loaded.
    page.wait.title_change(keywords)
    # Total number of result pages, read from the pagination widget.
    total_pages = get_total_pages(page)
    print(f"总页数: {total_pages}")
    # Scrape page 1, which is already displayed.
    goods_list(page)
    # Then walk pages 2..total_pages via the "next" pagination button.
    for pg in range(2, total_pages + 1):
        pagination = page.ele('.a-section a-text-center s-pagination-container')
        pagination.ele('.s-pagination-item s-pagination-next s-pagination-button s-pagination-separator').click()
        print(f"抓取第 {pg} 页内容...")
        # Fixed delay so the next results page renders before parsing.
        time.sleep(10)
        goods_list(page)
def goods_list(page):
    """Parse the current results page, collect each sponsored item's fields,
    and enqueue the record to the huey queue via tasks.process_data."""
    # Parse the rendered HTML of the current results page.
    tree = etree.HTML(page.html)
    # One container div per search result; contains() is used because the
    # class attribute holds several class names.
    elements = tree.xpath('//div[contains(@class, "a-section") and contains(@class, "a-spacing-base")]')
    # Separate browser used only to open product-detail tabs.
    table = Chromium()
    for element in elements:
        try:
            # Only sponsored (ad) listings are collected.
            is_sponsored = element.xpath('.//span[contains(text(), "Sponsored")]')
            if is_sponsored:
                global data_count
                data_count = data_count + 1
                data = {}
                # Product title text.
                goods_name = element.xpath(
                    './/span[contains(@class, "a-size-base-plus") and contains(@class, "a-color-base") and contains(@class, "a-text-normal")]/text()')
                if goods_name:
                    data['goods_name'] = goods_name[0]
                # Product image URL from the s-image tag.
                img_src = element.xpath('.//img[contains(@class, "s-image")]/@src')
                if img_src:
                    data['img_src'] = img_src[0]
                # Relative link to the product-detail page.
                link = element.xpath(
                    './/a[contains(@class, "a-link-normal") and contains(@class, "s-no-outline")]/@href')
                if link:
                    url = "https://www.amazon.com" + link[0]
                    data['url'] = url
                    # Open the detail page in a new tab to read the store name.
                    tab = table.new_tab(url=url)
                    new_tree = etree.HTML(tab.html)
                    store_name_text = new_tree.xpath('//a[@id="bylineInfo"]/text()')
                    if store_name_text:
                        # Strip the "Visit the "/"访问 " byline prefix.
                        store_name = re.sub(r'^(Visit the |访问 )', '', store_name_text[0])
                        data['store_name'] = store_name
                    tab.close()
                    # Hand the record to the async persistence task.
                    # NOTE(review): sponsored items without a detail link are
                    # counted but never enqueued — confirm that is intended.
                    tasks.process_data(data)
        except ElementNotFoundError:
            # Skip results whose expected sub-elements are missing.
            pass
def check_captcha(page):
    """Return True when the captcha container element is present on `page`."""
    return bool(page.ele('.a-row a-text-center'))
def open_url(page, url):
    """Navigate to `url` and solve any image captcha blocking the page.

    Loops screenshot -> OCR -> submit until the captcha container is gone.

    :param page: DrissionPage page object
    :param url: absolute URL to open
    """
    page.get(url)
    # Give a possible captcha page a moment to render.
    page.wait(2)
    while True:
        time.sleep(1)
        if not check_captcha(page):
            break
        print("存在图形码开始处理...")
        # Screenshot the captcha image to disk for OCR.
        img = page('tag:img')
        img.get_screenshot(name='captcha_image.jpg')
        ocr = ddddocr.DdddOcr(show_ad=False)
        # Context manager closes the handle promptly — the original
        # open(...).read() leaked the file descriptor every iteration.
        with open("captcha_image.jpg", "rb") as f:
            image = f.read()
        result = ocr.classification(image).upper()
        if result:
            # Type the recognised text and submit the captcha form.
            page.ele('#captchacharacters').input(result)
            time.sleep(2)
            page.ele('.a-button-text').click()
def main(keywords):
    """Open Amazon in a Chromium page and scrape results for `keywords`."""
    browser = ChromiumPage()
    # Use the normal (blocking) page-load mode.
    browser.set.load_mode.normal()
    open_url(browser, 'https://www.amazon.com')
    search_goods(browser, keywords)
    print(f"数据总量 {data_count} 条")
if __name__ == '__main__':
    try:
        print("请输入你要搜索的商品")
        search_input = input()
        print("你输入的是:", search_input)
        main(search_input)
    except KeyboardInterrupt:
        # Allow Ctrl-C to end the scrape quietly.
        pass
    # Dropped the unused `as e` binding the original declared.
    except PageDisconnectedError:
        # The browser tab/page was closed while scraping.
        print("与页面的连接已断开")
[database]
host = localhost
port = 3306
database = spring
username = root
password = 123456
[redis]
host = localhost
port = 6379
password =
db = 0
\ No newline at end of file
# coding: utf-8
import random
import time
from curl_cffi import requests
from lxml import etree
def proxies():
    """Return the HTTP/HTTPS proxy mapping for requests-style clients."""
    # Local renamed: the original dict shadowed the function's own name.
    proxy_map = {
        "http": "http://127.0.0.1:7897",
        "https": "http://127.0.0.1:7897",
    }
    return proxy_map
def get_headers():
    """Return the Chrome-like request headers used for Amazon requests."""
    # Built from an ordered pair list so the header order matches what a
    # real browser sends (order can matter for fingerprinting).
    header_items = [
        ("accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
        ("accept-language", "zh-CN,zh;q=0.9"),
        ("cache-control", "max-age=0"),
        ("device-memory", "8"),
        ("downlink", "9.35"),
        ("dpr", "1"),
        ("ect", "4g"),
        ("priority", "u=0, i"),
        ("rtt", "250"),
        ("sec-ch-device-memory", "8"),
        ("sec-ch-dpr", "1"),
        ("sec-ch-ua", "\"Chromium\";v=\"128\", \"Not;A=Brand\";v=\"24\", \"Google Chrome\";v=\"128\""),
        ("sec-ch-ua-mobile", "?0"),
        ("sec-ch-ua-platform", "\"Windows\""),
        ("sec-ch-ua-platform-version", "\"15.0.0\""),
        ("sec-ch-viewport-width", "1920"),
        ("sec-fetch-dest", "document"),
        ("sec-fetch-mode", "navigate"),
        ("sec-fetch-site", "none"),
        ("sec-fetch-user", "?1"),
        ("upgrade-insecure-requests", "1"),
        ("viewport-width", "1920"),
    ]
    return dict(header_items)
def gen_ac_buk():
    """Generate a random ubid-acbuk cookie value shaped ddd-ddddddd-ddddddd."""
    segments = (
        random.randint(100, 999),
        random.randint(1000000, 9999999),
        random.randint(1000000, 9999999),
    )
    return "-".join(str(part) for part in segments)
def goodsSearch(keywords):
    """Issue the initial Amazon search request for `keywords`.

    :param keywords: search terms for the `k` query parameter
    :return: curl_cffi Response for page 1 of the results
    """
    headers = get_headers()
    ac_buk = gen_ac_buk()
    cookies = {
        "lc-main": "zh_CN",        # force Chinese locale
        "ubid-acbuk": ac_buk,      # randomly generated browser-id cookie
        "crid": "MEKLIWGGWBQG",
        "sprefix": "沙发,aps,373",
    }
    url = "https://www.amazon.com/s"
    params = {
        "k": keywords,
        "ref": "nb_sb_noss"
    }
    # impersonate="chrome110" makes curl_cffi mimic Chrome's TLS fingerprint.
    # NOTE(review): proxies="" looks odd and the proxies() helper above is
    # never used — confirm whether requests should go through the proxy.
    response = requests.get(url, headers=headers, cookies=cookies, params=params, impersonate="chrome110", proxies="")
    return response
def goodsInfo(url):
    """Fetch a product-detail page.

    :param url: absolute product URL
    :return: curl_cffi Response for the detail page
    """
    headers = get_headers()
    ac_buk = gen_ac_buk()
    cookies = {
        "lc-main": "zh_CN",
        "ubid-acbuk": ac_buk,
    }
    # NOTE(review): params="" and proxies="" are unusual sentinel values —
    # confirm curl_cffi treats them as "no params / no proxy".
    response = requests.get(url, headers=headers, cookies=cookies, params="", impersonate="chrome110", proxies="")
    return response
def goodsListPage(keywords, page):
    """Fetch one page of Amazon search results.

    :param keywords: search terms for the `k` parameter
    :param page: 1-based page number to request
    :return: curl_cffi Response for that results page
    """
    headers = get_headers()
    ac_buk = gen_ac_buk()
    cookies = {
        "lc-main": "zh_CN",
        "ubid-acbuk": ac_buk,
    }
    url = "https://www.amazon.com/s"
    params = {
        "k": keywords,            # search keywords
        "qid": int(time.time()),  # current timestamp, as the real site sends
        "page": page,             # page number being requested
        "crid": "MEKLIWGGWBQG",
        "sprefix": "沙发,aps,373",
    }
    response = requests.get(url, headers=headers, cookies=cookies, params=params, impersonate="chrome110", proxies="")
    return response
# Running count of sponsored items found across all pages.
data_total = 0
def get_total_pages(response):
    """Extract the total result-page count from a search response.

    :param response: HTTP response whose .text is the results HTML
    :return: page count as int, defaulting to 1 when absent or unparsable
    """
    tree = etree.HTML(response.text)
    # The disabled pagination item holds the last (highest) page number.
    total_pages = tree.xpath('//span[@class="s-pagination-item s-pagination-disabled"]/text()')
    if not total_pages:
        return 1
    try:
        return int(total_pages[-1].strip())
    except ValueError:
        # Non-numeric text (e.g. an ellipsis separator) — treat as one page
        # instead of crashing the whole scrape.
        return 1
def process_page_content(response):
    """Parse one results page and print every sponsored item it contains.

    Increments the module-level `data_total` counter per sponsored item
    and fetches each item's detail page to print the store byline.
    """
    tree = etree.HTML(response.text)
    # One container div per search result; contains() is used because the
    # class attribute holds several class names.
    elements = tree.xpath('//div[contains(@class, "a-section") and contains(@class, "a-spacing-base")]')
    for element in elements:
        # Only sponsored (ad) listings are processed.
        is_sponsored = element.xpath('.//span[contains(text(), "Sponsored")]')
        if is_sponsored:
            global data_total
            data_total = data_total + 1
            # Product title text.
            title = element.xpath(
                './/span[contains(@class, "a-size-base-plus") and contains(@class, "a-color-base") and contains(@class, "a-text-normal")]/text()')
            if title:
                print(f"商品名称: {title[0]}")
            # Product image URL from the s-image tag.
            img_src = element.xpath('.//img[contains(@class, "s-image")]/@src')
            if img_src:
                print(f"图片链接: {img_src[0]}")
            # Relative link to the product-detail page.
            link = element.xpath(
                './/a[contains(@class, "a-link-normal") and contains(@class, "s-no-outline")]/@href')
            if link:
                url = "https://www.amazon.com" + link[0]
                print(f"详情链接: {url}")
                # Fetch the detail page to read the store byline.
                info_resp = goodsInfo(url)
                new_tree = etree.HTML(info_resp.text)
                byline_info = new_tree.xpath('//a[@id="bylineInfo"]/text()')
                if byline_info:
                    print(f"店铺名: {byline_info[0]}")
                print("-" * 50)
def main():
    """Scrape all sponsored listings for the hard-coded keyword."""
    keywords = "沙发"
    # Page 1 comes from the plain search endpoint.
    first_page_response = goodsSearch(keywords)
    total_pages = get_total_pages(first_page_response)
    print(f"总页数: {total_pages}")
    print(f"抓取第 1 页内容...")
    process_page_content(first_page_response)
    # Remaining pages use the paginated endpoint.
    page_no = 2
    while page_no <= total_pages:
        print(f"抓取第 {page_no} 页内容...")
        process_page_content(goodsListPage(keywords, page_no))
        page_no += 1
    print("共计:", data_total, "条")
# Script entry point: run the full scrape when executed directly.
if __name__ == "__main__":
    main()
import time
import ddddocr
from DrissionPage import Chromium
from DrissionPage import ChromiumPage
from DrissionPage.errors import ElementNotFoundError, PageDisconnectedError
from lxml import etree
# Running total of sponsored items scraped in this session.
data_count = 0
def get_total_pages(page):
    """Read the result-page count from the disabled pagination item.

    Returns 1 when the text is missing, empty, or non-numeric (single-page
    results render no pagination) instead of raising — the original's
    `else 1` fallback intended this but int() could still crash.
    """
    total_pages = page.ele('.s-pagination-item s-pagination-disabled').text
    try:
        return int(total_pages.strip())
    except (AttributeError, TypeError, ValueError):
        return 1
def search_goods(page, keywords):
    """Run an Amazon search for `keywords` and scrape every result page.

    :param page: DrissionPage page already on amazon.com
    :param keywords: search terms typed into the search box
    """
    # Type the keywords and submit the search form.
    page.ele('#twotabsearchtextbox').input(keywords)
    page.ele('#nav-search-submit-button').click()
    # Wait until the tab title reflects the query, i.e. results loaded.
    page.wait.title_change(keywords)
    # Total number of result pages, read from the pagination widget.
    total_pages = get_total_pages(page)
    print(f"总页数: {total_pages}")
    # Scrape page 1, which is already displayed.
    goods_list(page)
    # Then walk pages 2..total_pages via the "next" pagination button.
    for pg in range(2, total_pages + 1):
        pagination = page.ele('.a-section a-text-center s-pagination-container')
        pagination.ele('.s-pagination-item s-pagination-next s-pagination-button s-pagination-separator').click()
        print(f"抓取第 {pg} 页内容...")
        # Fixed delay so the next results page renders before parsing.
        time.sleep(10)
        goods_list(page)
def goods_list(page):
    """Parse the current results page and print each sponsored item's
    name, image, store byline and detail URL."""
    # Parse the rendered HTML of the current results page.
    tree = etree.HTML(page.html)
    # One container div per search result.
    elements = tree.xpath('//div[contains(@class, "a-section") and contains(@class, "a-spacing-base")]')
    # Separate browser used only to open product-detail tabs.
    table = Chromium()
    for element in elements:
        try:
            # Only sponsored (ad) listings are collected.
            is_sponsored = element.xpath('.//span[contains(text(), "Sponsored")]')
            if is_sponsored:
                global data_count
                data_count = data_count + 1
                # Product title text.
                title = element.xpath(
                    './/span[contains(@class, "a-size-base-plus") and contains(@class, "a-color-base") and contains(@class, "a-text-normal")]/text()')
                if title:
                    print(f"商品名称: {title[0]}")
                # Product image URL from the s-image tag.
                img_src = element.xpath('.//img[contains(@class, "s-image")]/@src')
                if img_src:
                    print(f"图片链接: {img_src[0]}")
                # Relative link to the product-detail page.
                link = element.xpath(
                    './/a[contains(@class, "a-link-normal") and contains(@class, "s-no-outline")]/@href')
                if link:
                    url = "https://www.amazon.com" + link[0]
                    # Open the detail page in a new tab to read the store name.
                    tab = table.new_tab(url=url)
                    new_tree = etree.HTML(tab.html)
                    store_name_text = new_tree.xpath('//a[@id="bylineInfo"]/text()')
                    if store_name_text:
                        # NOTE(review): this only strips the English
                        # "Visit the " prefix; a Chinese "访问 " byline
                        # passes through unchanged.
                        store_name = store_name_text[0].replace("Visit the ", "")
                        print(f"店铺名: {store_name}")
                    tab.close()
                    print(f"详情链接: {url}")
                    print("-" * 50)
        except ElementNotFoundError:
            # Skip results whose expected sub-elements are missing.
            pass
def check_captcha(page):
    """Return True when the captcha container element is present on `page`."""
    return bool(page.ele('.a-row a-text-center'))
def open_url(page, url):
    """Navigate to `url` and solve any image captcha blocking the page.

    Loops screenshot -> OCR -> submit until the captcha container is gone.

    :param page: DrissionPage page object
    :param url: absolute URL to open
    """
    page.get(url)
    # Give a possible captcha page a moment to render.
    page.wait(2)
    while True:
        time.sleep(1)
        if not check_captcha(page):
            break
        print("存在图形码开始处理...")
        # Screenshot the captcha image to disk for OCR.
        img = page('tag:img')
        img.get_screenshot(name='captcha_image.jpg')
        ocr = ddddocr.DdddOcr(show_ad=False)
        # Context manager closes the handle promptly — the original
        # open(...).read() leaked the file descriptor every iteration.
        with open("captcha_image.jpg", "rb") as f:
            image = f.read()
        result = ocr.classification(image).upper()
        if result:
            # Type the recognised text and submit the captcha form.
            page.ele('#captchacharacters').input(result)
            time.sleep(2)
            page.ele('.a-button-text').click()
def main(keywords):
    """Open Amazon in a Chromium page and scrape results for `keywords`."""
    browser = ChromiumPage()
    # Use the normal (blocking) page-load mode.
    browser.set.load_mode.normal()
    open_url(browser, 'https://www.amazon.com')
    search_goods(browser, keywords)
    print(f"数据总量 {data_count} 条")
if __name__ == '__main__':
    try:
        print("请输入你要搜索的商品")
        search_input = input()
        print("你输入的是:", search_input)
        main(search_input)
    except KeyboardInterrupt:
        # Allow Ctrl-C to end the scrape quietly.
        pass
    # Dropped the unused `as e` binding the original declared.
    except PageDisconnectedError:
        # The browser tab/page was closed while scraping.
        print("与页面的连接已断开")
import email
import imaplib
from email.header import decode_header
# Mailbox account settings for the Aliyun enterprise IMAP server.
# NOTE(review): credentials are hard-coded in source — move them to
# config.ini or environment variables before this file is shared.
username = 'us-cs001@khdtek.com'  # replace with your mailbox address
password = 'khd=20221208'  # replace with your mailbox password
imap_server = 'imap.qiye.aliyun.com'
def get_latest_unread_email():
    """Fetch and print the newest unread message from the inbox.

    Connects over IMAP SSL, prints the subject, sender, date and the
    text/HTML body of the most recent UNSEEN message, then logs out.
    """
    mail = imaplib.IMAP4_SSL(imap_server)
    mail.login(username, password)
    try:
        mail.select("inbox")
        # Search for unread (UNSEEN) messages.
        status, messages = mail.search(None, 'UNSEEN')
        if status != 'OK':
            print("没有找到未读邮件")
            return
        email_ids = messages[0].split()
        if not email_ids:
            print("没有未读邮件")
            return
        # IDs come back in ascending order; the last one is the newest.
        latest_email_id = email_ids[-1]
        status, msg_data = mail.fetch(latest_email_id, '(RFC822)')
        if status != 'OK':
            print("无法获取邮件")
            return
        for response_part in msg_data:
            if isinstance(response_part, tuple):
                msg = email.message_from_bytes(response_part[1])
                # The subject may be MIME-encoded; decode it to readable text.
                subject, encoding = decode_header(str(msg["Subject"]))[0]
                if isinstance(subject, bytes):
                    subject = subject.decode(encoding if encoding else "utf-8")
                from_ = msg.get("From")
                date_ = msg.get("Date")
                print(f"最新未读邮件主题: {subject}")
                print(f"发件人: {from_}")
                print(f"发送时间: {date_}")
                if msg.is_multipart():
                    # Walk each MIME part, printing text/plain and text/html
                    # bodies while skipping attachments.
                    for part in msg.walk():
                        content_type = part.get_content_type()
                        content_disposition = str(part.get("Content-Disposition"))
                        if content_type == "text/plain" and "attachment" not in content_disposition:
                            body = part.get_payload(decode=True).decode()
                            print("邮件正文(纯文本):", body)
                        elif content_type == "text/html" and "attachment" not in content_disposition:
                            html_body = part.get_payload(decode=True).decode()
                            print("邮件正文(HTML):", html_body)
                else:
                    # Single-part message: the payload is the body itself.
                    body = msg.get_payload(decode=True).decode()
                    print("邮件正文:", body)
    finally:
        # Always log out, including on the early-return paths — the original
        # leaked the IMAP connection whenever search/fetch failed.
        mail.logout()
# Fetch the latest unread email; runs at import time.
# NOTE(review): consider guarding with `if __name__ == "__main__":` so
# importing this module does not trigger a mailbox fetch.
get_latest_unread_email()
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
def gen_ac_buk():
    """Generate a random ubid-acbuk cookie value shaped ddd-ddddddd-ddddddd."""
    segments = (
        random.randint(100, 999),
        random.randint(1000000, 9999999),
        random.randint(1000000, 9999999),
    )
    return "-".join(str(part) for part in segments)
driver = webdriver.Chrome()
cookie = {
    "lc-main": "zh_CN",
    "ubid-acbuk": gen_ac_buk(),
}
# Selenium only accepts cookies for the domain of the currently loaded
# page, and each cookie must be a {"name": ..., "value": ...} dict.
# The original called add_cookie(cookie) before any page was loaded
# (raises InvalidCookieDomainException) and passed both cookies as the
# keys of one dict. Navigate first, set each cookie, then reload so the
# request actually carries them.
driver.get("https://amazon.com/")
for cookie_name, cookie_value in cookie.items():
    driver.add_cookie({"name": cookie_name, "value": cookie_value})
driver.get("https://amazon.com/")
driver.implicitly_wait(0.5)
search_box = driver.find_element(by=By.ID, value="twotabsearchtextbox")
driver.find_element(by=By.ID, value="nav-search-submit-button").click()
# coding: utf-8
from sqlalchemy import Column, Integer, String, Text
from sqlalchemy.orm import declarative_base
# Declarative base shared by every ORM model in this module.
Base = declarative_base()
class BaseModel(Base):
    """Abstract base giving tables the InnoDB engine and utf8mb4 charset.

    NOTE(review): a subclass that assigns its own __table_args__ replaces
    this dict entirely — SQLAlchemy does not merge them; verify each model
    re-includes these settings.
    """
    __abstract__ = True
    __table_args__ = {
        'mysql_engine': 'InnoDB',   # transactional InnoDB engine for all tables
        'mysql_charset': 'utf8mb4'  # full-Unicode charset (covers CJK + emoji)
    }
class Store(BaseModel):
    """店铺表 — a seller storefront scraped from Amazon."""
    __tablename__ = 'store'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), comment='店铺名称')  # store display name
    code = Column(String(50), comment='店铺编码')   # store code
    # Merge the base options: a bare dict here would replace BaseModel's
    # __table_args__ outright and silently drop InnoDB/utf8mb4.
    __table_args__ = {**BaseModel.__table_args__, 'comment': '店铺表'}
class Category(BaseModel):
    """商品分类表 — product category lookup table."""
    __tablename__ = 'category'
    id = Column(Integer, primary_key=True)
    name = Column(String(50), comment='分类名称')  # category name
    # Merge the base options: a bare dict here would replace BaseModel's
    # __table_args__ outright and silently drop InnoDB/utf8mb4.
    __table_args__ = {**BaseModel.__table_args__, 'comment': '商品分类表'}
class Goods(BaseModel):
    """商品表 — one scraped product listing."""
    __tablename__ = 'goods'
    id = Column(Integer, primary_key=True)
    store_id = Column(Integer, comment='门店id')    # owning store's id
    name = Column(String(255), comment='商品名称')  # product title
    image = Column(String(255), comment='商品图片')  # product image URL
    url = Column(Text, comment='商品地址')           # product detail URL
    # Merge the base options: a bare dict here would replace BaseModel's
    # __table_args__ outright and silently drop InnoDB/utf8mb4.
    __table_args__ = {**BaseModel.__table_args__, 'comment': '商品表'}
class Comment(BaseModel):
    """Comments attached to goods.

    NOTE(review): the table comment reads '商品信息表' (goods info table),
    which looks copy-pasted — confirm the intended wording.
    """
    __tablename__ = 'comments'
    id = Column(Integer, primary_key=True)
    content = Column(String(50), comment='内容')    # comment text
    goods_id = Column(Integer, comment='商品id')    # related goods row
    store_id = Column(Integer, comment='门店id')    # related store row
    # Merge the base options: a bare dict here would replace BaseModel's
    # __table_args__ outright and silently drop InnoDB/utf8mb4.
    __table_args__ = {**BaseModel.__table_args__, 'comment': '商品信息表'}
curl_cffi==0.7.1
lxml==5.3.0
DrissionPage==4.1.0.2
ddddocr==1.5.5
SQLAlchemy==2.0.35
pymysql==1.1.1
huey==2.5.1
redis==5.0.8
selenium==4.25.0
requests==2.32.3
xlrd==2.0.1
pandas==2.2.3
openpyxl==3.1.5
\ No newline at end of file
...@@ -31,7 +31,7 @@ def page_get(url): ...@@ -31,7 +31,7 @@ def page_get(url):
def sku_relations(): def sku_relations():
relations_dict = {} relations_dict = {}
# 读取ASIN和sku映射关系 # 读取ASIN和sku映射关系
file_name = 'relations.xlsx' file_name = '../relations.xlsx'
if not os.path.isfile(file_name): if not os.path.isfile(file_name):
raise FileNotFoundError(f"{file_name},请确认sku关系文件不存在") raise FileNotFoundError(f"{file_name},请确认sku关系文件不存在")
......
# coding: utf-8
from huey import RedisHuey
import bootstrap
from models.models import Store, Goods
# Initialise config + database before the worker handles any task.
bootstrap.init()
config = bootstrap.config
# Redis connection settings for the huey queue, from config.ini [redis].
host = config.get('redis', 'host')
port = config.get('redis', 'port')
db = config.get('redis', 'db')
password = config.get('redis', 'password')
# Create the Huey application with the Redis settings above.
# NOTE(review): port/db are configparser strings — confirm RedisHuey
# accepts them as-is or cast with int().
huey = RedisHuey(host=host, port=port, db=db, password=password)
@huey.task()
def process_data(data: dict):
    """Persist one scraped item: upsert its store, then insert the goods row.

    :param data: dict with keys 'store_name', 'goods_name', 'img_src', 'url'
                 (populated by the scraper's goods_list).
    """
    orm = bootstrap.db_session()
    try:
        store_name = data['store_name']
        # Find the store by name, creating it on first sight.
        store_res = orm.query(Store).filter_by(name=store_name).first()
        if not store_res:
            store_data = Store(name=store_name)
            orm.add(store_data)
            orm.commit()
            store_id = store_data.id
        else:
            store_id = store_res.id
        # Insert the goods row only if this name is not already stored.
        goods_name = data['goods_name']
        goods_res = orm.query(Goods).filter_by(name=goods_name).first()
        if not goods_res:
            goods_data = Goods(store_id=store_id, name=goods_name, image=data['img_src'], url=data['url'])
            orm.add(goods_data)
            orm.commit()
    except Exception as e:
        # Roll back the half-finished transaction so the session is not
        # returned to the pool in a broken state (the original only printed).
        orm.rollback()
        print(f"Error: {e}")
    finally:
        orm.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment