jina.peapods.zmq

class jina.peapods.zmq.Zmqlet(args, logger=None, ctrl_addr=None)[source]

Bases: object

A Zmqlet object can send/receive data to/from ZeroMQ socket and invoke callback function. It has three sockets for input, output and control.

Warning

Starting from v0.3.6, ZmqStreamlet replaces Zmqlet as one of the key components in jina.peapods.runtimes.zmq.zed.ZEDRuntime. It requires tornado and uvloop to be installed.

Parameters
  • args (argparse.Namespace) – the parsed arguments from the CLI

  • logger (JinaLogger) – the logger to use

register_pollin()[source]
pause_pollin()[source]

Remove in_sock from the poller

resume_pollin()[source]

Put in_sock back to the poller

static get_ctrl_address(host, port_ctrl, ctrl_with_ipc)[source]

Get the address of the control socket

Parameters
  • host (Optional[str]) – the host in the arguments

  • port_ctrl (Optional[str]) – the control port

  • ctrl_with_ipc (bool) – a bool of whether using IPC protocol for controlling

Return type

Tuple[str, bool]

Returns

A tuple of two pieces:

  • a string of control address

  • a bool of whether using IPC protocol for controlling

close_sockets()[source]

Close input, output and control sockets of this Zmqlet.

init_sockets()[source]

Initialize all sockets and the ZMQ context.

Return type

Tuple

Returns

A tuple of four pieces:

  • ZMQ context

  • the input socket

  • the output socket

  • the control socket

close()[source]

Close all sockets and shutdown the ZMQ context associated to this Zmqlet.

Note

This method is idempotent.

print_stats()[source]

Print out the network stats of of itself

send_message(msg)[source]

Send a message via the output socket

Parameters

msg (Message) – the protobuf message to send

send_idle()[source]

Tell the upstream router this dealer is idle

recv_message(callback=None)[source]

Receive a protobuf message from the input socket

Parameters

callback (Optional[Callable[[Message], Message]]) – the callback function, which modifies the recevied message inplace.

Return type

Message

Returns

the received (and modified) protobuf message

clear_stats()[source]

Reset the internal counter of send and receive bytes to zero.

class jina.peapods.zmq.AsyncZmqlet(args, logger=None, ctrl_addr=None)[source]

Bases: jina.peapods.zmq.Zmqlet

An async vesion of Zmqlet. The send_message() and recv_message() works in the async manner.

Parameters
  • args (argparse.Namespace) – the parsed arguments from the CLI

  • logger (JinaLogger) – the logger to use

async send_message(msg, sleep=0, **kwargs)[source]

Send a protobuf message in async via the output socket

Parameters
  • msg (Message) – the protobuf message to send

  • sleep (float) – the sleep time of every two sends in millisecond. A near-zero value could result in bad load balancing in the proceeding pods.

async recv_message(callback=None)[source]

Receive a protobuf message from the input socket

Parameters

callback (Optional[Callable[[Message], Union[Message, Request]]]) – the callback function, which modifies the recevied message inplace.

Return type

Message

Returns

the received (and modified) protobuf message

class jina.peapods.zmq.ZmqStreamlet(args, logger=None, ctrl_addr=None)[source]

Bases: jina.peapods.zmq.Zmqlet

A ZmqStreamlet object can send/receive data to/from ZeroMQ stream and invoke callback function. It has three sockets for input, output and control.

Warning

Starting from v0.3.6, ZmqStreamlet replaces Zmqlet as one of the key components in jina.peapods.runtime.BasePea. It requires tornado and uvloop to be installed.

Parameters
  • args (argparse.Namespace) – the parsed arguments from the CLI

  • logger (JinaLogger) – the logger to use

register_pollin()[source]
close()[source]

Close all sockets and shutdown the ZMQ context associated to this Zmqlet.

Note

This method is idempotent.

pause_pollin()[source]

Remove in_sock from the poller

resume_pollin()[source]

Put in_sock back to the poller

start(callback)[source]
jina.peapods.zmq.send_ctrl_message(address, cmd, timeout)[source]

Send a control message to a specific address and wait for the response

Parameters
  • address (str) – the socket address to send

  • cmd (str) – the control command to send

  • timeout (int) – the waiting time (in ms) for the response

Return type

Message

jina.peapods.zmq.send_message(sock, msg, timeout=-1, **kwargs)[source]

Send a protobuf message to a socket

Parameters
  • sock (Union[Socket, ZMQStream]) – the target socket to send

  • msg (Message) – the protobuf message

  • timeout (int) – waiting time (in seconds) for sending

Return type

int

Returns

the size (in bytes) of the sent message

async jina.peapods.zmq.send_message_async(sock, msg, timeout=-1, **kwargs)[source]

Send a protobuf message to a socket in async manner

Parameters
  • sock (Socket) – the target socket to send

  • msg (Message) – the protobuf message

  • timeout (int) – waiting time (in seconds) for sending

Return type

int

Returns

the size (in bytes) of the sent message

jina.peapods.zmq.recv_message(sock, timeout=-1, **kwargs)[source]

Receive a protobuf message from a socket

Parameters
  • sock (Socket) – the socket to pull from

  • timeout (int) – max wait time for pulling, -1 means wait forever

Return type

Message

Returns

a tuple of two pieces

  • the received protobuf message

  • the size of the message in bytes

async jina.peapods.zmq.recv_message_async(sock, timeout=-1, **kwargs)[source]

Receive a protobuf message from a socket in async manner

Parameters
  • sock (Socket) – the socket to pull from

  • timeout (int) – max wait time for pulling, -1 means wait forever

Return type

Message

Returns

a tuple of two pieces

  • the received protobuf message

  • the size of the message in bytes