婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識庫 > celery實現動態設置定時任務

celery實現動態設置定時任務

熱門標簽:電銷機器人 金倫通信 南京crm外呼系統排名 汕頭電商外呼系統供應商 云南地圖標注 鄭州智能外呼系統中心 北京外呼電銷機器人招商 賓館能在百度地圖標注嗎 crm電銷機器人 400電話 申請 條件

本文實例為大家分享了celery動態設置定時任務的具體代碼,供大家參考,具體內容如下

首先celery是一種異步任務隊列,如果還不熟悉這個開源軟件的請先看看官方文檔,快速入門。

這里講的動態設置定時任務的方法不使用數據庫保存定時任務的信息,所以是項目重啟后定時任務配置就會丟失,如果想保存成永久配置,可以考慮保存到數據庫、redis或者使用pickle、json保存成文件,在項目啟動時自動載入。

方法原理介紹

先來看一下celery的beat運行過程。

上圖是beat的主要組成結構,beat中包含了一個service對象,service中包含了一個scheduler對象,scheduler中包含了一個schedule字典,schedule中key對應的的value才是真正的定時任務,是整個beat中最小的單元。

首先分別介紹一下各個對象和它們運行的過程,beat是celery.apps.beat.Beat類創建的對象,調用beat.run()方法就可以啟動beat,下面是beat.run()方法的源碼。

def run(self):
 print(str(self.colored.cyan(
 'celery beat v{0} is starting.'.format(VERSION_BANNER))))
 self.init_loader()
 self.set_process_title()
 self.start_scheduler()

重點是在run()方法里調用了start_scheduler()方法,而start_scheduler()方法本質上是創建了一個service對象(celery.beat.Service類),并調用service.start()方法,下面是beat.start_scheduler()方法的源碼。

def start_scheduler(self):
 if self.pidfile:
 platforms.create_pidlock(self.pidfile)
 service = self.Service(
 app=self.app,
 max_interval=self.max_interval,
 scheduler_cls=self.scheduler_cls,
 schedule_filename=self.schedule,
 )
 
 print(self.banner(service))
 
 self.setup_logging()
 if self.socket_timeout:
 logger.debug('Setting default socket timeout to %r',
 self.socket_timeout)
 socket.setdefaulttimeout(self.socket_timeout)
 try:
 self.install_sync_handler(service)
 service.start()
 except Exception as exc:
 logger.critical('beat raised exception %s: %r',
 exc.__class__, exc,
 exc_info=True)
 raise

調用了service.start()之后,會進入一個死循環,先使用self.scheduler.tick()獲取下一個任務a的定時點到現在時間的間隔,然后進入睡眠,睡眠結束之后判斷如果self.scheduler里的下一個任務a可以執行,就立即執行,并獲取self.scheduler里的下下一個任務b的定時點到現在時間的間隔,進入下一次循環。下面是service.start()的源碼。

def start(self, embedded_process=False):
 info('beat: Starting...')
 debug('beat: Ticking with max interval->%s',
 humanize_seconds(self.scheduler.max_interval))
 
 signals.beat_init.send(sender=self)
 if embedded_process:
 signals.beat_embedded_init.send(sender=self)
 platforms.set_process_title('celery beat')
 
 try:
 while not self._is_shutdown.is_set():
 interval = self.scheduler.tick()
 if interval and interval > 0.0:
 debug('beat: Waking up %s.',
 humanize_seconds(interval, prefix='in '))
 time.sleep(interval)
 if self.scheduler.should_sync():
 self.scheduler._do_sync()
 except (KeyboardInterrupt, SystemExit):
 self._is_shutdown.set()
 finally:
 self.sync()

service.scheduler默認是celery.beat.PersistentScheduler類的實例對象,而celery.beat.PersistentScheduler其實是celery.beat.Scheduler的子類,所以scheduler.schedule是celery.beat.Scheduler類中的字典,保存的是celery.beat.ScheduleEntry類型的對象。ScheduleEntry的實例對象保存了定時任務的名稱、參數、定時信息、過期時間等信息。celery.beat.Scheduler類實現了對schedule的更新方法即update_from_dict(self, dict_)方法。下面是update_from_dict(self, dict_)方法的源碼。

def _maybe_entry(self, name, entry):
 if isinstance(entry, self.Entry):
 entry.app = self.app
 return entry
 return self.Entry(**dict(entry, name=name, app=self.app))
 
def update_from_dict(self, dict_):
 self.schedule.update({
 name: self._maybe_entry(name, entry)
 for name, entry in items(dict_)
 })

可以看到update_from_dict(self, dict_)方法實際上是向schedule中更新了self.Entry的實例對象,而self.Entry從celery.beat.Scheduler的源碼知道是celery.beat.ScheduleEntry。

到這里整個流程就粗略的介紹完了,基本過程是這個樣子。

但是從前面start_scheduler()的源碼可以看到,beat在內部創建一個service之后,就直接進入死循環了,所以從外面無法拿到service對象,就不能對service里的scheduler對象操作,就不能對scheduler的schedule字典操作,所以就無法在beat運行的過程中動態添加定時任務。

方法介紹

前面介紹完原理,現在來講一下解決思路。主要思路就是讓start_scheduler方法中創建的service暴露出來。所以就想到手寫一個類去繼承Beat,重寫start_scheduler()方法。

import socket
from celery import platforms
from celery.apps.beat import Beat
 
 
class MyBeat(Beat):
 '''
 繼承Beat 添加一個獲取service的方法
 '''
 def start_scheduler(self):
 if self.pidfile:
  platforms.create_pidlock(self.pidfile)
 # 修改了獲取service的方式
 service = self.get_service()
 
 print(self.banner(service))
 
 self.setup_logging()
 if self.socket_timeout:
  logger.debug('Setting default socket timeout to %r',
    self.socket_timeout)
  socket.setdefaulttimeout(self.socket_timeout)
 try:
  self.install_sync_handler(service)
  service.start()
 except Exception as exc:
  logger.critical('beat raised exception %s: %r',
    exc.__class__, exc,
    exc_info=True)
  raise
 
 def get_service(self):
 '''
 這個是自定義的 目的是為了把service暴露出來,方便對service的scheduler操作,因為定時任務信息都存放在service.scheduler里
 :return:
 '''
 service = getattr(self, "service", None)
 if service is None:
  service = self.Service(
  app=self.app,
  max_interval=self.max_interval,
  scheduler_cls=self.scheduler_cls,
  schedule_filename=self.schedule,
  )
  setattr(self, "service", service)
 return self.service

在MyBeat類中添加一個get_service()方法,如果beat沒有servic對象就創建一個,如果有就直接返回,方便對service的scheduler操作。

然后在此基礎上實現對定時任務的增刪改查操作。

def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',
   month_of_year='*', **kwargs):
 '''
 創建或更新定時任務
 :param task_name: 定時任務名稱
 :param cron_task: task名稱
 :param minute: 以下是時間
 :param hour:
 :param day_of_week:
 :param day_of_month:
 :param month_of_year:
 :param kwargs:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 entries = dict()
 entries[task_name] = {
 'task': cron_task,
 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
    month_of_year=month_of_year, **kwargs),
 'options': {'expires': 3600}}
 scheduler.update_from_dict(entries)
 
 
def del_cron_task(task_name: str):
 '''
 刪除定時任務
 :param task_name:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 if scheduler.schedule.get(task_name, None) is not None:
 del scheduler.schedule[task_name]
 
 
