Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG, when task was revoked/terminated , singleton also can not create new task #29

Open
chestarss opened this issue Sep 25, 2020 · 3 comments

Comments

@chestarss
Copy link

BUG: after a task has been revoked, singleton still cannot create a new task

@chestarss chestarss changed the title BUG, task was revoked , singleto also can not create new task BUG, task was revoked , singleton also can not create new task Sep 25, 2020
@chestarss
Copy link
Author

In [14]: cron_sync_approval.delay()
Out[14]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

In [15]: from celery.result import AsyncResult

In [16]: res = AsyncResult("742569f5-3148-4bdf-9203-75ac8a0f87c5")

In [17]: res.state
Out[17]: 'REVOKED'

In [18]: cron_sync_approval.delay()
Out[18]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

In [19]: cron_sync_approval.delay()
Out[19]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

In [20]: cron_sync_approval.delay()
Out[20]: <AsyncResult: 742569f5-3148-4bdf-9203-75ac8a0f87c5>

@chestarss
Copy link
Author

chestarss commented Sep 26, 2020

Workaround:

from celery.signals import task_revoked
from celery import current_app

@task_revoked.connect
def clean_revoked_task_singleton_lock(sender=None, headers=None, body=None, **kwargs):
    """Release the celery-singleton lock held by a task that was revoked.

    Connected to Celery's ``task_revoked`` signal. The handler rebuilds the
    lock key from the revoked request's task name and arguments, and removes
    the lock only when the value stored under that key starts with the
    revoked task's id, so a lock owned by another task instance is kept.

    Args:
        sender: Signal sender; not used by this handler.
        headers: Not used; kept so the handler's interface is unchanged.
        body: Not used; kept so the handler's interface is unchanged.
        **kwargs: Signal payload. Must contain ``request``, an object
            exposing ``id``, ``task``, ``args`` and ``kwargs``.

    Raises:
        KeyError: If the signal payload has no ``request`` entry.
    """
    from celery_singleton.util import generate_lock
    from celery_singleton.config import Config
    from celery_singleton.backends import get_backend

    request = kwargs['request']
    task_id = request.id
    task_name = request.task
    task_args = request.args
    task_kwargs = request.kwargs

    app_config = Config(current_app)
    backend = get_backend(app_config)
    redis_key = generate_lock(
        task_name,
        task_args=task_args,
        task_kwargs=task_kwargs,
        key_prefix=app_config.key_prefix,
    )

    # Only drop the lock if it still belongs to the task that was revoked.
    cache_task_id = backend.get(redis_key)
    if cache_task_id and cache_task_id.startswith(task_id):
        print(f'clean singletion lock:{redis_key} task_id:{task_id}')
        # unlock() removes exactly this lock. clear() is the backend's
        # prefix-based bulk operation (it removes every lock whose key starts
        # with the given string), which is broader than a single-lock release.
        backend.unlock(redis_key)

@chestarss chestarss changed the title BUG, task was revoked , singleton also can not create new task BUG, when task was revoked/terminated , singleton also can not create new task Sep 26, 2020
@birdhackor
Copy link

Workaround:

from celery.signals import task_revoked
from celery import current_app

