Source code for yr.stream
#!/usr/bin/env python3
# coding=UTF-8
# Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""stream"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Union
[docs]
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class ProducerConfig:
"""
The configuration class created by the producer.
"""
#: After Send, Flush will be triggered after a delay up to the specified duration.
#: < 0: do not auto flush,
#: = 0: flush immediately,
#: > 0: delay duration in milliseconds before flushing.
#: Default value is 5.
delay_flush_time: int = 5
#: Represents the buffer page size for the producer, in bytes (B). A flush is triggered when a page is full.
#: The value must be greater than 0 and a multiple of 4 KB.
#: Default is ``1`` MB (``1024 * 1024``).
page_size: int = 1024 * 1024
#: Specifies the maximum shared memory size that a stream can use on a worker, in bytes (B).
#: The default is ``1`` GB ( ``1024 * 1024 * 1024``),
#: and the valid range is [64 KB, size of the worker's shared memory].
max_stream_size: int = 1024 * 1024 * 1024
#: Specifies whether the stream enables the auto-cleanup feature. Default is ``false``.
auto_clean_up: bool = False
#: Specifies whether content encryption is enabled for the stream. Default is ``false`` (disabled).
encrypt_stream: bool = False
#: The data sent by the producer will be retained until received by the Nth consumer.
#: The default value is ``0``, meaning that if there are no consumers when the producer sends data,
#: the data will not be retained and may be missed when consumers are created later.
retain_for_num_consumers: int = 0
#: Represents the reserved memory size in bytes (B).
#: When creating a producer, an attempt will be made to reserve ``reserve_size`` bytes of memory.
#: If the reservation fails, creating the producer will raise an exception.
#: ``reserve_size`` must be an integer multiple of ``page_size`` and within the range ``[0, max_stream_size]``.
#: If ``reserve_size`` is ``0``, it will be set to `page_size`.
#: Default value is ``0``.
reserve_size: int = 0
#: Extended configuration stored as a dictionary, allowing users to customize configuration items.
#: Default value is an empty dictionary.
extend_config: Dict[str, str] = field(default_factory=dict)
class SubscriptionType(Enum):
"""
SubscriptionType
Attributes:
STREAM: default mode.
ROUND_ROBIN: not support
KEY_PARTITIONS: not support.
"""
STREAM = 0
ROUND_ROBIN = 1
KEY_PARTITIONS = 2
[docs]
@dataclass(init=True, repr=False, eq=False, order=False, unsafe_hash=False)
class SubscriptionConfig:
"""
The configuration class subscribed by consumers.
"""
#: Subscription name, used to identify subscriptions in the producer configuration.
#: The value of this attribute is a string.
subscription_name: str
#: Subscription type, including ``STREAM``, ``ROUND_ROBIN``, and ``KEY_PARTITIONS``.
#: ``STREAM`` means single consumer consumption within a subscription group,
#: ``ROUND_ROBIN`` means multiple consumers in a subscription group share load in a round-robin manner,
#: ``KEY_PARTITIONS`` means multiple consumers in a subscription group share load by key partitioning.
#: Currently, only ``STREAM`` type is supported; other types are not supported.
#: The default subscription type is ``STREAM``.
subscriptionType: SubscriptionType = SubscriptionType.STREAM
#: Extended configuration.
#: stored in dictionary form, allows users to customize configuration items.
#: The default value is an empty dictionary. the dictionary generated through ``field(default_factory=dict)``.
extend_config: Dict[str, str] = field(default_factory=dict)
[docs]
class Element:
"""
Element class containing an element ID and data buffer.
Args:
value (Union[bytes, memoryview]): data to send.
ele_id (int, optional): element id. Default to ``0``.
"""
[docs]
def __init__(self, value: Union[bytes, memoryview], ele_id: int = 0) -> None:
self.data = value
self.id = ele_id