Queue API

Basic Definitions:

  • ttl - Time to Live of task.
  • ttr - Time to Release of task.
  • pri - Priority of task.
  • delay - Delay for task to be added to queue.


Don’t use constructor of Task and Tube. Task’s are created by Tube and Queue methods. For creating Tube object use Queue.tube(name)

class tarantool_queue.Queue(host='localhost', port=33013, space=0, schema=None)[source]

Tarantool queue wrapper. Surely pinned to space. May create tubes. By default it uses msgpack for serialization, but you may redefine serialize and deserialize methods. You must use Queue only for creating Tubes. For more usage, please, look into tests. Usage:

>>> from tarantool_queue import Queue
>>> queue = Queue()
>>> tube1 = queue.create_tube('holy_grail', ttl=100, delay=5)
# Put task into the queue
>>> tube1.put([1, 2, 3])
# Put task into the beggining of queue (Highest priority)
>>> tube1.urgent([2, 3, 4])
>>> tube1.get() # We get task and automaticaly release it
>>> task1 = tube1.take()
>>> task2 = tube1.take()
>>> print(task1.data)
    [2, 3, 4]
>>> print(task2.data)
    [1, 2, 3]
>>> del task2
>>> del task1
>>> print(tube1.take().data)
    [1, 2, 3]
# Take task and Ack it
>>> tube1.take().ack()

alias of DatabaseError

exception NetworkError(orig_exception=None, *args)

Error related to network


Deserialize function: must be Callable. If sets to None or delete, then it will use msgpack for deserializing.


Return a task by task id.

Parameters:task_id (string) – UUID of task in HEX
Return type:Task instance

Serialize function: must be Callable. If sets to None or deleted, then it will use msgpack for serializing.


Return queue module statistics accumulated since server start. Output format: if tube != None, then output is dictionary with stats of current tube. If tube is None, then output is dict of t stats, ...} e.g.:

>>> tube.statistics()
# or queue.statistics('tube0')
# or queue.statistics(tube.opt['tube'])
{'ack': '233',
'meta': '35',
'put': '153',
'release': '198',
'take': '431',
'take_timeout': '320',
'tasks': {'buried': '0',
        'delayed': '0',
        'done': '0',
        'ready': '0',
        'taken': '0',
        'total': '0'},
'urgent': '80'}
>>> queue.statistics()
{'tube0': {'ack': '233',
        'meta': '35',
        'put': '153',
        'release': '198',
        'take': '431',
        'take_timeout': '320',
        'tasks': {'buried': '0',
                'delayed': '0',
                'done': '0',
                'ready': '0',
                'taken': '0',
                'total': '0'},
        'urgent': '80'}}
Parameters:tube (string or None) – Name of tube
Return type:dict with statistics

Tarantool Connection class: must be class with methods call and __init__. If it sets to None or deleted - it will use the default tarantool.Connection class for connection.


Locking class: must be locking instance with methods __enter__ and __exit__. If it sets to None or delete - it will use default threading.Lock() instance for locking in the connecting.


Truncate queue tube, return quantity of deleted tasks

Parameters:tube (string) – Name of tube
Return type:int
Queue.tube(name, **kwargs)[source]

Create Tube object, if not created before, and set kwargs. If existed, return existed Tube.

  • name (string) – name of Tube
  • delay (int) – default delay for Tube tasks (Not necessary, will be 0)
  • ttl (int) – default TTL for Tube tasks (Not necessary, will be 0)
  • ttr (int) – default TTR for Tube tasks (Not necessary, will be 0)
  • pri (int) – default priority for Tube tasks (Not necessary)
Return type:

Tube instance