新聞中心
手把手教你在Windows下設(shè)置分布式隊(duì)列Celery的心跳輪詢
作者: 吳老板 2021-03-05 08:52:00
系統(tǒng)
Windows
分布式 大家好,我是吳老板。用Celery 官方的話來(lái)說(shuō),Celery 是一個(gè)非常優(yōu)秀的分布式隊(duì)列,可應(yīng)用于分布式共享中間隊(duì)列和定時(shí)任務(wù)等等。

10多年的章貢網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都營(yíng)銷網(wǎng)站建設(shè)的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整章貢建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)從事“章貢網(wǎng)站設(shè)計(jì)”,“章貢網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
1 前言
大家好,我是吳老板。用Celery 官方的話來(lái)說(shuō),Celery 是一個(gè)非常優(yōu)秀的分布式隊(duì)列,可應(yīng)用于分布式共享中間隊(duì)列和定時(shí)任務(wù)等等。
2 版本的差異
Celery 有很多個(gè)版本,各版本之間的差異可謂不小,比如最新的 Celery6.0 版本在穩(wěn)定性遠(yuǎn)不如 Celery4.0,所以在使用不同版本的時(shí)候,系統(tǒng)給到我們的反饋可能并不能如我們所愿。
3 服務(wù)
在 windows 下掛在 Celery 服務(wù)有時(shí)候會(huì)出現(xiàn)不穩(wěn)定的情況(unix中暫時(shí)未發(fā)現(xiàn)這種情況),比如在執(zhí)行定時(shí)任務(wù)的時(shí)候,過(guò)了一段時(shí)間之后,Celery 出現(xiàn)了假死狀態(tài),以至于不能按照我們指定的時(shí)間點(diǎn)去執(zhí)行任務(wù)。
這些任務(wù)只是加入到待運(yùn)行隊(duì)列中(堆積在 Redis 中),只能人為重啟 Celery 服務(wù)之后才能將堆積的任務(wù)釋放出來(lái)運(yùn)行。
這樣一來(lái),第一是定時(shí)任務(wù)在指定時(shí)間點(diǎn)沒(méi)有正常運(yùn)行,其二是在其他時(shí)間運(yùn)行了這些任務(wù),很可能會(huì)產(chǎn)生更新數(shù)據(jù)不及時(shí),時(shí)間節(jié)點(diǎn)混亂的問(wèn)題,不僅達(dá)不到業(yè)務(wù)需求,還會(huì)反受其害。
4 設(shè)置心跳
為了解決 Celery 在 windows 中的這種弊端,可以為 Celery 任務(wù)隊(duì)列設(shè)置一個(gè)心跳時(shí)間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫(kù)發(fā)送一次數(shù)據(jù)以保證隊(duì)列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊(duì)列服務(wù)就不會(huì)出現(xiàn)假死狀態(tài)。
5 舉個(gè)栗子
我總是很喜歡用示例來(lái)說(shuō)話,前些時(shí)間在對(duì)某平臺(tái)的商家后臺(tái)進(jìn)行數(shù)據(jù)采集的時(shí)候,為了使用時(shí)能自動(dòng)獲取該網(wǎng)站的 cookie ,
用Pyppeteer 寫(xiě)了一個(gè)自動(dòng)化登陸的腳本,和往常一樣仍在 Celery 隊(duì)列中并迅速的啟動(dòng)服務(wù)。
腳本是這樣的(非常接近實(shí)際的偽代碼,沒(méi)辦法,保命要緊)
- # -*- coding: utf-8 -*-
- from db.redisCurd import RedisQueue
- import asyncio
- import random
- import tkinter
- from pyppeteer.launcher import launch
- from platLogin.config import USERNAME, PASSWORD, LOGIN_URL
- class Login():
- def __init__(self, shopId):
- self.shopId = shopId
- self.RedisQueue = RedisQueue("cookie")
- def screen_size(self):
- tk = tkinter.Tk()
- width = tk.winfo_screenwidth()
- height = tk.winfo_screenheight()
- tk.quit()
- return {'width': width, 'height': height}
- async def login(self, username, password, url):
- browser = await launch(
- {
- 'headless': False,
- 'dumpio': True
- },
- args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'],
- )
- page = await browser.newPage() # 啟動(dòng)新的瀏覽器頁(yè)面
- try:
- await page.setViewport(viewport=self.screen_size())
- await page.setJavaScriptEnabled(enabled=True) # 啟用js
- await page.setUserAgent(
- 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299'
- )
- await self.page_evaluate(page)
- await page.goto(url)
- await asyncio.sleep(2)
- # 輸入用戶名,密碼
- await page.evaluate(f'document.querySelector("#userName").value=""')
- await page.type('#userName', username, {'delay': self.input_time_random() - 50}) # delay是限制輸入的時(shí)間
- await page.evaluate('document.querySelector("#passWord").value=""')
- await page.type('#passWord', password, {'delay': self.input_time_random()})
- await page.waitFor(6000)
- loginImgVcode = await page.waitForSelector('#checkCode')
- await loginImgVcode.screenshot({'path': './loginImg.png'})
- await page.waitFor(6000)
- res = use_cjy("./loginImg.png")
- pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234"
- await page.waitFor(6000)
- await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
- await page.waitFor(6000)
- await page.click('#subMit')
- await page.waitFor(6000)
- await asyncio.sleep(2)
- await self.get_cookie(page)
- await page.waitFor(3000)
- await self.page_close(browser)
- return {'code': 200, 'msg': '登陸成功'}
- except:
- return {'code': -1, 'msg': '出錯(cuò)'}
- finally:
- await page.waitFor(3000)
- await self.page_close(browser)
- # 獲取登錄后cookie
- async def get_cookie(self, page):
- cookies_list = await page.cookies()
- cookies = ''
- for cookie in cookies_list:
- str_cookie = '{0}={1}; '
- str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
- cookies += str_cookie
- # 將cookie 放入 cookie 池
- self.RedisQueue.put_hash(self.shopId, cookies)
- return cookies
- async def page_evaluate(self, page):
- await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
- await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
- await page.evaluate(
- '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
- await page.evaluate(
- '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
- await page.waitFor(3000)
- async def page_close(self, browser):
- for _page in await browser.pages():
- await _page.close()
- await browser.close()
- def input_time_random(self):
- return random.randint(100, 151)
- def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
- loop = asyncio.get_event_loop()
- i_future = asyncio.ensure_future(self.login(username, password, url))
- loop.run_until_complete(i_future)
- return i_future.result()
- if __name__ == '__main__':
- Z = Login(shopId="001")
- Z.run()
Celery 任務(wù)文件是這樣的
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import
- import os
- import sys
- import time
- from db.redisCurd import RedisQueue
- from send_msg.weinxin import Send_msg
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(base_dir)
- from logger.logger import log_v
- from celery import Task
- from platLogin.login import Login # 登陸類
- from celery import Celery
- randomQueue = RedisQueue("cookie")
- celery_app = Celery('task')
- celery_app.config_from_object('celeryConfig')
- S = Send_msg()
- dl_dict = {
- 'demo': {
- 'cookie': '',
- 'loginClass': 'Login',
- }
- }
- # todo 這是三種運(yùn)行的狀態(tài)
- class task_status(Task):
- def on_success(self, retval, task_id, args, kwargs):
- log_v.info('任務(wù)信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))
- def on_retry(self, exc, task_id, args, kwargs, einfo):
- log_v.warning('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))
- # todo 隨便找個(gè)hash key作為輪詢對(duì)象, celery在win10系統(tǒng)可能不太穩(wěn)定,有時(shí)候會(huì)有連接斷開(kāi)的情況
- @celery_app.task(base=task_status)
- def get_cookie_status(platName="demo"):
- try:
- # log_v.debug(f'[+] 輪詢 {platName} 定時(shí)器啟動(dòng) ..... Done')
- randomQueue.get_hash(platName).decode()
- log_v.debug(f'[+] 輪詢 {platName} 成功 ..... Done')
- return "Erp 輪詢成功"
- except:
- return "Erp 輪詢失敗"
- @celery_app.task(base=task_status)
- def set_plat_cookie(platName="demo", shopId=None):
- log_v.debug(f"[+] {platName} 正在登陸")
- core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
- result = core.run()
- return result
Celery 配置文件是這樣的
- from __future__ import absolute_import
- import datetime
- from kombu import Exchange, Queue
- from celery.schedules import crontab
- from urllib import parse
- BROKER_URL = f'redis://root:{parse.quote("你的不規(guī)則密碼")}@主機(jī):6379/15'
- # 導(dǎo)入任務(wù),如tasks.py
- CELERY_IMPORTS = ('monitor.tasks',)
- # 列化任務(wù)載荷的默認(rèn)的序列化方式
- CELERY_TASK_SERIALIZER = 'json'
- # 結(jié)果序列化方式
- CELERY_RESULT_SERIALIZER = 'json'
- CELERY_ACCEPT_CONTENT = ['json']
- CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時(shí)區(qū),不指定默認(rèn)為 'UTC'
- # CELERY_TIMEZONE='UTC'
- CELERYBEAT_SCHEDULE = {
- 'add-every-60-seconds': {
- 'task': 'tasks.get_cookie_status',
- 'schedule': datetime.timedelta(minutes=1), # 每 1 分鐘執(zhí)行一次
- 'args': () # 任務(wù)函數(shù)參數(shù)
- },
- }
啟動(dòng)服務(wù)
- celery -A tasks beat -l INFO
- celery -A tasks worker -l INFO -c 2
以 2 個(gè)線程啟動(dòng)消費(fèi)者隊(duì)列服務(wù)并啟用定時(shí)任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前平臺(tái)的 cookie 不可用時(shí),我會(huì)向 Celery 發(fā)送一個(gè)信號(hào)(就是調(diào)用了前面的set_plat_cookie 這個(gè)方法),消費(fèi)者得到這個(gè)任務(wù)這個(gè)就會(huì)執(zhí)行自動(dòng)化腳本以獲取 cookie 并儲(chǔ)存在 Redis 中,使用時(shí)在從 Redis 中獲取就能正常請(qǐng)求到該平臺(tái)的數(shù)據(jù)。
在空閑時(shí)間,Celery中的 get_cookie_status 方法會(huì)每隔一分鐘向 Redis 請(qǐng)求數(shù)據(jù),這就是我們?cè)O(shè)置的 1分鐘心跳。
這樣不管我們的 Celery 是否是后臺(tái)啟動(dòng),都不會(huì)出現(xiàn)假死、卡死的狀態(tài),則萬(wàn)事大吉矣!!
6 總結(jié)
本文為了解決 Celery 在 windows 中的這種弊端,為 Celery 任務(wù)隊(duì)列設(shè)置一個(gè)心跳時(shí)間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫(kù)發(fā)送一次數(shù)據(jù)以保證隊(duì)列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機(jī)并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊(duì)列服務(wù)都不會(huì)出現(xiàn)假死、卡死的狀態(tài)。
網(wǎng)站題目:手把手教你在Windows下設(shè)置分布式隊(duì)列Celery的心跳輪詢
分享路徑:http://m.fisionsoft.com.cn/article/cohccsd.html


咨詢
建站咨詢
