Source code for jina.orchestrate.deployments.config.k8slib.kubernetes_deployment

import json
from argparse import Namespace
from typing import Dict, List, Optional, Tuple, Union

from jina.orchestrate.deployments.config.k8slib import kubernetes_tools
from jina.serve.networking import GrpcConnectionPool

[docs]def get_deployment_yamls( name: str, namespace: str, image_name: str, container_cmd: str, container_args: str, replicas: int, pull_policy: str, jina_deployment_name: str, pod_type: str, shard_id: Optional[int] = None, port: Optional[int] = None, env: Optional[Dict] = None, gpus: Optional[Union[int, str]] = None, image_name_uses_before: Optional[str] = None, image_name_uses_after: Optional[str] = None, container_cmd_uses_before: Optional[str] = None, container_cmd_uses_after: Optional[str] = None, container_args_uses_before: Optional[str] = None, container_args_uses_after: Optional[str] = None, monitoring: bool = False, port_monitoring: Optional[int] = None, protocol: Optional[str] = None ) -> List[Dict]: """Get the yaml description of a service on Kubernetes :param name: name of the service and deployment :param namespace: k8s namespace of the service and deployment :param image_name: image for the k8s deployment :param container_cmd: command executed on the k8s pods :param container_args: arguments used for the k8s pod :param replicas: number of replicas :param pull_policy: pull policy used for fetching the Docker images from the registry. :param jina_deployment_name: Name of the Jina Deployment this deployment belongs to :param pod_type: type os this pod, can be gateway/head/worker :param shard_id: id of this shard, None if shards=1 or this is gateway/head :param port: port which will be exposed by the deployed containers :param env: environment variables to be passed into configmap. :param gpus: number of gpus to use, for k8s requires you pass an int number, refers to the number of requested gpus. :param image_name_uses_before: image for uses_before container in the k8s deployment :param image_name_uses_after: image for uses_after container in the k8s deployment :param container_cmd_uses_before: command executed in the uses_before container on the k8s pods :param container_cmd_uses_after: command executed in the uses_after container on the k8s pods :param container_args_uses_before: arguments used for uses_before container on the k8s pod :param container_args_uses_after: arguments used for uses_after container on the k8s pod :param monitoring: enable monitoring on the deployment :param port_monitoring: port which will be exposed, for the prometheus server, by the deployed containers :param protocol: In case of being a Gateway, the protocol used to expose its server :return: Return a dictionary with all the yaml configuration needed for a deployment """ # we can always assume the ports are the same for all executors since they run on different k8s pods # port expose can be defined by the user if not port: port = GrpcConnectionPool.K8S_PORT if not port_monitoring: port_monitoring = GrpcConnectionPool.K8S_PORT_MONITORING deployment_params = { 'name': name, 'namespace': namespace, 'image': image_name, 'replicas': replicas, 'command': container_cmd, 'args': container_args, 'port': port, 'port_uses_before': GrpcConnectionPool.K8S_PORT_USES_BEFORE, 'port_uses_after': GrpcConnectionPool.K8S_PORT_USES_AFTER, 'args_uses_before': container_args_uses_before, 'args_uses_after': container_args_uses_after, 'command_uses_before': container_cmd_uses_before, 'command_uses_after': container_cmd_uses_after, 'image_uses_before': image_name_uses_before, 'image_uses_after': image_name_uses_after, 'pull_policy': pull_policy, 'jina_deployment_name': jina_deployment_name, 'shard_id': f'\"{shard_id}\"' if shard_id is not None else '\"\"', 'pod_type': pod_type, 'protocol': str(protocol).lower() if protocol is not None else '', } if gpus: deployment_params['device_plugins'] = {'': gpus} template_name = 'deployment-executor' if name != 'gateway' else 'deployment-gateway' if image_name_uses_before and image_name_uses_after: template_name = 'deployment-uses-before-after' elif image_name_uses_before: template_name = 'deployment-uses-before' elif image_name_uses_after: template_name = 'deployment-uses-after' if monitoring: service_yaml = kubernetes_tools.get_yaml( 'service_monitoring', { 'name': name, 'target': name, 'namespace': namespace, 'port': port, 'type': 'ClusterIP', 'port_monitoring': port_monitoring, }, ) service_monitor_yaml = kubernetes_tools.get_yaml( 'service_monitor', { 'name': name, 'target': name, 'namespace': namespace, }, ) else: service_yaml = kubernetes_tools.get_yaml( 'service', { 'name': name, 'target': name, 'namespace': namespace, 'port': port, 'type': 'ClusterIP', }, ) service_monitor_yaml = None yamls = [ kubernetes_tools.get_yaml( 'configmap', { 'name': name, 'namespace': namespace, 'data': env, }, ), service_yaml, kubernetes_tools.get_yaml(template_name, deployment_params), ] if service_monitor_yaml: yamls.append(service_monitor_yaml) return yamls
[docs]def get_cli_params( arguments: Namespace, skip_list: Tuple[str] = (), port: Optional[int] = None ) -> str: """Get cli parameters based on the arguments. :param arguments: arguments where the cli parameters are generated from :param skip_list: list of arguments which should be ignored :param port: overwrite port with the provided value if set :return: string which contains all cli parameters """ = '' skip_attributes = [ 'uses', # set manually 'uses_with', # set manually 'runtime_cls', # set manually 'workspace', 'log_config', 'polling_type', 'uses_after', 'uses_before', 'replicas', ] + list(skip_list) if port: arguments.port = port arg_list = [ [attribute, attribute.replace('_', '-'), value] for attribute, value in arguments.__dict__.items() ] cli_args = [] for attribute, cli_attribute, value in arg_list: # TODO: This should not be here, its a workaround for our parser design with boolean values if attribute in skip_attributes: continue if type(value) == bool and value: cli_args.append(f'"--{cli_attribute}"') elif type(value) != bool: if value is not None: value = str(value) value = value.replace('\'', '').replace('"', '\\"') cli_args.append(f'"--{cli_attribute}", "{value}"') cli_string = ', '.join(cli_args) return cli_string
[docs]def dictionary_to_cli_param(dictionary) -> str: """Convert the dictionary into a string to pass it as argument in k8s. :param dictionary: dictionary which has to be passed as argument in k8s. :return: string representation of the dictionary """ return json.dumps(dictionary).replace('"', '\\"') if dictionary else ""