jina.peapods.zmq package

Module contents

class jina.peapods.zmq.Zmqlet(args, logger=None, ctrl_addr=None)[source]

Bases: object

A Zmqlet object can send/receive data to/from ZeroMQ socket and invoke callback function. It has three sockets for input, output and control.

Parameters
  • args (Namespace) – the parsed arguments from the CLI

  • logger (Optional[JinaLogger]) – the logger to use

  • ctrl_addr (Optional[str]) – control address

Warning

Starting from v0.3.6, ZmqStreamlet replaces Zmqlet as one of the key components in jina.peapods.runtimes.zmq.zed.ZEDRuntime. It requires tornado and uvloop to be installed.

pause_pollin()[source]

Remove in_sock from the poller

resume_pollin()[source]

Put in_sock back to the poller

static get_ctrl_address(host, port_ctrl, ctrl_with_ipc)[source]

Get the address of the control socket

Parameters
  • host (Optional[str]) – the host in the arguments

  • port_ctrl (Optional[str]) – the control port

  • ctrl_with_ipc (bool) – a bool of whether using IPC protocol for controlling

Return type

Tuple[str, bool]

Returns

A tuple of two pieces:

  • a string of control address

  • a bool of whether using IPC protocol for controlling

close(*args, **kwargs)[source]

Close all sockets and shutdown the ZMQ context associated to this Zmqlet.

Note

This method is idempotent.

Parameters
  • args – Extra positional arguments

  • kwargs – Extra key-value arguments

print_stats()[source]

Print out the network stats of of itself

send_message(msg)[source]

Send a message via the output socket

Parameters

msg (Message) – the protobuf message to send

recv_message(callback=None)[source]

Receive a protobuf message from the input socket

Parameters

callback (Optional[Callable[[Message], Message]]) – the callback function, which modifies the recevied message inplace.

Return type

Message

Returns

the received (and modified) protobuf message

class jina.peapods.zmq.AsyncZmqlet(args, logger=None, ctrl_addr=None)[source]

Bases: jina.peapods.zmq.Zmqlet

An async vesion of Zmqlet. The send_message() and recv_message() works in the async manner.

async send_message(msg, **kwargs)[source]

Send a protobuf message in async via the output socket

Parameters
  • msg (Message) – the protobuf message to send

  • kwargs – keyword arguments

async recv_message(callback=None)[source]

Receive a protobuf message in async manner.

Parameters

callback (Optional[Callable[[Message], Union[Message, Request]]]) – Callback function to receive message

Return type

Optional[Message]

Returns

Received protobuf message. Or None in case of any error.

class jina.peapods.zmq.ZmqStreamlet(*args, **kwargs)[source]

Bases: jina.peapods.zmq.Zmqlet

A ZmqStreamlet object can send/receive data to/from ZeroMQ stream and invoke callback function. It has three sockets for input, output and control.

Warning

Starting from v0.3.6, ZmqStreamlet replaces Zmqlet as one of the key components in jina.peapods.runtime.BasePea. It requires tornado and uvloop to be installed.

close(flush=True, *args, **kwargs)[source]

Close all sockets and shutdown the ZMQ context associated to this Zmqlet.

Note

This method is idempotent.

Parameters
  • flush (bool) – flag indicating if sockets need to be flushed before close is done

  • args – Extra positional arguments

  • kwargs – Extra key-value arguments

pause_pollin()[source]

Remove in_sock from the poller

resume_pollin()[source]

Put in_sock back to the poller

start(callback)[source]

Open all sockets and start the ZMQ context associated to this Zmqlet.

Parameters

callback (Callable[[Message], None]) – callback function to receive the protobuf message

jina.peapods.zmq.send_ctrl_message(address, cmd, timeout, raise_exception=False)[source]

Send a control message to a specific address and wait for the response

Parameters
  • address (str) – the socket address to send

  • cmd (Union[str, Message]) – the control command to send

  • timeout (int) – the waiting time (in ms) for the response

  • raise_exception (bool) – raise exception when exception found

Return type

Message

Returns

received message

jina.peapods.zmq.send_message(sock, msg, raise_exception=False, timeout=- 1, **kwargs)[source]

Send a protobuf message to a socket

Parameters
  • sock (Union[Socket, ZMQStream]) – the target socket to send

  • msg (Message) – the protobuf message

  • raise_exception (bool) – if true: raise an exception which might occur during send, if false: log error

  • timeout (int) – waiting time (in seconds) for sending

  • kwargs – keyword arguments

Return type

int

Returns

the size (in bytes) of the sent message

async jina.peapods.zmq.send_message_async(sock, msg, timeout=- 1, **kwargs)[source]

Send a protobuf message to a socket in async manner

Parameters
  • sock (Socket) – the target socket to send

  • msg (Message) – the protobuf message

  • timeout (int) – waiting time (in seconds) for sending

  • kwargs – keyword arguments

Return type

int

Returns

the size (in bytes) of the sent message

jina.peapods.zmq.recv_message(sock, timeout=- 1, **kwargs)[source]

Receive a protobuf message from a socket

Parameters
  • sock (Socket) – the socket to pull from

  • timeout (int) – max wait time for pulling, -1 means wait forever

  • kwargs – keyword arguments

Return type

Message

Returns

a tuple of two pieces

  • the received protobuf message

  • the size of the message in bytes

async jina.peapods.zmq.recv_message_async(sock, timeout=- 1, **kwargs)[source]

Receive a protobuf message from a socket in async manner

Parameters
  • sock (Socket) – the socket to pull from

  • timeout (int) – max wait time for pulling, -1 means wait forever

  • kwargs – keyword arguments

Return type

Message

Returns

a tuple of two pieces

  • the received protobuf message

  • the size of the message in bytes