Source code for yr.metrics

# coding=utf-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

from dataclasses import field
from typing import Dict

from yr.base_runtime import AlarmInfo
from yr.config_manager import ConfigManager
from yr.runtime_holder import global_runtime
from yr.common.utils import GaugeData, UInt64CounterData, DoubleCounterData

_METRIC_NAME_RE = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$')
_METRIC_LABEL_NAME_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
_RESERVED_METRIC_LABEL_NAME_RE = re.compile(r'^__.*$')


class Metrics:
    @classmethod
    def _check_name(cls, name: str):
        if len(name) == 0:
            raise ValueError("invalid metric name, should not be empty")
        if not _METRIC_NAME_RE.match(name):
            raise ValueError(f'invalid metric name: {name}')

    @classmethod
    def _check_label(cls, labels: dict):
        if not isinstance(labels, dict):
            raise ValueError(f'invalid metrics label, {type(labels)} not dict type')
        for key, value in labels.items():
            if not _METRIC_LABEL_NAME_RE.match(key):
                raise ValueError(f'invalid metrics label name: {key}')
            if _RESERVED_METRIC_LABEL_NAME_RE.match(key):
                raise ValueError(f'reserved metrics label name: {key}')
            if not isinstance(value, str):
                raise ValueError(f'invalid metrics label value: {value}, {type(value)} not str')

    @classmethod
    def _check_non_negative_float(cls, value, field_name: str):
        fvalue = float(value)
        if fvalue < 0:
            raise ValueError(f"invalid {field_name}, should be >= 0")
        return fvalue

    @classmethod
    def _check_positive_float(cls, value, field_name: str):
        fvalue = float(value)
        if fvalue <= 0:
            raise ValueError(f"invalid {field_name}, should be > 0")
        return fvalue

    @classmethod
    def _check_positive_int(cls, value, field_name: str):
        ivalue = int(value)
        if ivalue <= 0:
            raise ValueError(f"invalid {field_name}, should be > 0")
        return ivalue


class CustomGauge(Metrics):
    """Gauge wrapper for custom metrics without business labels."""

    def __init__(self, name: str, description: str, unit: str = ''):
        self.__name = name
        self.__description = description
        self.__unit = unit
        self._check_name(name)

    def set(self, value: float) -> None:
        fvalue = self._check_non_negative_float(value, "gauge value")
        if ConfigManager().is_driver:
            raise ValueError("custom gauge metrics set not support in driver")
        data = GaugeData(name=self.__name, description=self.__description, unit=self.__unit, value=fvalue)
        global_runtime.get_runtime().set_gauge(data)

    def inc(self, delta: float) -> None:
        fvalue = self._check_positive_float(delta, "gauge delta")
        if ConfigManager().is_driver:
            raise ValueError("custom gauge metrics increase not support in driver")
        data = GaugeData(name=self.__name, description=self.__description, unit=self.__unit, value=fvalue)
        global_runtime.get_runtime().increase_gauge(data)

    def dec(self, delta: float) -> None:
        fvalue = self._check_positive_float(delta, "gauge delta")
        if ConfigManager().is_driver:
            raise ValueError("custom gauge metrics decrease not support in driver")
        data = GaugeData(name=self.__name, description=self.__description, unit=self.__unit, value=fvalue)
        global_runtime.get_runtime().decrease_gauge(data)

    def get_value(self) -> float:
        if ConfigManager().is_driver:
            raise ValueError("custom gauge metrics get value not support in driver")
        data = GaugeData(name=self.__name, description=self.__description, unit=self.__unit)
        return global_runtime.get_runtime().get_value_gauge(data)


class CustomCounter(Metrics):
    """Counter wrapper for custom metrics without business labels."""

    def __init__(self, name: str, description: str, unit: str = ''):
        self.__name = name
        self.__description = description
        self.__unit = unit
        self._check_name(name)

    def inc(self, delta: int = 1) -> None:
        ivalue = self._check_positive_int(delta, "counter delta")
        if ConfigManager().is_driver:
            raise ValueError("custom counter metrics increase not support in driver")
        data = UInt64CounterData(name=self.__name, description=self.__description, unit=self.__unit, value=ivalue)
        global_runtime.get_runtime().increase_uint64_counter(data)

    def get_value(self) -> int:
        if ConfigManager().is_driver:
            raise ValueError("custom counter metrics get value not support in driver")
        data = UInt64CounterData(name=self.__name, description=self.__description, unit=self.__unit)
        return global_runtime.get_runtime().get_value_uint64_counter(data)


