Steven Gonzales edited this page May 30, 2013 · 1 revision

###Intent:

It is possible that the datastore layer could become unavailable to a worker due to problems with network connectivity or the datastore itself.

Because of this potential problem, workers must implement a form of store and forward so that messages can be queued locally before any attempt is made to send them to the datastore. This allows workers to continue processing incoming messages and to forward the queued messages to the datastore once connectivity has been restored.
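The store-and-forward idea can be sketched in plain Python, independent of Celery and RabbitMQ (the class and callable names here are illustrative, not Meniscus code):

```python
from collections import deque


class StoreAndForward:
    """Queue messages locally; flush to the datastore when it is reachable."""

    def __init__(self, send):
        self._send = send        # callable that delivers one message
        self._pending = deque()  # local queue of unsent messages

    def submit(self, message):
        self._pending.append(message)
        self.flush()

    def flush(self):
        # Drain the local queue; stop at the first failure and keep the
        # remaining messages for a later attempt.
        while self._pending:
            message = self._pending[0]
            try:
                self._send(message)
            except ConnectionError:
                return
            self._pending.popleft()
```

While the datastore is unreachable, `submit()` simply accumulates messages; a later `flush()` delivers them in order.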

###Celery & RabbitMQ

Celery was chosen to implement the task queue because it supports concurrency via multiprocessing and can use several different message brokers. Celery also lets us retry failed tasks, keep reserved tasks on the queue until they have been processed, and choose among different methods of task serialization. Currently JSON serialization is used in preference to riskier methods such as pickle.
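Using JSON serialization means every task argument must survive a JSON round trip; types such as datetime would need converting first. The message dict below is purely illustrative:

```python
import json

# Illustrative log message; all values are JSON-native types.
message = {"host": "worker-01", "msg": "connection refused", "priority": 3}

# Celery will serialize the task argument with JSON, so it must round-trip
# without loss before it can safely be placed on the queue.
payload = json.dumps(message)
assert json.loads(payload) == message
```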

RabbitMQ was chosen as the broker for the local queue because it has multiprocess support and manages flushing messages to disk for queue durability. A durable queue allows the Celery worker to resume processing existing messages after an application or system restart.

###Implementation

RabbitMQ is installed locally on the worker and is available only on localhost. Meniscus uses librabbitmq as the AMQP transport library.

Celery settings are stored in the Meniscus configuration file:

```ini
[celery]
BROKER_URL = librabbitmq://guest@localhost//
CELERYD_CONCURRENCY = 10
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TASK_SERIALIZER = json
```
The Celery worker is defined and configured within the Meniscus application code using the settings imported from the Meniscus configuration file. When a Meniscus worker starts, it launches the Celery worker processes, which begin reading tasks off the queue.

Message tasks are also defined in the Meniscus application code using the @celery.task decorator. Consider the following snippet:

```python
@celery.task(acks_late=True, max_retries=None,
             ignore_result=True, serializer="json")
def persist_message(message):
    """Take a message and persist it to the datastore."""
    try:
        _sink = db_handler()
        _sink.put('logs', message)
    except Exception as ex:
        persist_message.retry(exc=ex)
```

The @celery.task decorator registers this function as a Celery task.

The acks_late=True parameter tells Celery not to acknowledge (and therefore remove) the message from the queue until the worker process that reserved the task has finished processing it. This preserves the message in the event the Celery worker process crashes or is killed before completing the task.

The call to persist_message.retry() places the task back on the queue when persistence fails, and max_retries=None disables the retry limit so Celery keeps retrying the task until it succeeds.
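What Celery does on the worker's behalf amounts to the following loop (a plain-Python sketch of the retry semantics, not Celery's actual implementation):

```python
def run_with_retries(task, max_retries=None):
    """Re-run task until it succeeds.

    max_retries=None means no retry limit, mirroring Celery's semantics;
    an integer limit re-raises the last error once it is exhausted.
    """
    attempts = 0
    while True:
        try:
            return task()
        except Exception:
            attempts += 1
            if max_retries is not None and attempts > max_retries:
                raise
```

In the real worker each retry is a fresh delivery from the durable queue, so the message survives process restarts between attempts.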

As a Meniscus worker receives incoming messages, it validates each message and adds correlation information before placing the message task on the queue. The task is enqueued for processing by calling the decorated function with .delay():

```python
# pass persist_message call to queue
persist_message.delay(message)
```
