如何dynamic添加/删除周期性任务到芹菜(celerybeat)

如果我有一个函数定义如下:

def add(x,y): return x+y 

有没有办法dynamic地添加这个函数作为芹菜PeriodicTask并在运行时启动它? 我想能够做一些像(伪代码):

 some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30")) celery.beat.start(some_unique_task_id) 

我也想用(伪代码)之类的东西来dynamic地停止或删除这个任务:

 celery.beat.remove_task(some_unique_task_id) 

要么

 celery.beat.stop(some_unique_task_id) 

仅供参考我不使用djcelery,它允许您通过djangopipe理员pipe理定期任务。

不,对不起,这是不可能的与常规芹菜。

但是,它可以很容易地扩展到你想要的,例如,django-celery调度器只是一个读取和写入数据库的时间表的子类(在顶部有一些优化)。

您也可以使用django-celery调度程序,即使是非Django项目也是如此。

像这样的东西:

  • 安装django + django-芹菜:

    $ pip install -U django django-芹菜

  • 将以下设置添加到您的celeryconfig中:

     DATABASES = { 'default': { 'NAME': 'celerybeat.db', 'ENGINE': 'django.db.backends.sqlite3', }, } INSTALLED_APPS = ('djcelery', ) 
  • 创build数据库表:

     $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig 
  • 用数据库调度程序启动celerybeat:

     $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \ -S djcelery.schedulers.DatabaseScheduler 

还有djcelerymon命令可以用于非Django项目在同一个进程中启动celerycam和Django Adminnetworking服务器,你可以使用它来在一个漂亮的web界面中编辑你的定期任务:

  $ djcelerymon 

(注意由于某些原因djcelerymon不能用Ctrl + C停止,你必须使用Ctrl + Z + kill%1)

这个问题在谷歌组上得到了答复。

我不是作者,所有功劳都归于Jean Mark

这是一个适当的解决scheme。 确定工作,在我的情况下,我分类周期任务,并创build一个模型,因为我可以添加其他领域的模型,因为我需要,也可以添加“终止”的方法。 您必须将定期任务的启用属性设置为False,并在删除之前将其保存。 整个子类不是必须的,schedule_every方法是真正做这个工作的方法。 当你准备好终止你的任务时(如果你没有inheritance它),你可以使用PeriodicTask.objects.filter(name = …)search你的任务,禁用它,然后删除它。

希望这可以帮助!

 from djcelery.models import PeriodicTask, IntervalSchedule from datetime import datetime class TaskScheduler(models.Model): periodic_task = models.ForeignKey(PeriodicTask) @staticmethod def schedule_every(task_name, period, every, args=None, kwargs=None): """ schedules a task by name every "every" "period". So an example call would be: TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. """ permissible_periods = ['days', 'hours', 'minutes', 'seconds'] if period not in permissible_periods: raise Exception('Invalid period specified') # create the periodic task and the interval ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) if interval_schedules: # just check if interval schedules exist like that already and reuse em interval_schedule = interval_schedules[0] else: # create a brand new interval schedule interval_schedule = IntervalSchedule() interval_schedule.every = every # should check to make sure this is a positive int interval_schedule.period = period interval_schedule.save() ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) if args: ptask.args = args if kwargs: ptask.kwargs = kwargs ptask.save() return TaskScheduler.objects.create(periodic_task=ptask) def stop(self): """pauses the task""" ptask = self.periodic_task ptask.enabled = False ptask.save() def start(self): """starts the task""" ptask = self.periodic_task ptask.enabled = True ptask.save() def terminate(self): self.stop() ptask = self.periodic_task self.delete() ptask.delete() 

你可以看看这个configuration烧瓶和djcelery的flask-djcelery ,还提供可浏览的rest api

有一个名为django-celery-beat的库,它提供了需要的模型。 为了使其dynamic加载新的周期性任务,必须创build自己的调度程序。

 from django_celery_beat.schedulers import DatabaseScheduler class AutoUpdateScheduler(DatabaseScheduler): def tick(self, *args, **kwargs): if self.schedule_changed(): print('resetting heap') self.sync() self._heap = None new_schedule = self.all_as_schedule() if new_schedule: to_add = new_schedule.keys() - self.schedule.keys() to_remove = self.schedule.keys() - new_schedule.keys() for key in to_add: self.schedule[key] = new_schedule[key] for key in to_remove: del self.schedule[key] super(AutoUpdateScheduler, self).tick(*args, **kwargs) @property def schedule(self): if not self._initial_read and not self._schedule: self._initial_read = True self._schedule = self.all_as_schedule() return self._schedule