Source code for yr.config

#!/usr/bin/env python3
# coding=UTF-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
YR API configuration classes for users.
"""
import dataclasses
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Union, Optional, get_origin, Any
from enum import Enum, IntEnum
from yr.affinity import Affinity

# Default number of HTTP client connections (see ``Config.connection_nums``).
_DEFAULT_CONNECTION_NUMS = 100
# Default for ``MetaConfig.enableMetrics``.
_DEFAULT_ENABLE_METRICS = False
# Default maximum number of stateless-function instances; ``-1`` is the "not set" value.
_DEFAULT_MAX_TASK_INSTANCE_NUM = -1
# Default for ``MetaConfig.maxConcurrencyCreateNum``.
_DEFAULT_MAX_CONCURRENCY_CREATE_NUM = 100
_DEFAULT_CONCURRENCY = 1
# Default instance recycle period in seconds (see ``Config.recycle_time``).
_DEFAULT_RECYCLE_TIME = 2
# Default number of HTTP IO worker threads (see ``Config.http_ioc_threads_num``).
_DEFAULT_HTTP_IOC_THREADS_NUM = 400
# Default RPC timeout: 30 * 60, presumably seconds (i.e. 30 minutes) — confirm against the RPC layer.
_DEFAULT_RPC_TIMEOUT = 30 * 60
# Backward-compatible alias for the historical misspelled name; other modules may still import it.
_DEFAULT_RPC_TIMOUT = _DEFAULT_RPC_TIMEOUT
# Upper bound used by range checks (largest signed 32-bit integer).
_MAX_INT = 0x7FFFFFFF
# Lower bound used by range checks (not the minimum representable integer).
_MIN_INT = 0
# Resource names used as keys for scheduling resources.
NPU_RESOURCE_NAME = "NPU"
CPU_RESOURCE_NAME = "CPU"
MEMORY_RESOURCE_NAME = "Memory"


@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class UserTLSConfig:
    """
    The SSL/TLS configuration used by users when communicating with external clusters.
    """
    #: Path to the root certificate file.
    root_cert_path: str
    #: Path to the module certificate file.
    module_cert_path: str
    #: Path to the module key file.
    module_key_path: str
    #: Server name. Defaults to ``None`` (not set).
    server_name: Optional[str] = None
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class DeploymentConfig:
    """
    Deployment detail used when ``Config.auto`` is ``True``.

    Attributes:
        cpu (int): CPU requested, in millicpu.
        mem (int): Memory requested, in MB.
        datamem (int): Data-system memory requested, in MB.
        spill_path (str): Spill path; when memory runs out, data is flushed to this directory on disk.
        spill_limit (int): Spill size limit, in MB.
    """
    cpu: int = 0
    mem: int = 0
    datamem: int = 0
    spill_path: str = ""
    spill_limit: int = 0
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class Config:
    """
    YR API config.
    """
    #: Function id which you deploy, get default by env `YRFUNCID`.
    #: e.g. ``sn:cn:yrk:default:function:0-test-test:$latest``.
    function_id: str = ""
    #: Cpp function id which you deploy, get default by env `YR_CPP_FUNCID`.
    cpp_function_id: str = ""
    #: Use default function for cpp.
    cpp_auto_function_name: str = ""
    #: Function name which is needed in runtime.
    function_name: str = ""
    #: System cluster address, get default by env `YR_SERVER_ADDRESS`.
    server_address: str = ""
    #: DataSystem address, get default by env `YR_DS_ADDRESS`.
    ds_address: str = ""
    #: Only ``False`` when initialized in runtime, default is ``True``.
    is_driver: bool = True
    #: YR api log level: ``ERROR``/``WARNING``/``INFO``/``DEBUG``.
    #: The field defaults to ``""``; the documented effective level when unset is ``WARNING``.
    log_level: Union[str, int] = ""
    #: Http client read timeout (sec), default is ``900``.
    invoke_timeout: int = 900
    #: Run code in local, default is ``False``.
    local_mode: bool = False
    #: The code directory path that must be configured at runtime initialization.
    code_dir: str = ""
    #: Http client connection nums.
    #: default is ``100``, limit: [1,∞).
    connection_nums: int = _DEFAULT_CONNECTION_NUMS
    #: Instance recycle period (sec).
    #: default is ``2 second``, limit: (0,300].
    recycle_time: int = _DEFAULT_RECYCLE_TIME
    #: If ``True`` will use DataSystem in cluster client.
    #: Default is ``None`` (unset). NOTE(review): presumably treated as ``False`` when unset — confirm.
    in_cluster: Optional[bool] = None
    #: Auto generated by init.
    job_id: str = ""
    #: For out cluster https ssl. Default ``None``.
    #: The annotation is intentionally the plain class (not ``Optional``): ``dataclass_from_dict``
    #: dispatches on the declared field type.
    tls_config: UserTLSConfig = None
    #: Auto start distribute-executor when `yr.init`, and auto stop distribute-executor when `yr.finalize`.
    #: default is ``False``.
    auto: bool = False
    #: When `auto=True` needed, use to define deployment detail.
    deployment_config: "DeploymentConfig" = None
    #: Runtime server, keep default in driver.
    rt_server_address: str = ""
    #: Log directory, specifies the path where log files will be stored.
    #: Default is the current working directory ("./").
    log_dir: str = "./"
    #: Path to environment variable file (.env format) to load at startup.
    #: The file should contain environment variables in KEY=VALUE format, one per line, e.g.:
    #: "KEY1=VALUE1"
    #: "KEY2=VALUE2"
    #: Lines starting with # are treated as comments and ignored. Empty lines are ignored.
    #: If specified, environment variables from this file will be loaded into os.environ.
    env_file: str = ""
    #: Max size for log file, default is ``0`` (If the default value is ``0``, it will eventually be set to ``40``).
    log_file_size_max: int = 0
    #: Max number for log file, default is ``0`` (If the default value is ``0``, it will eventually be set to ``20``).
    log_file_num_max: int = 0
    #: Interval for log flush, default is ``5``.
    log_flush_interval: int = 5
    #: Runtime id, keep default in driver.
    runtime_id: str = "driver"
    #: The maximum number of instances of stateless function.
    max_task_instance_num: int = _DEFAULT_MAX_TASK_INSTANCE_NUM
    #: Code loading path.
    load_paths: list = field(default_factory=list)
    #: The timeout used for RPC.
    rpc_timeout: int = _DEFAULT_RPC_TIMOUT
    #: Whether to enable client two-way (mutual) authentication.
    #: Default is ``None`` (unset). NOTE(review): presumably treated as disabled when unset — confirm.
    enable_mtls: Optional[bool] = None
    #: Whether to enable TLS. Default is ``None`` (unset).
    enable_tls: Optional[bool] = None
    #: Client private key file path.
    private_key_path: str = ""
    #: Client certificate file path.
    certificate_file_path: str = ""
    #: Server certificate file path.
    verify_file_path: str = ""
    #: Client private key encryption password. (The field name keeps its historical spelling.)
    private_key_paaswd: str = ""
    #: HTTP link worker thread.
    http_ioc_threads_num: int = _DEFAULT_HTTP_IOC_THREADS_NUM
    #: Server name, used to identify and connect to a specific server instance.
    server_name: str = ""
    #: Namespace, used to organize and isolate configurations or resources.
    ns: str = ""
    #: Tenant identifier. NOTE(review): semantics not documented in this module — confirm.
    tenant_id: str = ""
    #: Whether to enable metric collection. ``False`` indicates disabled, and ``True`` indicates enabled.
    #: The default value is ``True``. This takes effect only when called in the cluster.
    enable_metrics: bool = True
    #: Used to set custom environment variables for the runtime. Currently, only `LD_LIBRARY_PATH` is supported.
    custom_envs: Dict[str, str] = field(default_factory=dict)
    #: Function master address list.
    master_addr_list: list = field(default_factory=list)
    #: Specify the user code or the local path location that it depends on, absolute path, and ensure that
    #: it exists on all nodes in the cluster. The default value is empty.
    working_dir: str = ""
    #: Whether to enable data system TLS authentication.
    #: If ``True``, enable data system tls authentication, else not.
    enable_ds_encrypt: bool = False
    #: The path of worker public key for data system tls authentication,
    #: if enable_ds_encrypt is true and the ds_public_key_path is empty, an exception will be thrown.
    ds_public_key_path: str = ""
    #: The path of client public key for data system tls authentication.
    #: if enable_ds_encrypt is true and the runtime_public_key_path is empty, an exception will be thrown.
    runtime_public_key_path: str = ""
    #: The path of client private key for data system tls authentication.
    #: if enable_ds_encrypt is true and the runtime_private_key_path is empty, an exception will be thrown.
    runtime_private_key_path: str = ""
    #: Number of CPUs. Default ``None`` (not set).
    num_cpus: Optional[int] = None
    #: Job-level runtime environment. Default ``None`` (not set).
    runtime_env: Optional[Dict[str, Any]] = None
    #: If ``True``, the output from all of the job processes on all nodes will be directed to the driver,
    #: default is ``False``.
    log_to_driver: bool = False
    #: If ``True``, deduplicates logs that appear redundantly across multiple processes, default ``True``.
    #: The first instance of each log message is always immediately printed. However, subsequent log
    #: messages of the same pattern are buffered for up to five seconds and printed in batch.
    dedup_logs: bool = True
    #: Authentication token. Default is empty.
    auth_token: str = ""
@dataclass(repr=False, eq=False)
class ClientInfo:
    """
    Holds information about the connected yr client.
    """
    #: Unique identifier of the job, generated automatically by `yr.init`.
    job_id: str
@dataclass(repr=False, eq=False)
class Device:
    """
    Device description used to initialize an xpu task.
    """
    #: Device (model) name; empty by default.
    name: str = ""
    #: Batch size; ``1`` by default.
    batch_size: int = 1
class SchedulingAffinityType(IntEnum):
    """
    Bundle affinity type.

    **Please use ONLY REQUIRED_AFFINITY_IN_EACH_BUNDLE.**
    **All other attributes are inherited from IntEnum and should not be used directly.**
    """
    #: Currently, only REQUIRED_AFFINITY_IN_EACH_BUNDLE is supported,
    #: indicating strong affinity within each bundle.
    REQUIRED_AFFINITY_IN_EACH_BUNDLE = 0
@dataclass(repr=False, eq=False)
class ResourceGroupOptions:
    """
    Options that pin an instance to a ResourceGroup (and optionally one of its bundles).
    """
    #: Target ResourceGroup name. Empty (the default) means the instance is not
    #: scheduled to any ResourceGroup; otherwise it is scheduled to the named one.
    resource_group_name: str = ""
    #: Bundle index inside the ResourceGroup; only honored when `resource_group_name` is set.
    #: Valid range: [-1, number of bundles in the ResourceGroup).
    #: ``-1`` (the default) selects no particular bundle; any other in-range value selects
    #: the bundle at that index; out-of-range values produce an error.
    bundle_index: int = -1
@dataclass(repr=False, eq=False)
class DebugConfig:
    """
    Per-instance debug settings.
    """
    #: Whether debugging is switched on; off by default.
    enable: bool = False
@dataclass
class FunctionGroupOptions:
    """
    Function group options.
    """
    #: Required CPU size in millicores (m), limited to the range [300, 16000].
    cpu: Optional[int] = None
    #: Required memory size in MB, limited to the range [128, 65536].
    memory: Optional[int] = None
    #: Custom resources, currently supports "NPU/XX/YY", where XX is the card model such as Ascend910B4,
    #: and YY can be ``count``, ``latency``, or ``stream``.
    resources: Dict[str, float] = field(default_factory=dict)
    #: The affinity type of instances within the bundle. Default is ``None``.
    scheduling_affinity_type: Optional[SchedulingAffinityType] = None
    #: The number of function instances within each bundle. Default is ``None`` (not set);
    #: when it is ``None`` the function group is not enabled.
    scheduling_affinity_each_bundle_size: Optional[int] = None
    #: Timeout in seconds; valid values are ``-1`` (blocking wait) or within ``[0, 0x7FFFFFFF]``.
    #: Default is ``None`` (not set).
    timeout: Optional[int] = None
    #: Instance concurrency, limited to the range [1, 1000]. Default is ``None`` (not set).
    concurrency: Optional[int] = None
    #: Number of recovery retry attempts, used when instance recovery fails. Default is ``0``.
    recover_retry_times: int = 0
def function_group_enabled(opts: FunctionGroupOptions, group_size: int) -> bool:
    """
    Decide whether function-group scheduling is active for the given options.

    Args:
        opts: Function group options; only `scheduling_affinity_each_bundle_size` is read.
        group_size: Number of instances in the group.

    Returns:
        ``True`` when the bundle size is positive and does not exceed `group_size`;
        ``False`` when the bundle size is unset, or when both it and `group_size` are ``0``.

    Raises:
        RuntimeError: For every other combination of bundle size and group size.
    """
    bundle_size = opts.scheduling_affinity_each_bundle_size
    if bundle_size is None:
        return False
    if 0 < bundle_size <= group_size:
        return True
    if bundle_size == 0 and group_size == 0:
        return False
    raise RuntimeError("enable function group failed, group_size: {}, bundle_size: {}".format(
        group_size, bundle_size))


@dataclass(repr=False, eq=False)
class DeviceInfo:
    """
    Describes a single device mounted to a function instance.
    """
    #: HDC channel number of the processor.
    device_id: int = 0
    #: Real NIC IP address of the processor.
    device_ip: str = ""
    #: Rank identifier; rank ids start from 0.
    rank_id: int = 0


@dataclass(repr=False, eq=False)
class ServerInfo:
    """
    Describes a node and the devices on it that are mounted to a function instance.
    """
    #: Information about the devices mounted to this function instance.
    devices: List[DeviceInfo] = field(default_factory=list)
    #: ID of the node where the devices reside.
    server_id: str = ""
@dataclass(repr=False, eq=False)
class FunctionGroupContext:
    """
    Holds the function-group information visible to one function instance.
    """
    #: Position of this function instance inside the group, in [0, world_size - 1]; ``0`` by default.
    rank_id: int = 0
    #: How many function instances the group contains; ``0`` by default.
    world_size: int = 0
    #: Servers used for communication between instances; a fresh empty list by default.
    server_list: List['ServerInfo'] = field(default_factory=list)
    #: Device used by this function instance (e.g. NPU/Ascend910B); empty by default.
    device_name: str = ""
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class GroupOptions:
    """
    Configuration options for grouped instance scheduling.

    The `GroupOptions` structure defines parameters for the lifecycle management of grouped instances,
    including timeout settings for rescheduling when kernel resources are insufficient.
    """
    #: Timeout for rescheduling when kernel resources are insufficient, in seconds.
    #: If set to ``-1``, the kernel will retry scheduling indefinitely.
    #: If set to a value less than ``-1``, an exception will be thrown.
    #: Default value: ``-1``.
    timeout: int = -1
    #: Whether to enable the fate-sharing configuration for grouped instances.
    #: ``True`` (default): Instances in the group will be created and destroyed together.
    #: ``False``: Instances can have independent lifecycles.
    same_lifecycle: bool = True
    #: The strategy used to create the group:
    #: ``""`` (empty string): No strategy.
    #: ``SPREAD``: Distribute multiple instances across different nodes as much as possible.
    #: ``STRICT_PACK``: All instances must be placed on the same node, otherwise creation fails.
    #: ``PACK``: Pack multiple instances into the same node as much as possible.
    #: ``STRICT_SPREAD``: All instances must be placed on different nodes, otherwise creation fails.
    #: Default: ``""`` (no strategy).
    strategy: str = ""
@dataclass(repr=False, eq=False)
class PortForwarding:
    """A single port that should be forwarded into the sandbox.

    Attributes:
        port: Port number inside the sandbox, expected in [1, 65535]; ``0`` by default.
        protocol: Transport protocol, ``"TCP"`` or ``"UDP"``; ``"TCP"`` by default.
    """
    port: int = 0
    protocol: str = "TCP"
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class InvokeOptions:
    """Use to set the invoke options.

    Examples:
        >>> import yr
        >>> import time
        >>> yr.init()
        >>> opt = yr.InvokeOptions()
        >>> opt.pod_labels["k1"] = "v1"
        >>> @yr.invoke(invoke_options=opt)
        ... def func():
        ...     time.sleep(100)
        >>> ret = func.invoke()
        >>> yr.get(ret)
        >>> yr.finalize()
    """
    #: The size of the CPU required. Value Range is [300, 16000] and unit is m (milli-core).
    cpu: int = 500
    #: The size of memory required. Unit: MB. Range: [128, 1073741824].
    memory: int = 500
    #: Instance concurrency. Value Range is [1, 1000]. Priority is higher than
    #: the "Concurrency" configured in custom_extensions. It is recommended to use this parameter for configuration.
    #: If not set (None), the default value will be determined by the runtime:
    #: - For async task/actor: 1000
    #: - For non-async task/actor: 1
    concurrency: Optional[int] = None
    #: Custom resources currently support "GPU/XX/YY" and "NPU/XX/YY", where XX is the card model such as Ascend910B4,
    #: and YY can be count, latency, or stream.
    custom_resources: Dict[str, float] = field(default_factory=dict)
    custom_extensions: Dict[str, str] = field(default_factory=dict)
    """
    Specify user-defined configurations, such as function concurrency. It can also be used as a
    user-defined tag for metrics to collect user information.

    .. list-table:: Common `custom_extensions` configuration

       * - "Concurrency"
         - Concurrency. Range: [1,1000].
       * - "lifecycle"
         - detached, supports detached mode.
       * - "DELEGATE_DIRECTORY_INFO"
         - Custom directories support the ability to create and delete subdirectories. When an instance
           is created, if the user-defined directory exists and has read and write permissions, a
           subdirectory is created under it as the working directory; otherwise, a subdirectory is
           created under the `/tmp` directory as the working directory. When the instance is destroyed,
           the working directory is destroyed. The user function can obtain the working directory
           through the `INSTANCE_WORK_DIR` environment variable.
       * - "DELEGATE_DIRECTORY_QUOTA"
         - Subdirectory quota size, value range is greater than ``0 M`` and less than ``1 TB``. If this
           configuration is not set, the default is ``512 M``. If the configuration is ``-1``,
           monitoring is not performed. Unit: MB.
       * - "GRACEFUL_SHUTDOWN_TIME"
         - Customize the graceful exit time, in seconds. Limit: ``>=0``, ``0`` means immediate exit,
           and does not guarantee that the user's graceful exit function can be completed; if
           configured <0, the system configuration at deployment time is used as the timeout time.
       * - "RECOVER_RETRY_TIMEOUT"
         - Customize the recover timeout time. The instance recover timeout time is in milliseconds.
           Limit: ``>0``, Default to ``10 * 60 * 1000``

    When used as a user-defined tag for metrics:

    >>> import yr
    >>> yr.init()
    >>> opt = yr.InvokeOptions()
    >>> opt.custom_extensions["YR_Metrics"] = "{\'endpoint\':\'127.0.0.1\', \'project_id\':\'my_project_id\'}"

    In Prometheus, select `metrics name` as `yr_app_instance_billing_invoke_latency`, and you can find
    the custom tag information in the collected invoke information:

    .. code-block:: text

        yr_app_instance_billing_invoke_latency{ ... endpoint="127.0.0.1", ...}
    """
    pod_labels: Dict[str, str] = field(default_factory=dict)
    """
    Pod labels only used in Kubernetes environment. When creating a function instance, pod_labels can
    accept key-value pairs from the user and pass them to the function system.

    * After the ActorPattern function instance specialization is completed (Running), the Scaler
      applies the incoming labels to the POD.
    * When an ActorPattern function instance fails or is deleted, the Scaler sets the corresponding
      label of the POD to empty (Remove it);
    * Constraints:

      * The number of labels that can be stored in `pod_labels` cannot exceed 5.
      * Constraints on the key and value in `pod_labels`:

        * key: Supports uppercase and lowercase letters, numbers, and hyphens, and allows a length of
          1-63. Does not start or end with a hyphen. Empty strings are not allowed.
        * value: Supports uppercase and lowercase letters, numbers, and hyphens, with a length of 1-63.
          Does not start or end with a hyphen. Allows empty strings.

    * Raises:
      When the `pod_labels` passed by the user does not meet the constraints, the corresponding
      exception and error message will be thrown.
    """
    #: Labels of instance
    labels: List[str] = field(default_factory=list)
    #: Affinity of instance
    affinity: Dict[str, str] = field(default_factory=dict)
    #: Specify the name of the model used by the heterogeneous function.
    device: Device = field(default_factory=Device)
    #: Specify the time when the invoke call of the desired heterogeneous function is completed.
    max_invoke_latency: int = 5000
    #: Specify the minimum number of instances for a stateless function.
    min_instances: int = 0
    #: Specify the maximum number of instances for a stateless function.
    max_instances: int = 0
    #: The number of instance recovery times (when an instance abnormally exits, the instance is automatically restored
    #: to the latest state). If the value is ``0``, the instance is not automatically restored when it abnormally exits.
    recover_retry_times: int = 0
    #: Whether to enable order-preserving. Only effective for stateful functions.
    need_order: bool = False
    #: Used to specify the name of the instance. When `namespace` is specified, the instance name is `namespace-name`,
    #: otherwise it is `name`.
    name: str = ""
    #: Used to specify the namespace of the instance.
    namespace: str = ""
    #: Set affinity condition list. Defaults to a fresh empty list (the previous default factory
    #: produced a dict, which contradicted the ``List[Affinity]`` annotation).
    schedule_affinities: List[Affinity] = field(default_factory=list)
    #: Whether to enable data affinity scheduling.
    is_data_affinity: bool = False
    # NOTE(review): the three options below have no type annotation, so they are class-level
    # attributes, not dataclass fields: they cannot be passed to ``__init__`` and are not included
    # by ``dataclasses.asdict``/``replace``. Set them as attributes on an instance instead.
    #: Set whether to enable weak affinity priority scheduling. If enabled, when multiple weak affinity conditions are
    #: passed, match and score them in order. Scheduling is successful as soon as one condition is met.
    preferred_priority = True
    #: Set whether to enable strong affinity priority scheduling. If enabled, when multiple strong affinity conditions
    #: are passed, they are matched and scored in order. If none of the strong affinity conditions are met,
    #: the scheduling fails.
    required_priority = False
    #: Whether to enable anti-affinity for non-selectable resources. If enabled, scheduling fails when none of the
    #: weak affinity conditions are met. When preferred_anti_other_labels is set to True, if no PODs that meet the
    #: conditions are found for weak affinity/anti-affinity, scheduling fails and no other resources' PODs are
    #: selected for scheduling.
    preferred_anti_other_labels = False
    resource_group_options: ResourceGroupOptions = field(
        default_factory=ResourceGroupOptions)
    """
    Specify the ResourceGroup option, which includes resource_group_name and bundle_index.

    When creating a function instance: If `resource_group_name` is set, it will be passed to the kernel
    to schedule to the specified ResourceGroup. If both `resource_group_name` and `bundle_index` are
    set, they are passed to the kernel to schedule the bundle to the specified ResourceGroup and index.
    The default value of `resource_group_name` is empty, and the default value of `bundle_index` is ``-1``.

    * Constraints:

      * When `resource_group_name` is empty, the instance will not be scheduled to the specified
        ResourceGroup, and the bundle_index field is not effective.
      * When `resource_group_name` is not empty:

        * When `bundle_index` is ``-1``, the instance is scheduled to the specified ResourceGroup.
        * When ``0 <= bundle_index < number of bundles in ResourceGroup``, schedule the instance to a
          specified bundle in a specified ResourceGroup.
        * When ``bundle_index < -1`` or ``bundle_index >= number of bundles in ResourceGroup``, raise error.

    * Raises:

      * There is no ResourceGroup with the `resource_group_name` provided by the user.
      * The user passes a non-empty `resource_group_name` and ``bundle_index < -1``.
      * When the user passes a non-empty `resource_group_name` and `bundle_index` >= the number of
        bundles in the ResourceGroup.
      * Scheduling failed: For example, the specified ResourceGroup or the specified bundle of the
        specified ResourceGroup does not have enough resources to handle instance scheduling.
    """
    #: Function group options.
    function_group_options: FunctionGroupOptions = field(
        default_factory=FunctionGroupOptions)
    #: Set environment variables when the instance starts.
    env_vars: Dict[str, str] = field(default_factory=dict)
    #: Number of retries for stateless functions.
    retry_times: int = 0
    #: Set the traceId for function calls for link tracing.
    trace_id: str = ""
    #: In the scenario where a function is invoked by a specified alias in cross-function invocation, when the
    #: alias is a rule alias, this parameter is used to set the kv parameter that the rule alias depends on.
    alias_params: Dict[str, str] = field(default_factory=dict)
    runtime_env: Dict = field(default_factory=dict)
    """
    Configure the stateful/stateless function runtime environment with `conda`, `pip`, `working_dir`,
    and `env_vars`.

    * `conda` provides different Python runtime environments for stateful function.

      * Specify an existing conda environment (the environment exists on all nodes)
        ``runtime_env = {"conda":"pytorch_p39"}``
      * Create and use conda environments through configuration.
        ``runtime_env["conda"] = {"name":"myenv","channels": ["conda-forge"], "dependencies": ["python=3.9",
        "msgpack-python=1.0.5", "protobuf", "libgcc-ng", "cloudpickle=2.0.0", "cython=3.0.10", "pyyaml=6.0.2"]}``
      * Create and use a conda environment through a YAML file (the YAML file meets the conda requirements).
        ``runtime_env = {"conda":"/home/env.yaml"}``

    * `pip` installs dependencies for Python runtime environment.
    * `working_dir` configure the code path of the job.
    * `env_vars` configure process-level environment variables.
      ``runtime_env = {"env_vars":{"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"}}``
    * `shared_dir` supports configuring a shared directory for some instance, with yr managing the
      lifecycle of this shared directory. `shared_dir` supports two fields: name and TTL. The name field
      only allows numbers, letters, "-", and "_". The TTL supports integers greater than 0 and less than
      INTMAX. ``runtime_env = {"shared_dir":{"name": "user_define", "TTL": 5}}``

    * Constraints of `runtime_env`:

      * The keys supported by runtime_env are `conda`, `env_vars`, `pip`, `working_dir`. Other keys will
        not take effect and will not cause errors.
      * Run the yr function with conda. The environment needs to have yr and its third-party
        dependencies. It is recommended that users first create a conda environment and then specify it
        with `runtime_env`, for example: ``runtime_env = {"conda":"pytorch_p39"}``
      * `runtime_env` supports creating and switching conda environments using configurations. The
        configuration needs to install third-party dependencies for yr, for example:
        ``runtime_env["conda"] = {"name":"myenv","channels": ["conda-forge"], "dependencies": ["python=3.9",
        "msgpack-python=1.0.5", "protobuf", "libgcc-ng", "cloudpickle=2.0.0", "cython=3.0.10", "pyyaml=6.0.2"]}``
      * The environment created using conda in `runtime_env` needs to be cleaned up by the user.
      * In `runtime_env`, conda can use `pip` to install dependencies, which are managed directly by conda.
        ``runtime_env = {"conda":{'name': 'my_project_env', 'channels': ['defaults', 'conda-forge'],
        'dependencies': ['python=3.9', {'pip': ['requests==2.25.1']}]}}``
      * Currently, Python 3.9 and Python 3.11 SDKs are available. The Python version of conda needs to be
        consistent with the SDK version.
      * If both `InvokeOptions.env_vars` and `InvokeOptions.runtime_env["env_vars"]` are configured with
        the same key, the configuration in `InvokeOptions.env_vars` will be used.
      * If `InvokeOptions.runtime_env["working_dir"]` is configured, use this configuration, otherwise,
        use `YR.Config.working_dir` and finally use the configuration in `InvokeOptions.env_vars`.
      * If you use conda, you need to specify the environment variable `YR_CONDA_HOME` to point to
        installation path.
      * `shared_dir` has the following constraints:

        1. It is not recommended to configure different TTL for the same shared directory.
        2. The minimum cleanup interval for shared directories is 5 seconds.
        3. When multiple yr Agents are deployed on the same node, each Agent must be configured with
           different root directory to prevent conflicts in shared directory management.
    """
    #: Debug configuration of the instance.
    debug: DebugConfig = field(default_factory=DebugConfig)
    #: Whether an instance can be preempted is effective only in the priority scenario (when the maxPriority
    #: configuration item deployed by YuanRong is greater than ``0``). The default value is ``False``.
    preempted_allowed: bool = False
    #: The priority of an instance is determined by its value. The higher the value, the higher the priority.
    #: A high-priority instance can preempt a low-priority instance that is configured as `preempted_allowed = True`.
    #: It only takes effect in priority scenarios (scenarios where the maxPriority configuration item of YuanRong
    #: deployment is greater than ``0``). The minimum value of `instance_priority` is ``0`` and the maximum value
    #: is the maxPriority configuration of YuanRong deployment. The default is ``0``.
    instance_priority: int = 0
    #: The scheduling timeout time of an instance. Unit: milliseconds. Value range:
    #: [-1, the maximum value of the int type]. Default value: ``30000``.
    schedule_timeout_ms: int = 30000
    #: Group name of the instance. Default is empty.
    group_name: str = ""
    is_delete_remote_tensor: bool = False
    get_if_exists: bool = False
    #: Whether to skip serializing the instance class code.
    #: Set to ``True`` for pre-deployed classes (e.g., SDK built-in classes)
    #: to avoid cross-version serialization issues.
    #: Default: ``False``.
    skip_serialize: bool = False
    port_forwardings: List[PortForwarding] = field(default_factory=list)
    """
    Configure port forwarding rules for the sandbox.

    Each entry specifies a port to be forwarded inside the sandbox environment. When configured, the
    port forwarding rules are serialized to JSON and passed to the runtime via
    ``createOptions["network"]``. Supports configuring multiple ports simultaneously.

    * Constraints:

      * ``port``: Must be an integer in the range [1, 65535].
      * ``protocol``: Must be ``"TCP"`` or ``"UDP"``. Default is ``"TCP"``.

    * Raises:

      * ``TypeError``: If ``port`` is not an ``int`` or ``protocol`` is not a ``str``.
      * ``ValueError``: If ``port`` is out of range or ``protocol`` is unsupported.

    Example::

        >>> import yr
        >>> yr.init()
        >>> opt = yr.InvokeOptions()
        >>> opt.port_forwardings = [
        ...     yr.PortForwarding(port=8080),
        ...     yr.PortForwarding(port=9090, protocol="UDP"),
        ... ]
        >>> @yr.invoke(invoke_options=opt)
        ... def serve():
        ...     pass

    The above configuration produces the following JSON in ``createOptions["network"]``::

        {"portForwardings": [{"port": 8080, "protocol": "TCP"}, {"port": 9090, "protocol": "UDP"}]}
    """

    def check_options_valid(self):
        """
        Check whether the options have valid types.

        Attributes whose value is ``None`` are skipped. For generic annotations such as
        ``Dict[str, float]`` only the container type (``dict``) is checked, not the element types.

        Raises:
            TypeError: If options are invalid, throw this exception.
        """
        attributes_to_check = [
            ("env_vars", Dict[str, str]),
            ("name", str),
            ("namespace", str),
            ("preferred_anti_other_labels", bool),
            ("preferred_priority", bool),
            ("trace_id", str),
            ("custom_resources", Dict[str, float]),
            ("custom_extensions", Dict[str, str]),
        ]
        for actual, expected_type in attributes_to_check:
            actual_value = getattr(self, actual)
            if actual_value is None:
                continue
            # isinstance() cannot take subscripted generics, so reduce them to their origin class.
            origin = get_origin(expected_type)
            check_type = origin if origin is not None else expected_type
            if not isinstance(actual_value, check_type):
                raise TypeError(
                    f"invalid type for '{actual}', actual: {type(actual_value)}, expect: {expected_type}"
                )

    def check_options_range(self):
        """
        Check whether the numeric options are within the valid range ``[_MIN_INT, _MAX_INT]``.

        ``max_instances`` and ``max_invoke_latency`` additionally accept ``-1``.

        Raises:
            ValueError: If a checked option lies outside the valid range.
        """
        attrs = [
            "retry_times",
            "recover_retry_times",
            "max_instances",
            "max_invoke_latency",
            "min_instances",
        ]
        for attr in attrs:
            value = getattr(self, attr)
            if attr in ("max_instances", "max_invoke_latency") and value == -1:
                continue
            if not _MIN_INT <= value <= _MAX_INT:
                # The bounds are printed as an interval; previously they were concatenated
                # with no separator (e.g. "02147483647").
                raise ValueError(
                    f"{attr} 超过范围, 请输入 [{_MIN_INT}, {_MAX_INT}] 范围的值"
                )
def dataclass_from_dict(klass, d):
    """
    Recursively build an instance of dataclass ``klass`` from the dict ``d``.

    Values of fields whose declared type is itself a dataclass are converted recursively.
    Keys in ``d`` that do not name an ``init`` field of ``klass`` are ignored, so extra keys
    no longer make the whole conversion silently fall back to the raw dict.

    Args:
        klass: Target type. If it is not a dataclass type, ``d`` is returned unchanged.
        d: Source value, typically produced by ``json.loads``. If it is not a dict,
            it is returned unchanged.

    Returns:
        An instance of ``klass``. Returns ``d`` itself when no conversion applies, or when
        ``klass`` cannot be constructed from the provided keys (e.g. a required field is missing).
    """
    if not (isinstance(klass, type) and dataclasses.is_dataclass(klass)) or not isinstance(d, dict):
        # Not a dataclass field (plain str/int/list/... or an unresolved annotation): keep the raw value.
        return d
    field_types = {f.name: f.type for f in dataclasses.fields(klass) if f.init}
    kwargs = {
        key: dataclass_from_dict(field_types[key], value)
        for key, value in d.items()
        if key in field_types
    }
    try:
        return klass(**kwargs)
    except TypeError:
        # Best-effort fallback kept from the original behavior (e.g. missing required fields).
        return d


@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class MetaFunctionID:
    """
    meta function id
    """
    cpp: str = ""
    python: str = ""
    java: str = ""


@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class MetaConfig:
    """
    TaskMetadata is used to convey control information, e.g.::

        {
            "jobID": "xxx",
            "codePath": "xx",
            "recycleTime": 2,
            "maxTaskInstanceNum": -1,
            "maxConcurrencyCreateNum": 100,
            "enableMetrics": false,
            "threadPoolSize": 10,
            "functionID": {
                "cpp": "xxx",
                "python": "xxx",
                "java": "xxx"
            },
            "functionMasters": []
        }
    """
    jobID: str
    codePath: str
    recycleTime: int = _DEFAULT_RECYCLE_TIME
    maxTaskInstanceNum: int = _DEFAULT_MAX_TASK_INSTANCE_NUM
    maxConcurrencyCreateNum: int = _DEFAULT_MAX_CONCURRENCY_CREATE_NUM
    enableMetrics: bool = _DEFAULT_ENABLE_METRICS
    threadPoolSize: int = 10
    functionID: MetaFunctionID = field(default_factory=MetaFunctionID)
    functionMasters: list = field(default_factory=list)

    @classmethod
    def parse(cls, data):
        """
        Parse TaskMetadata from a JSON document or an already-decoded dict.

        Args:
            data: JSON text (``str``/``bytes``) or a ``dict``.

        Returns:
            The parsed ``MetaConfig`` (or subclass) instance. If the payload lacks a required
            field, the decoded dict is returned instead (see ``dataclass_from_dict``).
        """
        payload = data if isinstance(data, dict) else json.loads(data)
        return dataclass_from_dict(cls, payload)

    def to_json(self):
        """
        Convert to JSON.

        Returns:
            The JSON string of all fields, including the nested ``functionID``.
        """
        return json.dumps(asdict(self))