Source code for jina.checker
import argparse
from jina.logging.predefined import default_logger
class NetworkChecker:
    """Check if a BaseDeployment is running or not."""

    def __init__(self, args: 'argparse.Namespace'):
        """
        Create a new :class:`NetworkChecker`.

        Constructing the object runs the whole check: the address
        ``{args.host}:{args.port}`` is pinged ``args.retries`` times and the
        process is then terminated. The exit code is 0 when at least one ping
        succeeded and 1 otherwise (including when the check is interrupted
        with Ctrl-C), so this constructor never returns normally.

        :param args: args provided by the CLI. The attributes ``host``,
            ``port`` and ``retries`` are read.
        """
        # ``sys.exit`` instead of the ``exit`` builtin: the latter is injected
        # by the ``site`` module for interactive sessions and is not
        # guaranteed to exist (e.g. ``python -S`` or frozen applications).
        import sys
        import time

        from jina.logging.profile import TimeContext
        from jina.serve.runtimes.worker import WorkerRuntime

        ctrl_addr = f'{args.host}:{args.port}'
        try:
            total_time = 0
            total_success = 0
            for j in range(args.retries):
                with TimeContext(
                    f'ping {ctrl_addr} at {j} round', default_logger
                ) as tc:
                    r = WorkerRuntime.is_ready(ctrl_addr)
                    if not r:
                        default_logger.warning(
                            'not responding, retry (%d/%d) in 1s'
                            % (j + 1, args.retries)
                        )
                    else:
                        total_success += 1
                # The duration is read once the timing context has exited
                # (presumably that is when it is finalized -- confirm against
                # TimeContext). Only successful rounds are accumulated, since
                # the average below divides by the number of successes.
                if r:
                    total_time += tc.duration
                # Pause between rounds; there is nothing to wait for after
                # the final one.
                if j < args.retries - 1:
                    time.sleep(1)

            if total_success < args.retries:
                default_logger.warning(
                    'message lost %.0f%% (%d/%d) '
                    % (
                        (1 - total_success / args.retries) * 100,
                        args.retries - total_success,
                        args.retries,
                    )
                )
            if total_success > 0:
                default_logger.info(
                    'avg. latency: %.0f ms' % (total_time / total_success * 1000)
                )
                sys.exit(0)
        except KeyboardInterrupt:
            # An interrupted check is reported as a failure below.
            pass

        # returns 1 (anomaly) when it comes to here
        sys.exit(1)
def dry_run_checker(args: 'argparse.Namespace'):
    """
    call dry run on the given endpoint

    The process is terminated with exit code 0 when the dry run reports
    success and with exit code 1 otherwise (including when the call is
    interrupted with Ctrl-C), so this function never returns normally.

    :param args: args provided by the CLI. The attributes ``host`` and
        ``timeout`` are read.
    """
    # ``sys.exit`` instead of the ``exit`` builtin: the latter is injected by
    # the ``site`` module for interactive sessions and is not guaranteed to
    # exist (e.g. ``python -S`` or frozen applications).
    import sys

    # No retry mechanism for dry run since it is built in the Flow
    from jina import Client

    client = Client(host=args.host)
    try:
        if client.dry_run(timeout=args.timeout):
            default_logger.info('dry run successful')
            sys.exit(0)
        default_logger.warning('dry run failed')
        sys.exit(1)
    except KeyboardInterrupt:
        # An interrupted dry run is reported as a failure below.
        pass

    # returns 1 (anomaly) when it comes to here
    sys.exit(1)