jina.clients.websocket module#

class jina.clients.websocket.WebSocketClient(args=None, **kwargs)[source]#

Bases: WebSocketBaseClient, PostMixin, ProfileMixin, HealthCheckMixin

A client connecting to a Gateway using WebSocket protocol.

Instantiate this class through the jina.Client() convenience method.

EXAMPLE USAGE

from jina import Client
from docarray import Document

# select host address to connect to
c = Client(
    protocol='websocket', asyncio=False, host='ws://my.awesome.flow:1234'
)  # returns WebSocketClient instance
c.post(on='/index', inputs=Document(text='hello!'))
aio_tracing_client_interceptors()#

Create a gRPC client aio channel interceptor.

Return type:

Optional[Sequence[ClientInterceptor]]

Returns:

An invocation-side list of aio interceptor objects.

aio_tracing_server_interceptors()#

Create a gRPC aio server interceptor.

Return type:

Optional[Sequence[ServerInterceptor]]

Returns:

A service-side aio interceptor object.

static check_input(inputs=None, **kwargs)#

Validate the inputs and print the first request if success.

Parameters:
  • inputs (Optional[InputType]) – the inputs

  • kwargs – keyword arguments

Return type:

None

property client: T#

Return the client object itself

Return type:

TypeVar(T)

Returns:

the Client object

delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
dry_run(**kwargs)#
index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
property inputs: InputType#

An iterator of bytes, where each element represents a Document’s raw content.

inputs defined in the protobuf

Return type:

InputType

Returns:

inputs

is_flow_ready(**kwargs)#

Check if the Flow is ready to receive requests

Parameters:

kwargs – potential keyword arguments passed from the public interface

Return type:

bool

Returns:

boolean indicating the health/readiness of the Flow

post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=0.1, backoff_multiplier=1.5, results_in_order=False, **kwargs)#

Post a general data request to the Flow.

Parameters:
  • inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.

  • on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.

  • on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.

  • on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.

  • on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.

  • parameters (Optional[Dict]) – the kwargs that will be sent to the executor

  • target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.

  • request_size (int) – the number of Documents per request. <=0 means all inputs in one request.

  • show_progress (bool) – if set, client will show a progress bar on receiving every request.

  • continue_on_error (bool) – if set, a Request that causes an error will only be logged, without blocking further requests.

  • return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray

  • max_attempts (int) – Number of sending attempts, including the original request.

  • initial_backoff (float) – The first retry will happen with a delay of random(0, initial_backoff)

  • max_backoff (float) – The maximum accepted backoff after the exponential incremental delay

  • backoff_multiplier (float) – The n-th attempt will occur at random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff))

  • results_in_order (bool) – return the results in the same order as the inputs

  • kwargs – additional parameters

Return type:

Union[DocumentArray, List[Response], None]

Returns:

None or DocumentArray containing all response Documents

Warning

target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.

profiling(show_table=True)#

Profile a single query’s roundtrip, including network and computation latency. The results are summarized in a dict.

Parameters:

show_table (bool) – whether to show the table or not.

Return type:

Dict[str, float]

Returns:

the latency report in a dict.

search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
tracing_client_interceptor()#
Return type:

Optional[OpenTelemetryClientInterceptor]

Returns:

a gRPC client interceptor with the global tracing provider.

update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) Optional[Union[DocumentArray, List[Response]]]#
class jina.clients.websocket.AsyncWebSocketClient(args=None, **kwargs)[source]#

Bases: WebSocketBaseClient, AsyncPostMixin, AsyncProfileMixin, AsyncHealthCheckMixin

Asynchronous client connecting to a Gateway using WebSocket protocol.

Instantiate this class through the jina.Client() convenience method.

Unlike WebSocketClient, here post() is a coroutine (i.e. declared with the async/await syntax), so simply calling it will not schedule it for execution.

To actually run a coroutine, the user needs to put it in an event loop, e.g. via asyncio.run() or asyncio.create_task().

AsyncWebSocketClient can be very useful in integration settings, where Jina/Flow/Client is NOT the main logic but rather serves as part of another program. In this case, users often do not want to let Jina control the asyncio event loop. In contrast, Client controls and wraps the event loop internally, making the Client look synchronous from the outside.

EXAMPLE USAGE

from jina import Client
from docarray import Document

# async inputs for the client
async def async_inputs():
    for _ in range(10):
        yield Document()
        await asyncio.sleep(0.1)


