#!/usr/bin/env python3
# coding=UTF-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
yr api config for user
"""
import dataclasses
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Union, Optional, get_origin, Any
from enum import Enum, IntEnum
from yr.affinity import Affinity
# --- Default values referenced by the config dataclasses below ---
# Default for Config.connection_nums.
_DEFAULT_CONNECTION_NUMS = 100
# Default for MetaConfig.enableMetrics.
_DEFAULT_ENABLE_METRICS = False
# Default for Config.max_task_instance_num / MetaConfig.maxTaskInstanceNum.
_DEFAULT_MAX_TASK_INSTANCE_NUM = -1
# Default for MetaConfig.maxConcurrencyCreateNum.
_DEFAULT_MAX_CONCURRENCY_CREATE_NUM = 100
_DEFAULT_CONCURRENCY = 1
# Default for Config.recycle_time / MetaConfig.recycleTime.
_DEFAULT_RECYCLE_TIME = 2
# Default for Config.http_ioc_threads_num.
_DEFAULT_HTTP_IOC_THREADS_NUM = 400
# Default for Config.rpc_timeout (30 * 60; presumably seconds, i.e. 30 minutes -- confirm).
_DEFAULT_RPC_TIMOUT = 1800

# --- Inclusive bounds used by InvokeOptions.check_options_range ---
_MAX_INT = 2147483647
_MIN_INT = 0

# --- Resource names ---
NPU_RESOURCE_NAME = "NPU"
CPU_RESOURCE_NAME = "CPU"
MEMORY_RESOURCE_NAME = "Memory"
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class UserTLSConfig:
    """
    The SSL/TLS configuration used by users when communicating with external clusters.
    """
    #: Path to the root certificate file.
    root_cert_path: str
    #: Path to the module certificate file.
    module_cert_path: str
    #: Path to the module key file.
    module_key_path: str
    #: Server name, default is ``None``.
    server_name: Optional[str] = None
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class DeploymentConfig:
    """
    Deployment details used for auto deployment (see ``Config.deployment_config``).

    Attributes:
        cpu (int): CPU acquired, the unit is millicpu.
        mem (int): Memory acquired (MB).
        datamem (int): Data system memory acquired (MB).
        spill_path (str): Spill path; when out of memory, data is flushed to disk.
        spill_limit (int): Spill size limit (MB).
    """
    cpu: int = 0
    mem: int = 0
    datamem: int = 0
    spill_path: str = ""
    spill_limit: int = 0
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class Config:
    """
    YR API config.
    """
    #: Function id which you deploy, get default by env `YRFUNCID`.
    #: etc. ``sn:cn:yrk:default:function:0-test-test:$latest``.
    function_id: str = ""
    #: Cpp function id which you deploy, get default by env `YR_CPP_FUNCID`.
    cpp_function_id: str = ""
    #: Use default function for cpp.
    cpp_auto_function_name: str = ""
    #: Function name which need in runtime.
    function_name: str = ""
    #: System cluster address, get default by env `YR_SERVER_ADDRESS`.
    server_address: str = ""
    #: DataSystem address, get default by env `YR_DS_ADDRESS`.
    ds_address: str = ""
    #: Only ``False`` when initialize in runtime, default is ``True``.
    is_driver: bool = True
    #: YR api log level have ``ERROR/WARNING/INFO/DEBUG``, default is ``WARNING``.
    log_level: Union[str, int] = ""
    #: Http client read timeout(sec), default is ``900``.
    invoke_timeout: int = 900
    #: Run code in local, default is ``False``.
    local_mode: bool = False
    #: The code directory path that must be configured at runtime initialization.
    code_dir: str = ""
    #: Http client connection nums.
    #: default is ``100``, limit: [1,∞).
    connection_nums: int = _DEFAULT_CONNECTION_NUMS
    #: Instance recycle period(sec).
    #: default is ``2 second``, limit: (0,300].
    recycle_time: int = _DEFAULT_RECYCLE_TIME
    #: If ``True`` will use DataSystem in cluster client, default: ``False``.
    #: The field itself is initialised to ``None`` (not set).
    in_cluster: Optional[bool] = None
    #: Auto generated by init.
    job_id: str = ""
    #: For out cluster https ssl.
    tls_config: Optional[UserTLSConfig] = None
    #: Auto start distribute-executor when `yr.init`, and auto stop distribute-executor when `yr.finalize`.
    #: default is ``False``.
    auto: bool = False
    #: When `auto=True` needed, use to define deployment detail.
    deployment_config: Optional["DeploymentConfig"] = None
    #: Runtime server, keep default in driver.
    rt_server_address: str = ""
    #: Log directory, specifies the path where log files will be stored.
    #: Default is the current working directory ("./").
    log_dir: str = "./"
    #: Path to environment variable file (.env format) to load at startup.
    #: The file should contain environment variables in KEY=VALUE format, one per line, e.g.:
    #: "KEY1=VALUE1"
    #: "KEY2=VALUE2"
    #: Lines starting with # are treated as comments and ignored. Empty lines are ignored.
    #: If specified, environment variables from this file will be loaded into os.environ.
    env_file: str = ""
    #: Max size for log file, default is ``0`` (If the default value is ``0``, it will eventually be set to ``40``).
    log_file_size_max: int = 0
    #: Max number for log file, default is ``0`` (If the default value is ``0``, it will eventually be set to ``20``).
    log_file_num_max: int = 0
    #: Interval for log flush, default is ``5``.
    log_flush_interval: int = 5
    #: Runtime id, keep default in driver.
    runtime_id: str = "driver"
    #: The maximum number of instances of stateless function.
    max_task_instance_num: int = _DEFAULT_MAX_TASK_INSTANCE_NUM
    #: Code loading path.
    load_paths: list = field(default_factory=list)
    #: The timeout used for RPC.
    rpc_timeout: int = _DEFAULT_RPC_TIMOUT
    #: Whether to enable client two-way authentication, default is ``False``.
    #: The field itself is initialised to ``None`` (not set).
    enable_mtls: Optional[bool] = None
    # NOTE(review): undocumented upstream; presumably toggles one-way TLS -- confirm.
    enable_tls: Optional[bool] = None
    #: Client private key file path.
    private_key_path: str = ""
    #: Client certificate file path.
    certificate_file_path: str = ""
    #: Server certificate file path.
    verify_file_path: str = ""
    #: Client private key encryption password.
    # NOTE: the attribute name is misspelled ("paaswd") but is part of the public interface.
    private_key_paaswd: str = ""
    #: HTTP link worker thread.
    http_ioc_threads_num: int = _DEFAULT_HTTP_IOC_THREADS_NUM
    #: Server name, used to identify and connect to a specific server instance.
    server_name: str = ""
    #: Namespace, used to organize and isolate configurations or resources.
    ns: str = ""
    # NOTE(review): undocumented upstream; presumably the tenant identifier -- confirm.
    tenant_id: str = ""
    #: Whether to enable metric collection. ``False`` indicates disabled, and ``True`` indicates enabled.
    #: The default value is ``True``. This takes effect only when called in the cluster.
    enable_metrics: bool = True
    #: Used to set custom environment variables for the runtime. Currently, only `LD_LIBRARY_PATH` is supported.
    custom_envs: Dict[str, str] = field(default_factory=dict)
    #: Function master address list.
    master_addr_list: list = field(default_factory=list)
    #: Specify the user code or the local path location that it depends on, absolute path, and ensure that
    #: it exists on all nodes in the cluster. The default value is empty.
    working_dir: str = ""
    #: Whether to enable data system TLS authentication.
    #: If ``True``, enable data system tls authentication, else not.
    enable_ds_encrypt: bool = False
    #: The path of worker public key for data system tls authentication,
    #: if enable_ds_encrypt is true and the ds_public_key_path is empty, an exception will be thrown.
    ds_public_key_path: str = ""
    #: The path of client public key for data system tls authentication.
    #: if enable_ds_encrypt is true and the runtime_public_key_path is empty, an exception will be thrown.
    runtime_public_key_path: str = ""
    #: The path of client private key for data system tls authentication.
    #: if enable_ds_encrypt is true and the runtime_private_key_path is empty, an exception will be thrown.
    runtime_private_key_path: str = ""
    # NOTE(review): undocumented upstream; presumably the number of CPUs to use -- confirm.
    num_cpus: Optional[int] = None
    # NOTE(review): undocumented upstream; presumably a job-level counterpart of
    # InvokeOptions.runtime_env -- confirm.
    runtime_env: Optional[Dict[str, Any]] = None
    #: If ``True``, the output from all of the job processes on all nodes will be directed to the driver,
    #: default is ``False``.
    log_to_driver: bool = False
    #: If ``True``, deduplicates logs that appear redundantly across multiple processes, default True.
    #: The first instance of each log message is always immediately printed. However, subsequent log
    #: messages of the same pattern are buffered for up to five seconds and printed in batch.
    dedup_logs: bool = True
    # NOTE(review): undocumented upstream; presumably an authentication token -- confirm.
    auth_token: str = ""
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class ClientInfo:
    """
    Use to store yr client info.
    """
    #: Automatically generated when `yr.init` is called, a unique identifier for a task.
    job_id: str
# Only the non-default dataclass switches are spelled out; init/order/unsafe_hash
# keep their defaults (True/False/False), which matches the explicit form.
@dataclass(repr=False, eq=False)
class Device:
    """
    Device settings used to initialise an xpu task.
    """
    #: Device name, empty by default.
    name: str = ""
    #: Batch size, ``1`` by default.
    batch_size: int = 1
class SchedulingAffinityType(IntEnum):
    """
    Bundle affinity type.

    **Please use ONLY REQUIRED_AFFINITY_IN_EACH_BUNDLE.**
    **All other attributes are inherited from IntEnum and should not be used directly.**
    """
    #: Currently, only REQUIRED_AFFINITY_IN_EACH_BUNDLE is supported,
    #: indicating strong affinity within each bundle.
    REQUIRED_AFFINITY_IN_EACH_BUNDLE = 0
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class ResourceGroupOptions:
    """
    Resource group options.
    """
    #: The name of the ResourceGroup that needs to be scheduled. By default, it is empty,
    #: indicating that it is not scheduled to any ResourceGroup.
    #: If it is not empty, it is scheduled to the specified ResourceGroup.
    resource_group_name: str = ""
    #: The index of the bundle to be scheduled takes effect only if resource_group_name is not empty.
    #: The value range is [-1, the number of bundles in the ResourceGroup).
    #: default value is ``-1``, indicating that no specific bundle is specified;
    #: if it is a value other than ``-1`` within the value range,
    #: it indicates that the bundle is scheduled to the corresponding index of the ResourceGroup;
    #: if it is any other value, an error will be generated.
    bundle_index: int = -1
# init/order/unsafe_hash are left at their dataclass defaults, equivalent to the explicit form.
@dataclass(repr=False, eq=False)
class DebugConfig:
    """
    Configuration of a debug instance.
    """
    #: Debug switch, ``False`` by default.
    enable: bool = False
@dataclass
class FunctionGroupOptions:
    """
    Function group options.
    """
    #: Required CPU size in millicores (m), limited to the range [300, 16000].
    cpu: Optional[int] = None
    #: Required memory size in MB, limited to the range [128, 65536].
    memory: Optional[int] = None
    #: Custom resources, currently supports "NPU/XX/YY", where XX is the card model such as Ascend910B4,
    #: and YY can be ``count``, ``latency``, or ``stream``.
    resources: Dict[str, float] = field(default_factory=dict)
    #: The affinity type of instances within the bundle, Default is ``None``.
    scheduling_affinity_type: Optional[SchedulingAffinityType] = None
    #: The number of function instances within each bundle. Default is ``None``.
    scheduling_affinity_each_bundle_size: Optional[int] = None
    #: Timeout in seconds, valid values are ``-1`` or within ``[0, 0x7FFFFFFF]``.
    #: Documented default is ``-1``, meaning blocking wait; the field itself is
    #: initialised to ``None`` (not set).
    timeout: Optional[int] = None
    #: Instance concurrency, limited to the range [1, 1000].
    concurrency: Optional[int] = None
    #: Number of recovery retry attempts, used when instance recovery fails. Default is ``0``.
    recover_retry_times: int = 0
def function_group_enabled(opts: FunctionGroupOptions, group_size: int) -> bool:
    """
    Decide whether the function group feature is switched on.

    Args:
        opts: Options whose ``scheduling_affinity_each_bundle_size`` is inspected.
        group_size: Size of the group the bundle size is compared against.

    Returns:
        ``True`` when the bundle size lies in ``(0, group_size]``; ``False`` when the
        bundle size is ``None`` or when both the bundle size and ``group_size`` are ``0``.

    Raises:
        RuntimeError: For every other combination of bundle size and ``group_size``.
    """
    bundle_size = opts.scheduling_affinity_each_bundle_size
    # Not configured at all: the feature is off.
    if bundle_size is None:
        return False
    if 0 < bundle_size <= group_size:
        return True
    # Both explicitly zero: treated as "off" rather than as an error.
    if group_size == 0 and bundle_size == 0:
        return False
    raise RuntimeError(
        f"enable function group failed, group_size: {group_size}, bundle_size: {bundle_size}"
    )
# init/order/unsafe_hash are left at their dataclass defaults, equivalent to the explicit form.
@dataclass(repr=False, eq=False)
class DeviceInfo:
    """
    Identity of a single device: id, IP and rank.
    """
    #: HDC channel number of the processor.
    device_id: int = 0
    #: IP of the processor's real network card.
    device_ip: str = ""
    #: Rank identifier; rank ids start from 0.
    rank_id: int = 0
# init/order/unsafe_hash are left at their dataclass defaults, equivalent to the explicit form.
@dataclass(repr=False, eq=False)
class ServerInfo:
    """
    Devices attached to a function instance together with the id of their node.
    """
    #: Information about the devices mounted to this function instance.
    devices: List[DeviceInfo] = field(default_factory=list)
    #: Id of the node where the devices are located.
    server_id: str = ""
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class FunctionGroupContext:
    """
    A context class for managing function group information.
    """
    #: The ID of this function instance within the function group.
    #: Range: [0, world_size - 1]
    #: Default value: ``0``.
    rank_id: int = 0
    #: Total number of function instances in the group.
    #: Default value: ``0``.
    world_size: int = 0
    #: Server info list for inter-instance communication.
    #: Default: empty list.
    server_list: List['ServerInfo'] = field(default_factory=list)
    #: Name of the device used by this function instance, e.g., NPU/Ascend910B.
    #: Default: empty string.
    device_name: str = ""
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class GroupOptions:
    """
    Configuration options for grouped instance scheduling.

    The `GroupOptions` structure defines parameters for the lifecycle management of grouped instances,
    including timeout settings for rescheduling when kernel resources are insufficient.
    """
    #: Timeout for rescheduling when kernel resources are insufficient, in seconds.
    #: If set to `-1`, the kernel will retry scheduling indefinitely.
    #: If set to any other negative value, an exception will be thrown.
    #: Default value: ``-1``.
    timeout: int = -1
    #: Whether to enable the fate-sharing configuration for grouped instances.
    #: `True` (default): Instances in the group will be created and destroyed together.
    #: `False`: Instances can have independent lifecycles.
    #: Default value: ``True``.
    same_lifecycle: bool = True
    #: The strategy to create the group
    #: None: No strategy.
    #: SPREAD: Distribute multiple instances across different nodes as much as possible.
    #: STRICT_PACK: All instances must be placed on the same node, otherwise creation fails.
    #: PACK: Pack multiple instances into the same node as much as possible.
    #: STRICT_SPREAD: All instances must be placed on different nodes, otherwise creation fails.
    #: Default: empty string.
    strategy: str = ""
# init/order/unsafe_hash are left at their dataclass defaults, equivalent to the explicit form.
@dataclass(repr=False, eq=False)
class PortForwarding:
    """Describes one port that is forwarded into the sandbox."""
    #: The port number inside the sandbox. Range: [1, 65535].
    port: int = 0
    #: The protocol type. Supported: "TCP", "UDP". Default: "TCP".
    protocol: str = "TCP"
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class InvokeOptions:
    """Use to set the invoke options.

    Examples:
        >>> import yr
        >>> import time
        >>> yr.init()
        >>> opt = yr.InvokeOptions()
        >>> opt.pod_labels["k1"] = "v1"
        >>> @yr.invoke(invoke_options=opt)
        ... def func():
        ...     time.sleep(100)
        >>> ret = func.invoke()
        >>> yr.get(ret)
        >>> yr.finalize()
    """
    #: The size of the CPU required. Value Range is [300, 16000] and unit is m (milli-core).
    cpu: int = 500
    #: The size of memory required. Unit: MB. Range: [128, 1073741824].
    memory: int = 500
    #: Instance concurrency. Value Range is [1, 1000]. Priority is higher than
    #: the "Concurrency" configured in custom_extensions. It is recommended to use this parameter for configuration.
    #: If not set (None), the default value will be determined by the runtime:
    #: - For async task/actor: 1000
    #: - For non-async task/actor: 1
    concurrency: Optional[int] = None
    #: Custom resources currently support "GPU/XX/YY" and "NPU/XX/YY", where XX is the card model such as Ascend910B4,
    #: and YY can be count, latency, or stream.
    custom_resources: Dict[str, float] = field(default_factory=dict)
    custom_extensions: Dict[str, str] = field(default_factory=dict)
    """
    Specify user-defined configurations, such as function concurrency.
    It can also be used as a user-defined tag for metrics to collect user information.

    .. list-table:: Common `custom_extensions` configuration

        * - "Concurrency"
          - Concurrency. Range: [1,1000].
        * - "lifecycle"
          - detached, supports detached mode.
        * - "DELEGATE_DIRECTORY_INFO"
          - Custom directories support the ability to create and delete subdirectories. When an instance is created,
            if the user-defined directory exists and has read and write permissions, a subdirectory is created under
            it as the working directory; otherwise, a subdirectory is created under the `/tmp` directory as the working
            directory. When the instance is destroyed, the working directory is destroyed. The user function can
            obtain the working directory through the `INSTANCE_WORK_DIR` environment variable.
        * - "DELEGATE_DIRECTORY_QUOTA"
          - Subdirectory quota size, value range is greater than ``0 M`` and less than ``1 TB``. If this configuration
            is not set, the default is ``512 M``. If the configuration is ``-1``, monitoring is not performed. Unit: MB.
        * - "GRACEFUL_SHUTDOWN_TIME"
          - Customize the graceful exit time, in seconds. Limit: ``>=0``, ``0`` means immediate exit, and does not
            guarantee that the user's graceful exit function can be completed; if configured <0, the system
            configuration at deployment time is used as the timeout time.
        * - "RECOVER_RETRY_TIMEOUT"
          - Customize the recover timeout time. The instance recover timeout time is in milliseconds. Limit: ``>0``,
            Default to ``10 * 60 * 1000``

    When used as a user-defined tag for metrics:

    >>> import yr
    >>> yr.init()
    >>> opt = yr.InvokeOptions()
    >>> opt.custom_extensions["YR_Metrics"] = "{\'endpoint\':\'127.0.0.1\', \'project_id\':\'my_project_id\'}"

    In Prometheus, select `metrics name` as `yr_app_instance_billing_invoke_latency`, and you can find the custom tag
    information in the collected invoke information:

    .. code-block:: text

        yr_app_instance_billing_invoke_latency{
        ...
        endpoint="127.0.0.1",
        ...}
    """
    pod_labels: Dict[str, str] = field(default_factory=dict)
    """
    Pod labels only used in Kubernetes environment. When creating a function instance, pod_labels can accept key-value
    pairs from the user and pass them to the function system.

    * After the ActorPattern function instance specialization is completed (Running),
      the Scaler applies the incoming labels to the POD.
    * When an ActorPattern function instance fails or is deleted,
      the Scaler sets the corresponding label of the POD to empty (Remove it);
    * Constraints:

      * The number of labels that can be stored in `pod_labels` cannot exceed 5.
      * Constraints on the key and value in `pod_labels`:

        * key:Supports uppercase and lowercase letters, numbers, and hyphens, and allows a length of 1-63.
          Does not start or end with a hyphen. Empty strings are not allowed.
        * value:Supports uppercase and lowercase letters, numbers, and hyphens, with a length of 1-63.
          Does not start or end with a hyphen. Allows empty strings.

    * Raises:
      When the `pod_labels` passed by the user does not meet the constraints,
      the corresponding exception and error message will be thrown.
    """
    #: Labels of instance
    labels: List[str] = field(default_factory=list)
    #: Affinity of instance
    affinity: Dict[str, str] = field(default_factory=dict)
    #: Specify the name of the model used by the heterogeneous function.
    device: Device = field(default_factory=Device)
    #: Specify the time when the invoke call of the desired heterogeneous function is completed.
    max_invoke_latency: int = 5000
    #: Specify the minimum number of instances for a stateless function.
    min_instances: int = 0
    #: Specify the maximum number of instances for a stateless function.
    max_instances: int = 0
    #: The number of instance recovery times (when an instance abnormally exits, the instance is automatically restored
    #: to the latest state). If the value is ``0``, the instance is not automatically restored when it abnormally exits.
    recover_retry_times: int = 0
    #: Whether to enable order-preserving. Only effective for stateful functions.
    need_order: bool = False
    #: Used to specify the name of the instance. When `namespace` is specified, the instance name is `namespace-name`,
    #: otherwise it is `name`.
    name: str = ""
    #: Used to specify the namespace of the instance.
    namespace: str = ""
    #: Set affinity condition list.
    # The default must be a list to match the annotation (it was previously built with ``dict``).
    schedule_affinities: List[Affinity] = field(default_factory=list)
    #: Whether to enable data affinity scheduling.
    is_data_affinity: bool = False
    # NOTE: the three attributes below carry no type annotation, so they are plain class
    # attributes rather than dataclass fields: ``__init__`` does not accept them and they are
    # set by assignment on the instance. Annotating them would change the constructor signature.
    #: Set whether to enable weak affinity priority scheduling. If enabled, when multiple weak affinity conditions are
    #: passed, match and score them in order. Scheduling is successful as soon as one condition is met.
    preferred_priority = True
    #: Set whether to enable strong affinity priority scheduling. If enabled, when multiple strong affinity conditions
    #: are passed, they are matched and scored in order. If none of the strong affinity conditions are met,
    #: the scheduling fails.
    required_priority = False
    #: Whether to enable anti-affinity for non-selectable resources. If enabled, scheduling fails when none of the
    #: weak affinity conditions are met. When preferred_anti_other_labels is set to True, if no PODs that meet the
    #: conditions are found for weak affinity/anti-affinity, scheduling fails and no other resources' PODs are
    #: selected for scheduling.
    preferred_anti_other_labels = False
    resource_group_options: ResourceGroupOptions = field(
        default_factory=ResourceGroupOptions)
    """
    Specify the ResourceGroup option, which includes resource_group_name and bundle_index.
    When creating a function instance: If `resource_group_name` is set, it will be passed to the kernel to schedule to
    the specified ResourceGroup. If both `resource_group_name` and `bundle_index` are set, they are passed to the
    kernel to schedule the bundle to the specified ResourceGroup and index. The default value of `resource_group_name`
    is empty, and the default value of `bundle_index` is ``-1``.

    * Constraints:

      * When `resource_group_name` is empty, the instance will not be scheduled to the specified ResourceGroup,
        and the bundle_index field is not effective.
      * When `resource_group_name` is not empty:

        * When `bundle_index` is ``-1``, the instance is scheduled to the specified ResourceGroup.
        * When ``0<= bundle_index < number of bundles in ResourceGroup``,schedule the instance to a specified bundle
          in a specified ResourceGroup.
        * When ``bundle_index < -1`` or ``bundle_index >= number of bundles in ResourceGroup``,raise error.

    * Raises:

      * There is no ResourceGroup with the `resource_group_name` provided by the user.
      * The user passes a non-empty `resource_group_name` and ``bundle_index < -1``.
      * When the user passes a non-empty `resource_group_name` and `bundle_index` >= the number of
        bundles in the ResourceGroup
      * Scheduling failed: For example, the specified ResourceGroup or the specified bundle of the specified
        ResourceGroup does not have enough resources to handle instance scheduling.
    """
    #: Function group options.
    function_group_options: FunctionGroupOptions = field(
        default_factory=FunctionGroupOptions)
    #: Set environment variables when the instance starts.
    env_vars: Dict[str, str] = field(default_factory=dict)
    #: Number of retries for stateless functions.
    retry_times: int = 0
    #: Set the traceId for function calls for link tracing.
    trace_id: str = ""
    #: In the scenario where a function is invoked by a specified alias in cross-function invocation, when the
    #: alias is a rule alias, this parameter is used to set the kv parameter that the rule alias depends on.
    alias_params: Dict[str, str] = field(default_factory=dict)
    runtime_env: Dict = field(default_factory=dict)
    """
    Configure the stateful/stateless function runtime environment with `conda`, `pip`, `working_dir`, and `env_vars`.

    * `conda` provides different Python runtime environments for stateful function.

      * Specify an existing conda environment (the environment exists on all nodes)
        ``runtime_env = {"conda":"pytorch_p39"}``
      * Create and use conda environments through configuration.
        ``runtime_env["conda"] = {"name":"myenv","channels": ["conda-forge"], "dependencies": ["python=3.9",
        "msgpack-python=1.0.5", "protobuf", "libgcc-ng", "cloudpickle=2.0.0", "cython=3.0.10", "pyyaml=6.0.2"]}``
      * Create and use a conda environment through a YAML file (the YAML file meets the conda requirements).
        ``runtime_env = {"conda":"/home/env.yaml"}``

    * `pip` installs dependencies for Python runtime environment.
    * `working_dir` configure the code path of the job.
    * `env_vars` configure process-level environment variables.
      ``runtime_env = {"env_vars":{"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"}}``
    * `shared_dir` supports configuring a shared directory for some instance, with yr managing the lifecycle of this
      shared directory. `shared_dir` supports two fields: name and TTL. The name field only allows numbers, letters,
      "-", and "_". The TTL supports integers greater than 0 and less than INTMAX.
      ``runtime_env = {"shared_dir":{"name": "user_define", "TTL": 5}}``
    * Constraints of `runtime_env`:

      * The keys supported by runtime_env are `conda`, `env_vars`, `pip`, `working_dir`.
        Other keys will not take effect and will not cause errors.
      * Run the yr function with conda. The environment needs to have yr and its third-party dependencies. It is
        recommended that users first create a conda environment and then specify it with `runtime_env`, for example:
        ``runtime_env = {"conda":"pytorch_p39"}``
      * `runtime_env` supports creating and switching conda environments using configurations. The configuration needs
        to install third-party dependencies for yr, for example:
        ``runtime_env["conda"] = {"name":"myenv","channels": ["conda-forge"], "dependencies": ["python=3.9",
        "msgpack-python=1.0.5", "protobuf", "libgcc-ng", "cloudpickle=2.0.0", "cython=3.0.10", "pyyaml=6.0.2"]}``
      * The environment created using conda in `runtime_env` needs to be cleaned up by the user.
      * In `runtime_env`, conda can use `pip` to install dependencies, which are managed directly by conda.
        ``runtime_env = {"conda":{'name': 'my_project_env', 'channels': ['defaults', 'conda-forge'],
        'dependencies': ['python=3.9', {'pip': ['requests==2.25.1']}]}}``
      * Currently, Python 3.9 and Python 3.11 SDKs are available. The Python version of conda needs to
        be consistent with the SDK version.
      * If both `InvokeOptions.env_vars` and `InvokeOptions.runtime_env["env_vars"]` are configured with the same key,
        the configuration in `InvokeOptions.env_vars` will be used.
      * If `InvokeOptions.runtime_env["working_dir"]` is configured, use this configuration,
        otherwise, use `YR.Config.working_dir` and finally use the configuration in `InvokeOptions.env_vars`.
      * If you use conda, you need to specify the environment variable `YR_CONDA_HOME` to point to installation path.
      * `shared_dir` has the following constraints:

        1. It is not recommended to configure different TTL for the same shared directory.
        2. The minimum cleanup interval for shared directories is 5 seconds.
        3. When multiple yr Agents are deployed on the same node, each Agent must be configured with
           different root directory to prevent conflicts in shared directory management.
    """
    #: Debug instance configuration.
    debug: DebugConfig = field(default_factory=DebugConfig)
    #: Whether an instance can be preempted is effective only in the priority scenario (when the maxPriority
    #: configuration item deployed by YuanRong is greater than ``0``). The default value is ``False``.
    preempted_allowed: bool = False
    #: The priority of an instance is determined by its value. The higher the value, the higher the priority.
    #: A high-priority instance can preempt a low-priority instance that is configured as `preempted_allowed = True`.
    #: It only takes effect in priority scenarios (scenarios where the maxPriority configuration item of YuanRong
    #: deployment is greater than ``0``). The minimum value of `instance_priority` is ``0`` and the maximum value
    #: is the maxPriority configuration of YuanRong deployment. The default is ``0``.
    instance_priority: int = 0
    #: The scheduling timeout time of an instance. Unit: milliseconds. Value range:
    #: [-1, the maximum value of the int type]. Default value: ``30000``.
    schedule_timeout_ms: int = 30000
    group_name: str = ""
    is_delete_remote_tensor: bool = False
    get_if_exists: bool = False
    #: Whether to skip serializing the instance class code.
    #: Set to ``True`` for pre-deployed classes (e.g., SDK built-in classes)
    #: to avoid cross-version serialization issues.
    #: Default: ``False``.
    skip_serialize: bool = False
    port_forwardings: List[PortForwarding] = field(default_factory=list)
    """
    Configure port forwarding rules for the sandbox. Each entry specifies a port to be forwarded
    inside the sandbox environment.
    When configured, the port forwarding rules are serialized to JSON and passed to the runtime
    via ``createOptions["network"]``. Supports configuring multiple ports simultaneously.

    * Constraints:

      * ``port``: Must be an integer in the range [1, 65535].
      * ``protocol``: Must be ``"TCP"`` or ``"UDP"``. Default is ``"TCP"``.

    * Raises:

      * ``TypeError``: If ``port`` is not an ``int`` or ``protocol`` is not a ``str``.
      * ``ValueError``: If ``port`` is out of range or ``protocol`` is unsupported.

    Example::

        >>> import yr
        >>> yr.init()
        >>> opt = yr.InvokeOptions()
        >>> opt.port_forwardings = [
        ...     yr.PortForwarding(port=8080),
        ...     yr.PortForwarding(port=9090, protocol="UDP"),
        ... ]
        >>> @yr.invoke(invoke_options=opt)
        ... def serve():
        ...     pass

    The above configuration produces the following JSON in ``createOptions["network"]``::

        {"portForwardings": [{"port": 8080, "protocol": "TCP"}, {"port": 9090, "protocol": "UDP"}]}
    """

    def check_options_valid(self):
        """
        Check whether the options are valid.

        Only the outer type is verified: for parameterised annotations such as
        ``Dict[str, str]`` the value is checked against the origin type (``dict``),
        not against the key/value types. Attributes whose value is ``None`` are skipped.

        Raises:
            TypeError: If options are invalid, throw this exception.
        """
        attributes_to_check = [
            ("env_vars", Dict[str, str]),
            ("name", str),
            ("namespace", str),
            ("preferred_anti_other_labels", bool),
            ("preferred_priority", bool),
            ("trace_id", str),
            ("custom_resources", Dict[str, float]),
            ("custom_extensions", Dict[str, str]),
        ]
        for actual, expected_type in attributes_to_check:
            actual_value = getattr(self, actual)
            if actual_value is None:
                continue
            # isinstance() rejects subscripted generics, so fall back to their origin type.
            origin = get_origin(expected_type)
            check_type = origin if origin is not None else expected_type
            if not isinstance(actual_value, check_type):
                raise TypeError(
                    f"invalid type for '{actual}', actual: {type(actual_value)}, expect: {expected_type}"
                )

    def check_options_range(self):
        """
        Check whether the options are in the valid range.

        ``retry_times``, ``recover_retry_times``, ``max_instances``, ``max_invoke_latency``
        and ``min_instances`` must lie within ``[_MIN_INT, _MAX_INT]``; in addition,
        ``-1`` is accepted for ``max_instances`` and ``max_invoke_latency``.

        Raises:
            ValueError: If one of the checked attributes is out of range.
        """
        attrs = [
            "retry_times",
            "recover_retry_times",
            "max_instances",
            "max_invoke_latency",
            "min_instances",
        ]
        for attr in attrs:
            value = getattr(self, attr)
            # -1 is let through for these two attributes only.
            if attr in ["max_instances", "max_invoke_latency"] and value == -1:
                continue
            if not _MIN_INT <= value <= _MAX_INT:
                raise ValueError(
                    f"{attr} 超过范围, 请输入 {_MIN_INT} 到 {_MAX_INT} 范围的值"
                )
def dataclass_from_dict(klass, d):
    """
    Recursively build an instance of ``klass`` from the mapping ``d``.

    Each key of ``d`` is matched to the dataclass field of the same name and its
    value is converted with that field's declared type. Whenever construction is
    not possible (``klass`` is not a dataclass, ``d`` has an unknown key, a required
    field is missing, ...) the input ``d`` is returned unchanged.
    """
    try:
        type_by_name = {}
        for spec in dataclasses.fields(klass):
            type_by_name[spec.name] = spec.type
        kwargs = {}
        for key in d:
            kwargs[key] = dataclass_from_dict(type_by_name[key], d[key])
        return klass(**kwargs)
    except Exception:
        # Best-effort fallback: leaf values and anything that cannot be built
        # are handed back as-is.
        return d
# init/order/unsafe_hash are left at their dataclass defaults, equivalent to the explicit form.
@dataclass(repr=False, eq=False)
class MetaFunctionID:
    """
    Function ids keyed by language (cpp / python / java); each is empty by default.
    """
    cpp: str = ""
    python: str = ""
    java: str = ""
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class MetaConfig:
    """
    TaskMetadata is used to convey control information, for example::

        {
            "jobID": "xxx",
            "codePath": "xx",
            "recycleTime": 2,
            "maxTaskInstanceNum": -1,
            "maxConcurrencyCreateNum": 100,
            "enableMetrics": false,
            "threadPoolSize": 10,
            "functionID": {
                "cpp": "xxx",
                "python": "xxx",
                "java": "xxx"
            }
        }

    Field names are camelCase so they map directly onto the JSON keys.
    """
    jobID: str
    codePath: str
    recycleTime: int = _DEFAULT_RECYCLE_TIME
    maxTaskInstanceNum: int = _DEFAULT_MAX_TASK_INSTANCE_NUM
    maxConcurrencyCreateNum: int = _DEFAULT_MAX_CONCURRENCY_CREATE_NUM
    enableMetrics: bool = _DEFAULT_ENABLE_METRICS
    threadPoolSize: int = 10
    functionID: MetaFunctionID = field(default_factory=MetaFunctionID)
    functionMasters: list = field(default_factory=list)

    @classmethod
    def parse(cls, data):
        """
        Parse TaskMetadata from JSON text or from an already-decoded dict.

        Args:
            data: JSON document (``str``, ``bytes`` or ``bytearray``) or a ``dict``
                holding the same keys.

        Returns:
            The TaskMetadata object. Note that ``dataclass_from_dict`` returns the raw
            input instead when it cannot construct the dataclass (e.g. unknown keys
            or missing required fields).
        """
        # json.loads only accepts text/bytes; a dict is used as-is, as the contract promises.
        if isinstance(data, (str, bytes, bytearray)):
            payload = json.loads(data)
        else:
            payload = data
        return dataclass_from_dict(cls, payload)

    def to_json(self):
        """
        Convert this object to JSON.

        Returns:
            str: JSON text of ``dataclasses.asdict(self)``.
        """
        return json.dumps(asdict(self))