[docs] class Gauge(Metrics): """ Used for reporting metrics data. Note: 1. Prometheus-like data structure. 2. Billing information is reported to the backend openYuanRong collector. 3. Not usable within the driver. Args: name (str): name. description (str): description. unit (str, optional): Unit. labels (Dict[str, str], optional): Label. e.g. ``labels = {"request_id": "abc"}``. """
[docs] def __init__(self, name: str, description: str, unit: str = '', labels: Dict[str, str] = {}, ): self.__name = name self.__description = description self.__unit = unit self.__labels = labels self._check_name(name) self._check_label(labels)
[docs] def add_labels(self, labels: Dict[str, str]) -> None: """ add label. Args: labels (Dict[str,str]): Both labels and key-value pairs must be strings, and keys cannot begin with special characters like `*`. Raises: ValueError: Labels are missing, or the data does not conform to Prometheus standard requirements. Example: >>> import yr >>> config = yr.Config(enable_metrics=True) >>> yr.init(config) >>> @yr.instance ... class GaugeActor: ... def __init__(self): ... self.labels = {"key1": "value1"} ... self.gauge = yr.Gauge( ... name="test", ... description="", ... unit="ms" ... ) ... self.gauge.add_labels(self.labels) ... self.gauge.set(50) ... print("Initial labels:", self.labels) ... ... def set_value(self, value): ... self.gauge.set(value) ... return { ... "value": value, ... "labels": self.labels ... } >>> >>> actor = GaugeActor.options(name="gauge_actor").invoke() >>> result = actor.set_value.invoke(75) >>> print("Driver got:", yr.get(result)) """ self._check_label(labels) if len(labels) == 0: raise ValueError("invalid metrics labels, should not be empty") old = self.__labels self.__labels = {**old, **labels}
[docs] def set(self, value: float) -> None: """ set value. Args: value (float): Target value. Raises: ValueError: Invoked in the driver or fails to meet the data type requirements (such as reporting data types other than float). Example: >>> import yr >>> config = yr.Config(enable_metrics=True) >>> yr.init(config) >>> >>> @yr.instance ... class GaugeActor: ... def __init__(self): ... self.labels = {"key1": "value1"} ... self.gauge = yr.Gauge( ... name="test", ... description="", ... unit="ms", ... labels=self.labels ... ) ... self.gauge.set(50) ... ... print("Initial labels:", self.labels) ... ... def set_value(self, value): ... self.gauge.set(value) ... return { ... "value": value, ... "labels": self.labels ... } >>> >>> actor = GaugeActor.options(name="gauge_actor").invoke() >>> >>> result = actor.set_value.invoke(75) >>> print(yr.get(result)) """ fvalue = float(value) if ConfigManager().is_driver: raise ValueError("gauge metrics report not support in driver") data = GaugeData(name=self.__name, description=self.__description, unit=self.__unit, labels=self.__labels, value=fvalue) global_runtime.get_runtime().report_gauge(data)
[docs] class UInt64Counter(Metrics): """ A class representing a 64-bit unsigned integer counter for metrics. Args: name (str): Name of the counter. description (str): Description of the counter. unit (str): Unit of the counter. labels (Dict[str, str]): Optional labels for the counter. """
[docs] def __init__(self, name: str, description: str, unit: str, labels: Dict[str, str] = field(default_factory=dict), ): self.__uint_counter_name = name self.__description = description self.__unit = unit self.__uint_counter_labels = labels self._check_name(name) self._check_label(labels)
[docs] def add_labels(self, labels: dict) -> None: """ add lable for metrics data. Args: labels (dict): A dictionary of labels where both keys and values must be strings. Raises: ValueError: If the label is empty or does not meet the data type requirements. Example: >>> import yr >>> >>> config = yr.Config(enable_metrics=True) >>> yr.init(config) >>> >>> @yr.instance >>> class Actor: ... def __init__(self): ... labels = {"key1": "value1", "key2": "value2"} ... self.data = yr.UInt64Counter( ... name="test", ... description="", ... unit="ms", ... labels=labels ... ) ... self.data.add_labels({"key3": "value3"}) ... print("Actor init done") ... ... def run(self): ... self.data.set(5) ... self.data.add_labels({"phase": "run"}) ... msg = ( ... f"Actor run: {self.data._UInt64Counter__uint_counter_labels}, ... f"value: {self.data.get_value()}" ... ) ... print(msg) ... return msg >>> >>> actor1 = Actor.options(name="actor").invoke() >>> print(yr.get(actor1.run.invoke())) """ self._check_label(labels) if len(labels) == 0: raise ValueError("invalid metrics labels, should not be empty") old = self.__uint_counter_labels self.__uint_counter_labels = {**old, **labels}
[docs] def set(self, value: int) -> None: """ Set uint64 counter to the given value. Args: value (int): The value to be set. Raises: ValueError: Invoked in the driver. Example: >>> import yr >>> config = yr.Config(enable_metrics=True) >>> yr.init(config) >>> >>> @yr.instance ... class MyActor: ... def __init__(self): ... self.data = yr.UInt64Counter( ... "userFuncTime", ... "user function cost time", ... "ms", ... {"runtime": "runtime1"} ... ) ... self.data.add_labels({"stage": "init"}) ... self.data.set(100) ... ... def add(self, n=1, labels=None): ... current = self.data.get_value() ... self.data.set(current + n) ... if labels: ... self.data.add_labels(labels) ... return self.get() ... ... def get(self): ... return { ... "labels": self.data._UInt64Counter__uint_counter_labels, ... "value": self.data.get_value() ... } >>> actor = MyActor.options(name="actor").invoke() >>> result = actor.add.invoke(5) >>> print(yr.get(result)) """ ivalue = int(value) if ConfigManager().is_driver: raise ValueError("uint64 counter metrics set not support in driver") data = UInt64CounterData(name=self.__uint_counter_name, description=self.__description, unit=self.__unit, labels=self.__uint_counter_labels, value=ivalue) global_runtime.get_runtime().set_uint64_counter(data)
[docs] def reset(self) -> None: """Reset uint64 counter.""" if ConfigManager().is_driver: raise ValueError("uint64 counter metrics reset not support in driver") data = UInt64CounterData(name=self.__uint_counter_name, description=self.__description, unit=self.__unit, labels=self.__uint_counter_labels) global_runtime.get_runtime().reset_uint64_counter(data)
[docs] def increase(self, value: int) -> None: """Increase uint64 counter to the given value.""" ivalue = int(value) if ConfigManager().is_driver: raise ValueError("uint64 counter metrics increase not support in driver") data = UInt64CounterData(name=self.__uint_counter_name, description=self.__description, unit=self.__unit, labels=self.__uint_counter_labels, value=ivalue) global_runtime.get_runtime().increase_uint64_counter(data)
[docs] def get_value(self) -> int: """Get value of uint64 counter.""" if ConfigManager().is_driver: raise ValueError("uint64 counter metrics get value not support in driver") data = UInt64CounterData(name=self.__uint_counter_name, description=self.__description, unit=self.__unit, labels=self.__uint_counter_labels) return global_runtime.get_runtime().get_value_uint64_counter(data)
[docs] class DoubleCounter(Metrics): """ Initialize the double counter. Args: name (str): The name of the counter. description (str): The description of the counter. unit (str): unit. labels (Dict[str, str], optional): Labels for the counter, defaults to an empty dictionary. """
[docs] def __init__(self, name: str, description: str, unit: str, labels: Dict[str, str] = field(default_factory=dict), ): self.__double_counter_name = name self.__description = description self.__unit = unit self.__double_counter_labels = labels self._check_name(name) self._check_label(labels)
[docs] def add_labels(self, labels: dict) -> None: """ add lable for metrics data. Args: labels (dict): The dictionary of labels to add. Raises: ValueError: If the labels are empty. Example: >>> import yr >>> >>> config = yr.Config(enable_metrics=True) >>> yr.init(config) >>> >>> @yr.instance >>> class Actor: >>> def __init__(self): >>> self.data = yr.DoubleCounter( >>> "userFuncTime", >>> "user function cost time", >>> "ms", >>> {"runtime": "runtime1"} >>> ) >>> self.data.add_labels({"stage": "init"}) >>> print("Actor init:", self.data._DoubleCounter__double_counter_labels) >>> >>> def run(self): >>> self.data.set(5) >>> self.data.add_labels({"phase": "run"}) >>> msg = ( ... f"Actor run: {self.data._UInt64Counter__uint_counter_labels}, " ... f"value: {self.data.get_value()}" ... ) >>> print(msg) >>> return msg >>> actor1 = Actor.options(name="actor").invoke() >>> result = actor1.run.invoke() >>> print("run result:", yr.get(result)) """ self._check_label(labels) if len(labels) == 0: raise ValueError("invalid metrics labels, should not be empty") old = self.__double_counter_labels self.__double_counter_labels = {**old, **labels}
[docs] def set(self, value: float) -> None: """ Set double counter to the given value. Args: value (float): The value to set. Raises: ValueError: Invoked in the driver. Example: >>> import yr >>> config = yr.Config(enable_metrics=True) >>> yr.init(config) >>> >>> @yr.instance >>> class Actor: >>> def __init__(self): >>> try: >>> self.data = yr.DoubleCounter( >>> "userFuncTime", >>> "user function cost time", >>> "ms", >>> {"runtime": "runtime1"} >>> ) >>> except Exception as err: >>> print(f"error: {err}") >>> >>> def run(self, *args, **kwargs): >>> try: >>> self.data.set(1) >>> except Exception as err: >>> print(f"error: {err}") """ fvalue = float(value) if ConfigManager().is_driver: raise ValueError("double counter metrics set not support in driver") data = DoubleCounterData(name=self.__double_counter_name, description=self.__description, unit=self.__unit, labels=self.__double_counter_labels, value=fvalue) global_runtime.get_runtime().set_double_counter(data)
[docs] def reset(self) -> None: """Reset double counter.""" if ConfigManager().is_driver: raise ValueError("double counter metrics reset not support in driver") data = DoubleCounterData(name=self.__double_counter_name, description=self.__description, unit=self.__unit, labels=self.__double_counter_labels) global_runtime.get_runtime().reset_double_counter(data)
[docs] def increase(self, value: float) -> None: """Increase double counter to the given value.""" fvalue = float(value) if ConfigManager().is_driver: raise ValueError("double counter metrics increase not support in driver") data = DoubleCounterData(name=self.__double_counter_name, description=self.__description, unit=self.__unit, labels=self.__double_counter_labels, value=fvalue) global_runtime.get_runtime().increase_double_counter(data)
[docs] def get_value(self) -> float: """Get value of double counter.""" if ConfigManager().is_driver: raise ValueError("double counter metrics get value not support in driver") data = DoubleCounterData(name=self.__double_counter_name, description=self.__description, unit=self.__unit, labels=self.__double_counter_labels) return global_runtime.get_runtime().get_value_double_counter(data)
[docs] class Alarm(Metrics): """ Used to set and manage alarm information. Args: name (str): The name of the alarm. description (str): Description of the alarm. """
[docs] def __init__(self, name: str, description: str, ): self.__name = name self.__description = description self._check_name(name)
[docs] def set(self, alarm_info: AlarmInfo) -> None: """ Set alarm to the given info. Args: alarm_info (AlarmInfo): An object containing detailed alarm information. Raises: ValueError: Invoked in the driver. ValueError: If alarm_name is None. Example: >>> import yr >>> import time >>> config = yr.Config(enable_metrics=True) >>> config.log_level="DEBUG":378 >>> yr.init(config) >>> @yr.instance >>> class Actor: >>> def __init__(self): >>> self.state = 0 >>> self.name = "aa" >>> def add(self, value): >>> self.state += value >>> if self.state > 10: >>> alarm_info = yr.AlarmInfo(alarm_name="aad") >>> alarm = yr.Alarm(self.name, description="") >>> alarm.set(alarm_info) >>> return self.state >>> >>> def get(self): >>> return self.state >>> >>> actor1 = Actor.options(name="actor1").invoke() >>> >>> print("actor1 add 5:", yr.get(actor1.add.invoke(5))) >>> print("actor1 add 7:", yr.get(actor1.add.invoke(7))) >>> print("actor1 state:", yr.get(actor1.get.invoke())) """ if ConfigManager().is_driver: raise ValueError("alarm metrics set not support in driver") if len(alarm_info.alarm_name) == 0: raise ValueError("invalid alarm name, should not be empty") global_runtime.get_runtime().set_alarm(self.__name, self.__description, alarm_info)