# select host address to connect to
c = Client(
    protocol='websocket', asyncio=True, host='http://ws.awesome.flow:1234'
)  # returns AsyncWebSocketClient instance

async for resp in c.post(on='/index', inputs=async_inputs, request_size=1):
    print(resp)
aio_tracing_client_interceptors()#

Create a gRPC client aio channel interceptor.

Return type:

Optional[Sequence[ClientInterceptor]]

Returns:

An invocation-side list of aio interceptor objects.

aio_tracing_server_interceptors()#

Create a gRPC aio server interceptor.

Return type:

Optional[Sequence[ServerInterceptor]]

Returns:

A service-side aio interceptor object.

static check_input(inputs=None, **kwargs)#

Validate the inputs and print the first request if success.

Parameters:
  • inputs (Optional[InputType]) – the inputs

  • kwargs – keyword arguments

Return type:

None

property client: T#

Return the client object itself

Return type:

TypeVar(T)

Returns:

the Client object

delete(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
dry_run(**kwargs)#
index(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
property inputs: InputType#

An iterator of bytes, where each element represents a Document’s raw content.

inputs defined in the protobuf

Return type:

InputType

Returns:

inputs

async is_flow_ready(**kwargs)#

Check if the Flow is ready to receive requests

Parameters:

kwargs – potential keyword arguments passed from the public interface

Return type:

bool

Returns:

boolean indicating the health/readiness of the Flow

async post(on, inputs=None, on_done=None, on_error=None, on_always=None, parameters=None, target_executor=None, request_size=100, show_progress=False, continue_on_error=False, return_responses=False, max_attempts=1, initial_backoff=0.5, max_backoff=0.1, backoff_multiplier=1.5, results_in_order=False, **kwargs)#

Async Post a general data request to the Flow.

Parameters:
  • inputs (Optional[InputType]) – input data which can be an Iterable, a function which returns an Iterable, or a single Document.

  • on (str) – the endpoint which is invoked. All the functions in the executors decorated by @requests(on=…) with the same endpoint are invoked.

  • on_done (Optional[CallbackFnType]) – the function to be called when the Request object is resolved.

  • on_error (Optional[CallbackFnType]) – the function to be called when the Request object is rejected.

  • on_always (Optional[CallbackFnType]) – the function to be called when the Request object is either resolved or rejected.

  • parameters (Optional[Dict]) – the kwargs that will be sent to the executor

  • target_executor (Optional[str]) – a regex string. Only matching Executors will process the request.

  • request_size (int) – the number of Documents per request. <=0 means all inputs in one request.

  • show_progress (bool) – if set, client will show a progress bar on receiving every request.

  • continue_on_error (bool) – if set, a Request that causes an error will only be logged, without blocking further requests.

  • return_responses (bool) – if set to True, the result will come as Response and not as a DocumentArray

  • max_attempts (int) – Number of sending attempts, including the original request.

  • initial_backoff (float) – The first retry will happen with a delay of random(0, initial_backoff)

  • max_backoff (float) – The maximum accepted backoff after the exponential incremental delay

  • backoff_multiplier (float) – The n-th attempt will occur at random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff))

  • results_in_order (bool) – return the results in the same order as the inputs

  • kwargs – additional parameters, can be used to pass metadata or authentication information in the server call

Yield:

Response object

Warning

target_executor uses re.match for checking if the pattern is matched. target_executor=='foo' will match both deployments with the name foo and foo_what_ever_suffix.

Return type:

AsyncGenerator[None, Union[DocumentArray, Response]]

async profiling(show_table=True)#

Profile a single query’s roundtrip, including network and computation latency. The results are summarized in a dict.

Parameters:

show_table (bool) – whether to show the table or not.

Return type:

Dict[str, float]

Returns:

the latency report in a dict.

search(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#
tracing_client_interceptor()#
Return type:

Optional[OpenTelemetryClientInterceptor]

Returns:

a gRPC client interceptor with the global tracing provider.

update(inputs: Optional[InputType] = None, on_done: Optional[CallbackFnType] = None, on_error: Optional[CallbackFnType] = None, on_always: Optional[CallbackFnType] = None, parameters: Optional[Dict] = None, target_executor: Optional[str] = None, request_size: int = 100, show_progress: bool = False, continue_on_error: bool = False, return_responses: bool = False, max_attempts: int = 1, initial_backoff: float = 0.5, max_backoff: float = 0.1, backoff_multiplier: float = 1.5, results_in_order: bool = False, **kwargs) AsyncGenerator[None, Union[DocumentArray, Response]]#