def get_cron_task():
 '''
 獲取當前所有定時任務的配置
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()]
 return ret

但是僅僅是這樣還不能解決問題,從前面的serive.start()的源碼看到,beat啟動后會進入一個死循環,如果直接在主線程啟動beat,必然會阻塞在死循環中,所以需要為beat創建一個子線程,這樣才影響主線程的其他操作。

flag = False
 
beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,
  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',
  scheduler_cls=None, # XXX use scheduler
  redirect_stdouts=None,
  redirect_stdouts_level=None)
 
 
# 設置主動啟動beat是為了避免使用celery -A celery_demo worker 命令重復啟動worker
def run():
 '''
 啟動Beat
 :return:
 '''
 beat.run()
 
 
def new_thread():
 '''
 創建一個線程啟動Beat 最多只能創建一個
 :return:
 '''
 global flag
 if not flag:
 t = threading.Thread(target=run, daemon=True)
 t.start()
 # 啟動成功2s后才能操作定時任務 否則可能會報錯
 time.sleep(2)
 flag = True

可能看到上面的代碼有人會想,為什么不在主程序加載完成就啟動為beat創建一個子線程,還非要寫個函數等待主動調用?這是因為例如在使用django+celery組合時,一般啟動django和啟動celery woker是兩個獨立的進程,如果讓django在加載代碼的時候自動啟動beat的子線程,那么在使用celery -A demo_name worker 啟動celery時,會重新加載一邊django的代碼,因為celery需要掃描每個app下的tasks.py文件,加載異步任務函數,這時啟動celery woker就會也啟動一個beat子線程,可能會造成定時任務重復執行的情況。所以在這里設置成主動開啟beat子線程,目的就是為了celery worker啟動不重復創建beat線程。

完整的代碼如下:

import socket
import time
import threading
from celery import platforms
from celery.schedules import crontab
from celery.apps.beat import Beat
from celery.utils.log import get_logger
from celery_demo import celery_app
 
logger = get_logger('celery.beat')
flag = False
 
 
class MyBeat(Beat):
 '''
 繼承Beat 添加一個獲取service的方法
 '''
 def start_scheduler(self):
 if self.pidfile:
  platforms.create_pidlock(self.pidfile)
 # 修改了獲取service的方式
 service = self.get_service()
 
 print(self.banner(service))
 
 self.setup_logging()
 if self.socket_timeout:
  logger.debug('Setting default socket timeout to %r',
    self.socket_timeout)
  socket.setdefaulttimeout(self.socket_timeout)
 try:
  self.install_sync_handler(service)
  service.start()
 except Exception as exc:
  logger.critical('beat raised exception %s: %r',
    exc.__class__, exc,
    exc_info=True)
  raise
 
 def get_service(self):
 '''
 這個是自定義的 目的是為了把service暴露出來,方便對service的scheduler操作,因為定時任務信息都存放在service.scheduler里
 :return:
 '''
 service = getattr(self, "service", None)
 if service is None:
  service = self.Service(
  app=self.app,
  max_interval=self.max_interval,
  scheduler_cls=self.scheduler_cls,
  schedule_filename=self.schedule,
  )
  setattr(self, "service", service)
 return self.service
 
 
beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,
  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',
  scheduler_cls=None, # XXX use scheduler
  redirect_stdouts=None,
  redirect_stdouts_level=None)
 
 
# 設置主動啟動beat是為了避免使用celery -A celery_demo worker 命令重復啟動worker
def run():
 '''
 啟動Beat
 :return:
 '''
 beat.run()
 
 
def new_thread():
 '''
 創建一個線程啟動Beat 最多只能創建一個
 :return:
 '''
 global flag
 if not flag:
 t = threading.Thread(target=run, daemon=True)
 t.start()
 # 啟動成功2s后才能操作定時任務 否則可能會報錯
 time.sleep(2)
 flag = True
 
 
def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',
   month_of_year='*', **kwargs):
 '''
 創建或更新定時任務
 :param task_name: 定時任務名稱
 :param cron_task: task名稱
 :param minute: 以下是時間
 :param hour:
 :param day_of_week:
 :param day_of_month:
 :param month_of_year:
 :param kwargs:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 entries = dict()
 entries[task_name] = {
 'task': cron_task,
 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
    month_of_year=month_of_year, **kwargs),
 'options': {'expires': 3600}}
 scheduler.update_from_dict(entries)
 
 
def del_cron_task(task_name: str):
 '''
 刪除定時任務
 :param task_name:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 if scheduler.schedule.get(task_name, None) is not None:
 del scheduler.schedule[task_name]
 
 
def get_cron_task():
 '''
 獲取當前所有定時任務的配置
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()]
 return ret

