本文共 3395 字,大约阅读时间需要 11 分钟。
Celery是由Python开发、简单、灵活、可靠的分布式
任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:
简单:熟悉celery的工作流程后,配置使用简单高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务快速:一个单进程的celery每分钟可处理上百万个任务灵活:几乎celery的各个组件都可以被扩展及自定制
生产者消费者模型
消息代理和结果存储 => redis
#linux安装运行redis# 安装yum install redis# 配置:修改监听端口 =》vim /etc/redis.confbind 0.0.0.0# 启动:redis-server /etc/redis.conf &# 重启: kilall redis-server# 如果没有killall命令,请这样安装yum install psmisc -yredis-server /etc/redis.conf &=>port: 6379
Flask代码处理=> flask+celery
1. 安装依赖pip install redispip install celery2. 创建celery实例: celery_app/__init__.pyfrom celery import Celerycelery = Celery("celery_app")# 从配置文件中读取并配置celerycelery.config_from_object('config.celery_config')3. 创建配置文件 config/celery_config.py# 配置消息中间件的地址BROKER_URL = "redis://192.168.189.200:6379/1"# 结果存放地址CELERY_RESULT_BACKEND = "redis://192.168.189.200:6379/2"# 启动Celery时,导入任务CELERY_IMPORTS = ( 'celery_app.tasks',)# 配置定时任务4. 创建任务:celery_app/tasks.pyfrom . import celeryimport timeimport random@celery.taskdef celery_task(sth1): """输出start, end, time.sleep""" print("celery_app.task start") delay_time = random.randint(5, 10) time.sleep(delay_time) print("celery_app.task end") return sth1# 调用任务@view01_bp.route('/index/')def index(): # 调用一个异步任务 from celery_app.tasks import celery_task # 立即发送任务,立刻执行任务 celery_task.delay('hello world!') # celery_task.apply_async() => 指定运行倒计时,发送到哪个队列.... return "This is index""""目标:当触发该任务时,web页面能实时返回return的结果,而不是在等待在worker看到任务被接收和处理"""5. 将celery配置启动入口:manage.pyfrom celery_app import celery以便通过 => manage.celery 启动celery项目6. 启动: 1. flask-web => python server.py 2. worker => 1个或n个 => celery worker -A manage.celery --loglevel=info -n nodename nodename指定的workd的名字(自己取) 3. beat => 如果有定时任务celery4 windows不支持结果处理pip install eventlet然后启动worker的时候加一个参数,如下:celery -Aworker -l info -P eventlet
也可从官方直接下载安装包:https://pypi.python.org/pypi/celery/
tar xvfz celery-0.0.0.tar.gzcd celery-0.0.0python setup.py buildpython setup.py install
1.在pycharm:Tool->Deployment-> + -> 设置一个名字, SFTP 配置Connection 配置Mappings2. 创建虚拟环境 cd /opt/flaskproj/ python3 -m venv linuxvenv source linuxvenv/bin/activate pip install -r requirements.txt3. 运行 cd /opt/flaskproj/ && source linuxvenv/bin/activate && cd operbench # 运行flask python server.py # 运行celery worker celery worker -A manage.celery --loglevel=info -n node1### web方式查看celery的工作情况cd /opt/flaskproj/ && source linuxvenv/bin/activate && cd operbenchpip install flowerflower -A manage.celery --loglevel=info --address=192.168.189.200 --port=5555
# tasks.pyimport timefrom celery import Celerycelery = Celery('tasks', broker='redis://localhost:6379/0')@celery.taskdef sendmail(mail): print('sending mail to %s...' % mail['to']) time.sleep(2.0) print('mail sent.')
$ celery -A tasks worker --loglevel=info
上面的命令行实际上启动的是Worker,如果要放到后台运行,可以扔给supervisor。
如何发送任务?>>> from tasks import sendmail>>> sendmail.delay(dict(to='celery@python.org'))
转载地址:http://lqezi.baihongyu.com/