Celery task result chunk management
celery-streaming-result
Chunked (streaming) result management for Celery tasks.
Installation
pip install celery-streaming-result
Usage
Server side
import time
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
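@app.task(bind=True)
def task1(celery_task):
    result = []
    for i in range(10):
        # Publish each partial result as soon as it is available.
        csrm.append_result_chunk(celery_task, i)
        result.append(i)
    # Signal that no more chunks will follow.
    csrm.append_ended_chunk(celery_task)
    return result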
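To picture the producer/consumer flow in the task above, here is a minimal in-memory sketch. It is not the package's implementation: `InMemoryChunkStream`, `_ENDED`, and `produce` are hypothetical names, a `queue.Queue` stands in for Redis, and the methods take no task argument, unlike the real `CeleryStreamingResultManager`. It only illustrates the idea of appending chunks followed by an end marker that tells the reader to stop.

```python
import queue
import threading

# Hypothetical end-of-stream marker; the package's actual storage format is not shown in this README.
_ENDED = object()


class InMemoryChunkStream:
    """Toy stand-in for a Redis-backed chunk stream (illustration only)."""

    def __init__(self):
        self._chunks = queue.Queue()

    def append_result_chunk(self, chunk):
        # Producer side: publish one partial result.
        self._chunks.put(chunk)

    def append_ended_chunk(self):
        # Producer side: signal that no more chunks will follow.
        self._chunks.put(_ENDED)

    def get_result_chunks(self):
        # Consumer side: block for each chunk and stop at the end marker.
        while True:
            chunk = self._chunks.get()
            if chunk is _ENDED:
                return
            yield chunk


def produce(stream, count):
    # Mirrors the shape of task1: emit chunks, then the end marker.
    for i in range(count):
        stream.append_result_chunk(i)
    stream.append_ended_chunk()


stream = InMemoryChunkStream()
worker = threading.Thread(target=produce, args=(stream, 10))
worker.start()
received = list(stream.get_result_chunks())
worker.join()
print(received)
```

The consumer can start reading before the producer has finished, which is the point of streaming chunks instead of waiting for the task's return value.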
Client (synchronous)
import redis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
# Adjust this import to match where your task is defined.
from test_server import task1
app = app_or_default()
redis_instance = redis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
# Start the Celery task asynchronously.
atask1 = task1.delay()
# Read the task's result chunks as they arrive.
for chunk in csrm.get_result_chunks(atask1):
    print(chunk, end="-", flush=True)
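If the client needs both incremental output and the full list of chunks, the loop above can be wrapped in a small helper. The sketch below is an assumption-laden illustration: `collect_chunks` is a hypothetical helper, not part of celery-streaming-result, and a plain `range` stands in for `csrm.get_result_chunks(atask1)` so the code runs without Redis or a worker.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def collect_chunks(chunks: Iterable[T], on_chunk: Callable[[T], None]) -> List[T]:
    """Call on_chunk for every chunk as it arrives and return all chunks in order."""
    collected = []
    for chunk in chunks:
        on_chunk(chunk)
        collected.append(chunk)
    return collected


# A plain range stands in for csrm.get_result_chunks(atask1).
seen = []
all_chunks = collect_chunks(range(10), seen.append)
print(all_chunks)
```

With the real manager, the first argument would be the iterator returned by `csrm.get_result_chunks(atask1)`.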
Client (asynchronous)
import asyncio
from redis import asyncio as aioredis
from celery.app import app_or_default
from celery_streaming_result import CeleryStreamingResultManager
from celery_streaming_result import start_celery_task_async
from celery_streaming_result import get_celery_task_result_async
from test_server import task1  # Adjust this import to match where your task is defined.
app = app_or_default()
redis_instance = aioredis.Redis()
csrm = CeleryStreamingResultManager(redis_instance)
async def on_finished(celery_task):
    print("on finished...")
    task_result = await get_celery_task_result_async(celery_task)
    # task_result is the return value of the Celery task.
    # It is usually the collection of all result chunks, but that depends entirely on how the task is implemented.
async def main():
    # task1.delay() is a synchronous function and has to be converted with `sync_to_async`.
    atask1 = await start_celery_task_async(task1)
    # Read the task's result chunks; when the task finishes, on_finished is called back.
    async for chunk in csrm.get_result_chunks(atask1, on_finished=on_finished):
        print(chunk, end="-", flush=True)
# `await` is only valid inside a coroutine, so run the client through asyncio.run().
asyncio.run(main())
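The two ideas in the asynchronous client, starting a blocking call without stalling the event loop and invoking a callback once the stream ends, can be sketched with the standard library alone. This is an illustration under stated assumptions, not how the package implements `start_celery_task_async` or `get_result_chunks`: it uses `asyncio.to_thread` (Python 3.9+) rather than `sync_to_async`, and `blocking_delay`, `start_task_async`, and `stream_chunks` are hypothetical names.

```python
import asyncio
import time


def blocking_delay():
    # Stand-in for a synchronous call such as task1.delay().
    time.sleep(0.01)
    return "fake-task-handle"


async def start_task_async(blocking_start):
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(blocking_start)


async def stream_chunks(task, chunks, on_finished=None):
    # Yield each chunk, then await the callback once the stream is exhausted.
    for chunk in chunks:
        await asyncio.sleep(0)  # give other coroutines a chance to run
        yield chunk
    if on_finished is not None:
        await on_finished(task)


async def main():
    finished = []

    async def on_finished(task):
        finished.append(task)

    task = await start_task_async(blocking_delay)
    received = [chunk async for chunk in stream_chunks(task, range(3), on_finished)]
    return task, received, finished


task, received, finished = asyncio.run(main())
print(task, received, finished)
```

The callback runs only after the consumer has drained every chunk, which matches the ordering described for `on_finished` above.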
Release notes
v0.1.0
- First release.
v0.1.1
- Added asyncio support.
- Result retrieval now supports an on_finished callback.
v0.1.2
- Added support for interrupting a stream.