另外還可以參考我的github,相關的注釋在代碼里寫的較為清晰。

注意:使用這種方式添加/刪除定時任務只是保存在內存中的,項目重啟后就會丟失。如果想要持久化,可以參照上面的方法,把相關信息保存到數據庫或其他可持久保存文件中,在beat線程啟動時加載相關任務信息,在對定時任務修改做增刪改時及時修改數據庫或文件中內容。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。

您可能感興趣的文章:
  • celery實現動態設置定時任務
  • Django+Celery實現動態配置定時任務的方法示例
  • Django實現celery定時任務過程解析
  • python基于celery實現異步任務周期任務定時任務
  • celery4+django2定時任務的實現代碼
  • django+xadmin+djcelery實現后臺管理定時任務
  • Django配置celery(非djcelery)執行異步任務和定時任務
  • python Celery定時任務的示例

標簽:梅州 昆明 西寧 錫林郭勒盟 文山 懷化 石家莊 浙江

巨人網絡通訊聲明:本文標題《celery實現動態設置定時任務》,本文關鍵詞  celery,實現,動態,設置,定時,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《celery實現動態設置定時任務》相關的同類信息!
  • 本頁收集關于celery實現動態設置定時任務的相關信息資訊供網民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    99精品在线观看视频| 成人免费在线观看入口| 91片黄在线观看| www久久久久| 91久久精品国产91性色tv| 精品久久久久99| 亚洲欧美另类综合偷拍| 国产麻豆欧美日韩一区| 精品三级av在线| 经典一区二区三区| 国产亚洲欧美日韩俺去了| 韩国三级电影一区二区| 91精品国产91久久久久久一区二区| 91精品在线免费| 日韩高清一区在线| 欧美一级片在线| 蜜桃视频在线观看一区二区| 欧美酷刑日本凌虐凌虐| 日韩成人免费电影| 精品国产不卡一区二区三区| 国产美女av一区二区三区| 精品乱码亚洲一区二区不卡| 国内外成人在线| 国产精品久久久久久亚洲伦 | 粗大黑人巨茎大战欧美成人| 国产精品乱人伦中文| 成+人+亚洲+综合天堂| 亚洲精品菠萝久久久久久久| 日韩免费看网站| 欧洲色大大久久| 大胆亚洲人体视频| 九一九一国产精品| 日韩中文字幕91| 亚洲精品日韩专区silk| 中文字幕久久午夜不卡| 91麻豆精品国产自产在线| 99riav一区二区三区| 精品一区二区免费视频| 日韩精品每日更新| 偷窥国产亚洲免费视频| 亚洲一区影音先锋| 一区二区三区高清| 一区二区在线观看不卡| 亚洲精品视频在线看| 欧美亚男人的天堂| 成人免费视频播放| 午夜欧美视频在线观看| 亚洲午夜久久久久久久久久久 | 香蕉影视欧美成人| 五月婷婷久久丁香| 久久99热这里只有精品| 国产真实乱对白精彩久久| 久久成人免费日本黄色| 国产一区二区在线免费观看| 国产一区二三区| 久久久久国产精品免费免费搜索| 国产欧美日韩麻豆91| 亚洲精品亚洲人成人网| 日本va欧美va瓶| 成人自拍视频在线观看| 一本一本大道香蕉久在线精品| 北岛玲一区二区三区四区| 在线播放视频一区| 国产原创一区二区| 暴力调教一区二区三区| 91精选在线观看| 久久亚洲一级片| 中文字幕在线不卡国产视频| 久久久国际精品| 91在线一区二区| 韩国成人精品a∨在线观看| 一色屋精品亚洲香蕉网站| 久久久久9999亚洲精品| 免费观看日韩av| 久久99热狠狠色一区二区| 91国偷自产一区二区三区观看 | 亚洲麻豆国产自偷在线| 久久久精品国产免大香伊| 久久亚洲一级片| 国产三级精品在线| 亚洲一二三专区| 欧美二区三区的天堂| 国产大陆亚洲精品国产| 欧美精品一区二区久久久| 国产一区二三区| 国产精品亚洲第一区在线暖暖韩国| 亚洲视频在线一区| 97久久超碰精品国产| 丁香天五香天堂综合| 久久国产精品无码网站| 亚洲综合999| 国产精品久久综合| 欧美日韩国产综合一区二区三区| 99国产精品99久久久久久| 91久久线看在观草草青青| 狠狠狠色丁香婷婷综合激情| 国产乱码精品一区二区三区av | 国产69精品久久久久777| av亚洲精华国产精华精华| 欧美一卡二卡在线| 一区二区在线电影| 经典一区二区三区| 欧美亚洲高清一区| 日韩欧美中文一区二区| 日韩精品中文字幕在线一区| 国产欧美综合在线观看第十页| 日韩一区中文字幕| av一区二区三区在线| 中文字幕中文字幕在线一区 | 在线看日韩精品电影| 国产精品久久久久一区二区三区| 国产高清一区日本| 欧美一区二区三区日韩| 国产精品国产三级国产aⅴ中文 | 色噜噜狠狠一区二区三区果冻| 欧美性三三影院| 久久99精品一区二区三区三区| 在线观看一区二区精品视频| 亚洲一区二区在线播放相泽| 欧美特级限制片免费在线观看| 色综合激情久久| 一区二区三区在线视频观看| 91精品中文字幕一区二区三区| 精品亚洲成av人在线观看| 久久美女艺术照精彩视频福利播放 | 国产亚洲va综合人人澡精品| 亚洲成av人影院在线观看网| 日韩精品一区二区三区四区视频 | 中文字幕一区二区5566日韩| 一本大道久久a久久精品综合| 一区二区三区在线免费| 久久人人爽人人爽| 欧美人体做爰大胆视频| av网站一区二区三区| 亚洲精品免费视频| 国产成人在线网站| 亚洲视频在线一区观看| 国产午夜精品福利| 欧美日韩国产欧美日美国产精品| 亚洲一区二区四区蜜桃| 亚洲日本护士毛茸茸| 欧美年轻男男videosbes| 91在线免费视频观看| 在线观看亚洲精品| 欧美午夜精品一区二区蜜桃| 日本在线播放一区二区三区| 国产精品你懂的| 69堂亚洲精品首页| 久久草av在线| 日韩激情在线观看| 亚洲欧洲精品一区二区三区| 欧美一区二区精品在线| 成年人国产精品| 国产福利一区在线| 国产成人亚洲综合色影视| 人人爽香蕉精品| 国产成人一区在线| 人人超碰91尤物精品国产| 三级影片在线观看欧美日韩一区二区 | 成人免费视频caoporn| 免费观看成人鲁鲁鲁鲁鲁视频| 亚洲色图另类专区| 91成人国产精品| 成人小视频在线| 久久久久国产精品麻豆ai换脸 | 日韩国产精品久久久| 亚洲一二三四区不卡| 91免费看视频| 日韩一区二区三区av| 色菇凉天天综合网| 精品国产乱码久久久久久久久| 久久久亚洲精品石原莉奈| 久久久综合激的五月天| 亚洲天堂中文字幕| 东方aⅴ免费观看久久av| 久久亚洲捆绑美女| 日韩成人伦理电影在线观看| 中文字幕一区二区三区四区不卡| 亚洲一区在线观看视频| 亚洲人xxxx| 久久久国产综合精品女国产盗摄| 中文字幕人成不卡一区| 国产精品一区二区三区99| 成人av在线播放网址| 国产日韩精品一区二区三区在线| 久久精品国产99国产精品| 欧美日本一区二区三区| 67194成人在线观看| 国产制服丝袜一区| 一区二区三区日韩欧美| 精品免费99久久| 亚洲视频免费看| 色综合中文字幕国产| 国产91对白在线观看九色| 亚洲欧洲国产专区| 亚洲欧美日韩国产另类专区| 日韩国产精品久久| fc2成人免费人成在线观看播放| 欧美羞羞免费网站| 国产视频视频一区|