Celery-RabbitMQ-Django-Cron

   

celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。

0. 安装

0.1 install

1. celery 基础命令

1.1. 启动 celery

1.2. 后台进程启动celery worker

1.3. 重启 celery worker 后台进程

1.4. 停止 celery worker 后台进程

1.5. 等待 停止 celery worker 后台进程

1.6. 指定 pidfile & logfile

1.7. 启动 多个 worker 并且指定不同参数

1.8. 手动杀死所有worker进程

1.9. 较完整的celery的启动命令

2. 管理相关命令

2.1. 启动 flower

2.2. 需要 目录

2.3. 使用 librabbitmq


0.1 install

sudoapt-getinstallbuild-essentialpython-dev sudopipinstallcelery sudopipinstalllibrabbitmq sudopipinstallflower sudoapt-getinstallrabbitmq-server sudorabbitmq-pluginsenablerabbitmq_management sudoservicerabbitmq-serverrestart sudorabbitmq-pluginsdisablerabbitmq_management

访问 http://host:15672 即可进入管理界面。

默认用户名,密码都是 guest

1 (97).jpg

1.1. 启动 celery

celery-Ataskworker-linfo

启动时指定要使用的 queue

celery-Ataskworker-linfo-Qbrand_queue,new_queue-E

 

1.2. 后台进程启动celery worker

celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E

可通过命令查看后台启动的进程:

ps-aux|grepcelery [celeryd:w1@x:MainProcess]-active-(worker-E-Atask-linfo-Qbrand_queue,new_queue--logfile=w1.log--pidfile=w1.pid--hostname=w1@x)

可以看到默认添加了几个参数:

--logfile=w1.log默认在当前文件夹新建w1.log文件 --pidfile=w1.pid默认在当前文件夹新建w1.pid文件 --hostname=w1@x默认实例名woker_name/机器名

1.3. 重启 celery worker 后台进程

celerymultirestartw1-Atask-linfo-Qbrand_queue,new_queue-E

1.4. 停止 celery worker 后台进程

celerymultistopw1-Atask-linfo-Qbrand_queue,new_queue-E

stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,

并且不会写停止worker相关的日志

1.5. 等待 停止 celery worker 后台进程

celerymultistopwaitw1-Atask-linfo-Qbrand_queue,new_queue-E

这个停止命令,会等待正在运行的所有任务都完成再停止。

1.6. 指定 pidfile & logfile

celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log

1.7. 启动 多个 worker 并且指定不同参数

celerymultistart10-Atask-linfo-E-Q:1-3images,video-Q:4,5data-Qdefault-L:4,5debug

启动了10个worker:

worker 1,2,3 使用了队列 images, video

worker 4,5 使用了队列 data

worker 其他 使用了队列 default

-L 是什么参数?

1.8. 手动杀死所有worker进程

psauxww|grep\'celeryworker\'|awk\'{print$2}\'|xargskill-9

1.9. 较完整的celery的启动命令

celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue,time_queue,cron_queue-E-B-s/var/www/api/space/run/celerybeat-schedule--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log

 

2.1 启动 flower

celeryflower--broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger--address=192.168.0.4--port=5555--broker_api=http://tiger:tiger@192.168.0.6:15672/api/--basic_auth=tiger:tiger rabbitmq主机地址:192.168.0.6 本机地址:192.168.0.4 本地监听端口:5555

2.2 需要目录

run/ log/

2.3 使用 librabbitmq

Ifyou’reusingRabbitMQ(AMQP)asthebrokerthenyoucaninstallthelibrabbitmqmoduletouseanoptimizedclientwritteninC: $pipinstalllibrabbitmq

注意

1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。

2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。

代码示例

配置文件:

#-*-coding=utf-8-*- #FileName:task_conf.py from__future__importabsolute_import fromceleryimportCelery fromdatetimeimporttimedelta fromcelery.schedulesimportcrontab \'\'\' task_random:是任务的名称 broker:通过amqp://用户名:密码@ip/虚拟主机连接amqp include:任务程序 \'\'\' #消息队列配置 mq_host=\'192.168.0.6\' mq_name=\'tiger\' mq_pass=\'tiger\' mq_vr=\'vr_tiger\' broker=\'amqp://%s:%s@%s/%s\'%(mq_name,mq_pass,mq_host,mq_vr) #初始化app app=Celery(\'name_wash\',broker=broker,include=[\'task\']) #指定任务存储队列 app.conf.update( CELERY_ROUTES={ \'task.exe_task\':{\'queue\':\'brand_queue\'}, \'task.task_sms_send\':{\'queue\':\'new_queue\'}, \'task.task_sec\':{\'queue\':\'time_queue\'}, \'task.task_cron\':{\'queue\':\'cron_queue\'} }, CELERYBEAT_SCHEDULE={ \'exe-every-10-seconds\':{ \'task\':\'task.task_sec\', \'schedule\':timedelta(seconds=30), \'args\':[1], }, \'add-every-monday-morning\':{ \'task\':\'task.task_cron\', \'schedule\':crontab(hour=15,minute=47,day_of_week=5), \'args\':(15232897835,), }, }, #CELERY_TASK_SERIALIZER=\'json\', #CELERY_ACCEPT_CONTENT=[\'json\'],#Ignoreothercontent #CELERY_RESULT_SERIALIZER=\'json\', CELERY_EVENT_QUEUE_TTL=5, CELERY_TIMEZONE=\'Asia/Shanghai\', CELERY_ENABLE_UTC=True, CELERY_DISABLE_RATE_LIMITS=True, CELERY_IGNORE_RESULT=True ) if__name__==\'__main__\': app.start()

 

任务文件:

#-*-coding=utf-8-*- #FileName:task.py \'\'\' task \'\'\' from__future__importabsolute_import importtime importtraceback fromjob.task_confimportapp @app.task(ignore_result=True) defexe_task(task_id,number): \'\'\'根据参数执行任务\'\'\' try: print\'exetask:\',task_id time.sleep(number) except: traceback.print_exc() return(task_id,number,-1) return\'true:)\' @app.task deftask_sms_send(mobile,content): \'\'\'任务-发送短信\'\'\' try: print\'sendsms:mobile->%s,content->%s\'%(mobile,content) except: traceback.print_exc() return\'Fail:(\' return\'Success:)\' @app.task deftask_sec(mobile): \'\'\'测试任务时间定制\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F\' return\'S\' @app.task deftask_cron(mobile): \'\'\'测试任务时间定制Cron\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F-cron\' return\'S-cron\' defmain(): res=exe_task(2,2) print\'res:\',res if__name__==\'__main__\': main()

 

添加任务代码:

#-*-coding=utf-8-*- #FileName:add_task.py importtime importtraceback importrandom fromtaskimportexe_task,task_sms_send defaction(): tries=0 while1: try: tries+=1 iftries>=20: break task_id=tries number=random.randint(1,5) exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') print\'addedonetask\' time.sleep(1) except: traceback.print_exc() pass print\'addtaskdone\' defadd_task_by_django(task_id,number): \'\'\'测试从django添加任务\'\'\' exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') defadd_task_sms(mobile,content): \'\'\'添加发送短信任务\'\'\' #列表参数或者字典参数 #task.apply_async(args=[arg1,arg2],kwargs={\'kwarg1\':\'x\',\'kwarg2\':\'y\'}) task_sms_send.apply_async(args=[mobile,content],queue=\'new_queue\') print\'taskadded:sms\' defmain(): action() if__name__==\'__main__\': main()