@task_revoked.connect
def clean_revoked_task_singleton_lock(sender=None, headers=None, body=None, **kwargs):
    """Release the celery-singleton lock held by a task that was revoked.

    Connected to Celery's ``task_revoked`` signal. The handler rebuilds the
    lock key from the revoked request's task name and arguments, and removes
    the lock only when the value stored under that key starts with the
    revoked task's id, so a lock owned by another task instance is kept.

    Args:
        sender: Signal sender; not used by this handler.
        headers: Not used; kept so the handler's interface is unchanged.
        body: Not used; kept so the handler's interface is unchanged.
        **kwargs: Signal payload. Must contain ``request``, an object
            exposing ``id``, ``task``, ``args`` and ``kwargs``.

    Raises:
        KeyError: If the signal payload has no ``request`` entry.
    """
    from celery_singleton.util import generate_lock
    from celery_singleton.config import Config
    from celery_singleton.backends import get_backend

    request = kwargs['request']
    task_id = request.id
    task_name = request.task
    task_args = request.args
    task_kwargs = request.kwargs

    app_config = Config(current_app)
    backend = get_backend(app_config)
    redis_key = generate_lock(
        task_name,
        task_args=task_args,
        task_kwargs=task_kwargs,
        key_prefix=app_config.key_prefix,
    )

    # Only drop the lock if it still belongs to the task that was revoked.
    cache_task_id = backend.get(redis_key)
    if cache_task_id and cache_task_id.startswith(task_id):
        print(f'clean singletion lock:{redis_key} task_id:{task_id}')
        # unlock() removes exactly this lock. clear() is the backend's
        # prefix-based bulk operation (it removes every lock whose key starts
        # with the given string), which is broader than a single-lock release.
        backend.unlock(redis_key)

For tasks that use `unique_on`, the handler should be modified as follows:

import inspect
import importlib

from celery import current_app
from celery.signals import task_revoked
from celery_singleton.util import generate_lock
from celery_singleton.config import Config
from celery_singleton.backends import get_backend

@task_revoked.connect
def clean_revoked_task_singleton_lock(sender=None, headers=None, body=None, **kwargs):
    """Release the celery-singleton lock of a revoked task, honouring ``unique_on``.

    Connected to Celery's ``task_revoked`` signal. When the revoked task
    declares ``unique_on``, only those named arguments take part in the lock
    key; otherwise the full positional and keyword arguments are used. The
    lock is removed only when the value stored under the key starts with the
    revoked task's id, so a lock owned by another task instance is kept.

    Args:
        sender: Signal sender; used as the task object when the task name is
            not present in the current app's task registry.
        headers: Not used; kept so the handler's interface is unchanged.
        body: Not used; kept so the handler's interface is unchanged.
        **kwargs: Signal payload. Must contain ``request``, an object
            exposing ``id``, ``task``, ``args`` and ``kwargs``.

    Raises:
        KeyError: If the payload has no ``request`` entry, or a name listed
            in ``unique_on`` was not among the arguments the task was
            called with.
        TypeError: If the request's arguments do not fit the task signature.
    """
    request = kwargs['request']
    task_id = request.id
    task_name = request.task
    task_args = request.args
    task_kwargs = request.kwargs

    # Resolve the task through the app's registry rather than importing
    # "<module>.<attr>" from its name: a task registered with a custom name
    # (or a name without a dot) cannot be located by module path.
    task = current_app.tasks.get(task_name, sender)
    unique_on = getattr(task, 'unique_on', None)

    if unique_on:
        if isinstance(unique_on, str):
            unique_on = [unique_on]
        sig = inspect.signature(task)
        # Missing args/kwargs are treated as empty so bind() does not fail
        # on unpacking None.
        bound = sig.bind(*(task_args or ()), **(task_kwargs or {})).arguments

        unique_args = []
        unique_kwargs = {key: bound[key] for key in unique_on}
    else:
        unique_args = task_args
        unique_kwargs = task_kwargs

    app_config = Config(current_app)
    backend = get_backend(app_config)
    redis_key = generate_lock(task_name,
                              task_args=unique_args,
                              task_kwargs=unique_kwargs,
                              key_prefix=app_config.key_prefix)

    # Only drop the lock if it still belongs to the task that was revoked.
    cache_task_id = backend.get(redis_key)
    if cache_task_id and cache_task_id.startswith(task_id):
        print(f'clean singletion lock:{redis_key} task_id:{task_id}')
        # unlock() removes exactly this lock. clear() is the backend's
        # prefix-based bulk operation (it removes every lock whose key starts
        # with the given string), which is broader than a single-lock release.
        backend.unlock(redis_key)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants