Queue API

Basic Definitions:

  • ttl - Time to Live of task.
  • ttr - Time to Release of task.
  • pri - Priority of task.
  • delay - Delay for task to be added to queue.

Warning

Don’t call the Task and Tube constructors directly. Tasks are created by Tube and Queue methods. To create a Tube object, use Queue.tube(name).

class tarantool_queue.Queue(host='localhost', port=33013, space=0, schema=None)

Tarantool queue wrapper, pinned to a single space. It can create tubes. By default it uses msgpack for serialization, but you may redefine the serialize and deserialize methods. Use Queue only for creating Tubes. For more usage examples, see the tests. Usage:

>>> from tarantool_queue import Queue
>>> queue = Queue()
>>> tube1 = queue.tube('holy_grail', ttl=100, delay=5)
# Put a task into the queue
>>> tube1.put([1, 2, 3])
# Put a task at the beginning of the queue (highest priority)
>>> tube1.urgent([2, 3, 4])
>>> tube1.get() # We get a task and automatically release it
>>> task1 = tube1.take()
>>> task2 = tube1.take()
>>> print(task1.data)
    [2, 3, 4]
>>> print(task2.data)
    [1, 2, 3]
>>> del task2
>>> del task1
>>> print(tube1.take().data)
    [1, 2, 3]
# Take a task and ack it
>>> tube1.take().ack()
    True

DataBaseError

alias of DatabaseError

exception NetworkError(orig_exception=None, *args)

Error related to network

Queue.deserialize

Deserialize function: must be callable. If set to None or deleted, msgpack is used for deserialization.

Queue.peek(task_id)

Return a task by task id.

Parameters: task_id (string) – UUID of task in HEX
Return type: Task instance

Queue.serialize

Serialize function: must be callable. If set to None or deleted, msgpack is used for serialization.
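As an illustration of replacing the default msgpack codec, here is a minimal sketch of a JSON-based serialize/deserialize pair. The names json_serialize and json_deserialize are hypothetical, and the sketch assumes each function receives a single argument (the task data, or its encoded form); check the tests for the exact calling convention.

```python
import json


def json_serialize(data):
    """Encode task data as compact UTF-8 JSON bytes."""
    return json.dumps(data, separators=(",", ":")).encode("utf-8")


def json_deserialize(raw):
    """Decode bytes (or str) produced by json_serialize back into Python data."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return json.loads(raw)


# Assumed hookup (needs a running Tarantool server, so it is left commented out):
# queue.serialize = json_serialize
# queue.deserialize = json_deserialize

# Round-trip check that runs without a server.
payload = {"job": "resize", "sizes": [64, 128]}
assert json_deserialize(json_serialize(payload)) == payload
```

Whatever codec is used, serialize and deserialize should be exact inverses of each other, as the round-trip check at the end verifies.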

Queue.statistics(tube=None)

Return queue module statistics accumulated since server start. Output format: if tube is not None, the output is a dictionary with the stats of that tube. If tube is None, the output is a dict of the form {tube: stats, ...}, e.g.:

>>> tube.statistics()
# or queue.statistics('tube0')
# or queue.statistics(tube.opt['tube'])
{'ack': '233',
'meta': '35',
'put': '153',
'release': '198',
'take': '431',
'take_timeout': '320',
'tasks': {'buried': '0',
        'delayed': '0',
        'done': '0',
        'ready': '0',
        'taken': '0',
        'total': '0'},
'urgent': '80'}
or
>>> queue.statistics()
{'tube0': {'ack': '233',
        'meta': '35',
        'put': '153',
        'release': '198',
        'take': '431',
        'take_timeout': '320',
        'tasks': {'buried': '0',
                'delayed': '0',
                'done': '0',
                'ready': '0',
                'taken': '0',
                'total': '0'},
        'urgent': '80'}}
Parameters: tube (string or None) – Name of tube
Return type: dict with statistics

Queue.tarantool_connection

Tarantool connection class: must be a class with the methods call and __init__. If set to None or deleted, the default tarantool.Connection class is used for the connection.

Queue.tarantool_lock

Locking class: must be a lock instance with the methods __enter__ and __exit__. If set to None or deleted, a default threading.Lock() instance is used for locking while connecting.
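Any object that supports the with statement satisfies the __enter__/__exit__ requirement. The following sketch wraps threading.Lock and counts acquisitions, which can help when debugging connection contention. CountingLock is a hypothetical name, not part of the library, and the final hookup line is an assumption about how the attribute is assigned.

```python
import threading


class CountingLock:
    """Wraps threading.Lock and counts how many times it was acquired."""

    def __init__(self):
        self._lock = threading.Lock()
        self.acquired = 0

    def __enter__(self):
        self._lock.acquire()
        self.acquired += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._lock.release()
        return False  # never swallow exceptions raised inside the block


lock = CountingLock()
with lock:
    pass  # the protected section would go here

# Assumed hookup (left commented out because it needs the library):
# queue.tarantool_lock = lock
```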

Queue.tube(name, **kwargs)

Create a Tube object if one has not been created before, and set kwargs. If the Tube already exists, return the existing one.

Parameters:
  • name (string) – name of Tube
  • delay (int) – default delay for Tube tasks (optional, defaults to 0)
  • ttl (int) – default TTL for Tube tasks (optional, defaults to 0)
  • ttr (int) – default TTR for Tube tasks (optional, defaults to 0)
  • pri (int) – default priority for Tube tasks (optional)
Return type:

Tube instance

class tarantool_queue.Tube(queue, name, **kwargs)

Tarantool queue tube wrapper. Pinned to a space and a tube, but unlike Queue it has predefined delay, ttl, ttr, and pri.

Warning

Don’t instantiate it directly; use Queue.tube(name) instead.

deserialize

Deserialize function: must be callable or None. Set to None when deleted.

kick(count=None)

‘Dig up’ count tasks in a queue. If count is not given, digs up just one buried task.

Return type: boolean

put(data, **kwargs)

Enqueue a task and return a Task instance representing it. The list of fields with task data (‘...’) is optional. If urgent is set to True, the task gets the highest priority.

Parameters:
  • data – Data for pushing into queue
  • urgent (boolean) – make task urgent (optional, False by default)
  • delay (int) – new delay for task (optional, defaults to the Tube object’s value)
  • ttl (int) – new time to live (optional, defaults to the Tube object’s value)
  • ttr (int) – time to release (optional, defaults to the Tube object’s value)
  • tube (string) – name of Tube (optional, defaults to the Tube object’s value)
  • pri – priority (optional, defaults to the Tube object’s value)
Return type:

Task instance
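The per-task keyword arguments of put() can be illustrated with a small helper. This is a sketch only: enqueue_all and RecordingTube are hypothetical names, and RecordingTube is an in-memory stand-in that merely records the calls so the example runs without a Tarantool server.

```python
class RecordingTube:
    """Hypothetical stand-in for Tube that only records put() calls."""

    def __init__(self):
        self.calls = []

    def put(self, data, **kwargs):
        self.calls.append((data, kwargs))
        return data


def enqueue_all(tube, items, urgent_if=None, **options):
    """Put every item into `tube`; items matching `urgent_if` are put as urgent.

    `options` (for example ttl, ttr, delay) are forwarded to each put() call.
    """
    tasks = []
    for item in items:
        is_urgent = bool(urgent_if and urgent_if(item))
        tasks.append(tube.put(item, urgent=is_urgent, **options))
    return tasks


tube = RecordingTube()
enqueue_all(tube, [1, 50, 3], urgent_if=lambda n: n > 10, ttl=60, delay=5)
```

With a real Tube obtained from Queue.tube(name), the same helper would return the Task instances produced by put().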

serialize

Serialize function: must be callable or None. Set to None when deleted.

statistics()

See Queue.statistics() for more information.
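In the sample output shown for Queue.statistics(), every counter is a string. Assuming that holds in general, a small helper can convert the per-tube dictionary to integers before doing arithmetic on it. stats_to_int and ack_ratio are hypothetical names, and the sample data is copied from the example output above.

```python
def stats_to_int(stats):
    """Recursively convert string counters (e.g. '233') to integers."""
    converted = {}
    for key, value in stats.items():
        if isinstance(value, dict):
            converted[key] = stats_to_int(value)
        else:
            converted[key] = int(value)
    return converted


def ack_ratio(stats):
    """Fraction of taken tasks that were acknowledged (0.0 if none taken)."""
    counters = stats_to_int(stats)
    if counters["take"] == 0:
        return 0.0
    return counters["ack"] / counters["take"]


# Per-tube statistics in the documented output format.
sample = {
    "ack": "233", "meta": "35", "put": "153", "release": "198",
    "take": "431", "take_timeout": "320", "urgent": "80",
    "tasks": {"buried": "0", "delayed": "0", "done": "0",
              "ready": "0", "taken": "0", "total": "0"},
}
numbers = stats_to_int(sample)
```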

take(timeout=0)

If there are tasks in the queue ready for execution, take the highest-priority task. Otherwise, wait for a ready task to appear in the queue and, as soon as it appears, mark it as taken and return it to the consumer. If a timeout is given and no task appears before it expires, return None. If timeout is None, wait indefinitely until a task appears.

Parameters: timeout (int or None) – timeout to wait.
Return type: Task instance or None

update_options(**kwargs)

Update options for current tube (such as ttl, ttr, pri and delay)

urgent(data=None, **kwargs)

Same as Tube.put(), but sets the highest priority for this task.

class tarantool_queue.Task(queue, space=0, task_id=0, tube='', status='', raw_data=None)

Tarantool queue task wrapper.

Warning

Don’t instantiate it directly; Task objects are created by Tube and Queue methods.

ack()

Confirm completion of a task. Before marking a task as complete

Return type: Task instance

bury()

Mark a task as buried. This special status excludes the task from the active list, until it’s dug up. This function is useful when several attempts to execute a task lead to a failure. Buried tasks can be monitored by the queue owner, and treated specially.

Return type: boolean

delete()

Delete a task from the queue (regardless of task state or status).

Return type: boolean

dig()

‘Dig up’ a buried task, after checking that the task is buried. The task status is changed to ready.

Return type: boolean

done(data)

Mark a task as complete (done), but don’t delete it. Replaces task data with the supplied data.

Parameters: data – new data that replaces the task data
Return type: boolean

meta()

Return unpacked task metadata.

Return type: dict with metainformation or None

release(**kwargs)

Return a task back to the queue: the task is not executed.

Parameters:
  • ttl (int) – new time to live
  • delay (int) – new delay for task
Return type:

Task instance
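One common use of the delay argument is to retry a failed task later with a growing pause. The sketch below assumes delay is measured in whole seconds; backoff_delay, release_with_backoff and FakeTask are hypothetical names, and FakeTask only records the arguments so the example runs without a server.

```python
def backoff_delay(attempt, base=1, cap=60):
    """Delay for the given attempt: base * 2**attempt, capped at `cap`."""
    return min(cap, base * 2 ** attempt)


def release_with_backoff(task, attempt, base=1, cap=60):
    """Release `task` back to the queue with an exponentially growing delay."""
    return task.release(delay=backoff_delay(attempt, base, cap))


class FakeTask:
    """Hypothetical stand-in for Task that records release() arguments."""

    def __init__(self):
        self.released_with = None

    def release(self, **kwargs):
        self.released_with = kwargs
        return self


task = release_with_backoff(FakeTask(), attempt=2)
```

The cap keeps repeatedly failing tasks from being delayed without bound; the attempt counter itself would have to be tracked by the consumer, for example inside the task data.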

requeue()

Return a task to the queue, the task is not executed. Puts the task at the end of the queue, so that it’s executed only after all existing tasks in the queue are executed.

Return type: boolean

touch()

Prolong the living time of this task while it is taken.

Return type: boolean
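The Task methods above combine into a typical consumer step: take a task, process its data, then ack() it on success or bury() it on failure. The following is a minimal sketch rather than the library's own worker. process_one, FakeTask and FakeTube are hypothetical names, and the two fake classes are in-memory stand-ins so the example runs without a Tarantool server.

```python
def process_one(tube, handler, timeout=1):
    """Take one task, run `handler` on its data, then ack or bury it.

    Returns "empty" if no task arrived, otherwise "acked" or "buried".
    """
    task = tube.take(timeout)
    if task is None:
        return "empty"
    try:
        handler(task.data)
    except Exception:
        task.bury()  # keep the failed task for later inspection or kick()
        return "buried"
    task.ack()
    return "acked"


class FakeTask:
    """Hypothetical in-memory stand-in for Task."""

    def __init__(self, data):
        self.data = data
        self.state = "taken"

    def ack(self):
        self.state = "acked"
        return True

    def bury(self):
        self.state = "buried"
        return True


class FakeTube:
    """Hypothetical in-memory stand-in for Tube."""

    def __init__(self, items):
        self._tasks = [FakeTask(item) for item in items]

    def take(self, timeout=0):
        return self._tasks.pop(0) if self._tasks else None


def handler(data):
    if data < 0:
        raise ValueError("negative payload")


tube = FakeTube([1, -1])
results = [process_one(tube, handler) for _ in range(3)]
```

Burying rather than releasing a failed task keeps it out of the ready list until it is dug up with dig() or kick(), which avoids retrying a task that always fails.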
