Stream#
-
std::shared_ptr<Producer> YR::CreateProducer(const std::string &streamName, ProducerConf producerConf = {})#
Creates a producer.
std::string streamName = "streamName"; // create stream producer YR::ProducerConf producerConf{}; std::shared_ptr<YR::Producer> producer = YR::CreateProducer(streamName, producerConf);
- Parameters:
streamName – The name of the stream.
producerConf – Configuration information for the producer.
- Throws:
Exception – 4006: not support local mode.
- Returns:
A pointer to the created producer.
The parameter structure is supplemented with the following explanation:
-
struct ProducerConf#
Public Members
-
int64_t delayFlushTime = 5#
After sending, the producer will delay for the specified duration before triggering a flush.
<0: Do not automatically flush; 0: Flush immediately; otherwise, the delay duration in milliseconds. Default: 5.
-
int64_t pageSize = 1024 * 1024ul#
Specifies the buffer page size for the producer, in bytes (B).
When a page is full, it will trigger a flush. Default: 1 MB. Must be greater than 0 and a multiple of 4 KB.
-
uint64_t maxStreamSize = 100 * 1024 * 1024ul#
Specifies the maximum shared memory size that the stream can use on a worker, in bytes (B).
Default: 100 MB. Range: [64 KB, size of worker shared memory].
-
bool autoCleanup = false#
Specifies whether to enable automatic cleanup for the stream.
Default: false (disabled). When the last producer/consumer exits, the stream will be automatically cleaned up.
-
bool encryptStream = false#
Specifies whether to enable content encryption for the stream.
Default: false (disabled).
-
uint64_t retainForNumConsumers = 0#
Specifies how many consumers should retain the producer’s data.
Default: 0. If set to 0, data will not be retained if there are no consumers. This parameter is only effective for the first consumer created, and the current valid range is [0, 1]. Multiple consumers are not supported. A consumer created after the producer may not receive data.
-
uint64_t reserveSize = 0#
Specifies the reserved memory size, in bytes (B).
When creating a producer, it will attempt to reserve reserveSize bytes of memory. If reservation fails, an exception will be thrown. reserveSize must be an integer multiple of pageSize and within the range [0, maxStreamSize]. If reserveSize is 0, it will be set to pageSize by default. Default: 0.
-
std::unordered_map<std::string, std::string> extendConfig#
Extended configuration for the producer.
Common configuration items include: “STREAM_MODE”: The stream mode, which can be “MPMC”, “MPSC”, or “SPSC”. Default: “MPMC”. If an unsupported mode is specified, an exception will be thrown. MPMC represents multi-producer multi-consumer, MPSC represents multi-producer single-consumer, and SPSC represents single-producer single-consumer. If MPSC or SPSC is selected, the data system will enable multi-stream shared page functionality internally.
-
std::string traceId#
Custom trace ID for troubleshooting and performance optimization.
Only supported in the cloud; settings outside the cloud will not take effect. Maximum length: 36. Valid characters must match the regular expression:
^[a-zA-Z0-9\~\.\-\/_!@#%\^\&\*\(\)\+\=\:;]*$.
-
int64_t delayFlushTime = 5#
-
class StreamProducer : public YR::Producer#
Public Functions
-
virtual void Send(const Element &element)#
Sends data to the producer.
The data is first placed in the buffer and then flushed based on the configured automatic flush strategy (either after a certain interval or when the buffer is full), or manually via Flush to make the data available to consumers.
// producer send data std::string str = "hello"; YR::Element element((uint8_t *)(str.c_str()), str.size()); producer->Send(element);
-
virtual void Send(const Element &element, int64_t timeoutMs)#
Sends data to the producer.
The data is first placed in the buffer and then flushed based on the configured automatic flush strategy (either after a certain interval or when the buffer is full), or manually via Flush to make the data available to consumers.
-
virtual void Send(const Element &element)#
-
std::shared_ptr<Consumer> YR::Subscribe(const std::string &streamName, const SubscriptionConfig &config, bool autoAck = false)#
Create a consumer.
YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM); std::shared_ptr<YR::Consumer> consumer = YR::Subscribe(streamName, config);
- Parameters:
streamName – The name of the stream. Must be less than 256 characters and contain only the following characters: (a-zA-Z0-9.-\/_!#%\^&*()+=\:;).
config – Configuration information for the consumer.
autoAck – If
autoAckis true, the consumer will automatically send an Acknowledgment (Ack) for received messages to the data system. Default value: false.
- Throws:
Exception – 4006: not support local mode.
- Returns:
A pointer to the created consumer.
The parameter structure is supplemented with the following explanation:
-
struct Element#
-
struct SubscriptionConfig#
Public Functions
-
inline SubscriptionConfig(std::string subName, const SubscriptionType subType)#
Constructor for SubscriptionConfig.
- Parameters:
subName – Subscription name
subType – Subscription type
Public Members
-
std::string subscriptionName#
Subscription name.
-
SubscriptionType subscriptionType = SubscriptionType::STREAM#
Subscription type, including three types: STREAM, ROUND_ROBIN, and KEY_PARTITIONS.
STREAM indicates that a single consumer in a subscription group consumes the stream. ROUND_ROBIN indicates that multiple consumers in a subscription group consume the stream in a round-robin load-balancing manner. KEY_PARTITIONS indicates that multiple consumers in a subscription group consume the stream in a key-partitioned load-balancing manner. Currently, only the STREAM type is supported; other types are temporarily unsupported. Default subscription type: STREAM.
-
std::unordered_map<std::string, std::string> extendConfig#
Extended configuration for SubscriptionConfig.
-
std::string traceId#
Custom trace ID for troubleshooting and performance optimization.
Only supported in the cloud; settings outside the cloud will not take effect. Maximum length: 36. Valid characters must match the regular expression:
^[a-zA-Z0-9\~\.\-\/_!@#%\^\&\*\(\)\+\=\:;]*$.
-
inline SubscriptionConfig(std::string subName, const SubscriptionType subType)#
-
class StreamConsumer : public YR::Consumer#
Public Functions
-
virtual void Receive(uint32_t expectNum, uint32_t timeoutMs, std::vector<Element> &outElements)#
Receives data with subscription functionality.
The consumer waits for the expected number of elements (
expectNum) to be received. The call returns when the timeout (timeoutMs) is reached or the expected number of elements is received.// consumer receive data std::vector<YR::Element> elements; consumer->Receive(1, 6000, elements); // timeout 6s consumer->Ack(elements[0].id); std::string actualData0(reinterpret_cast<char *>(elements[0].ptr), elements[0].size); std::cout << "receive: " << actualData0 << std::endl;
- Parameters:
expectNum – The expected number of elements to receive.
timeoutMs – The timeout in milliseconds.
outElements – The actual elements received.
- Throws:
YR::Exception – Thrown in the following cases:
3003: the total size exceed the uint64_t max value or the total size exceed the limit.
4299: failed to receive element with expectNum.
-
virtual void Receive(uint32_t timeoutMs, std::vector<Element> &outElements)#
Receives data with subscription functionality.
The consumer waits for the expected number of elements (
expectNum) to be received. The call returns when the timeout (timeoutMs) is reached or the expected number of elements is received.- Parameters:
timeoutMs – The timeout in milliseconds.
outElements – The actual elements received.
- Throws:
YR::Exception – Thrown in the following cases:
3003: the total size exceed the uint64_t max value or the total size exceed the limit.
4299: failed to receive element.
-
virtual void Ack(uint64_t elementId)#
Acknowledges that a specific element (identified by
elementId) has been consumed.This allows workers to determine if all consumers have finished consuming the element, enabling internal memory回收 mechanisms if all consumers have acknowledged it. If not acknowledged, the element will be automatically acknowledged when the consumer exits.
// consumer receive data std::vector<YR::Element> elements; consumer->Receive(1, 6000, elements); // timeout 6s consumer->Ack(elements[0].id); std::string actualData0(reinterpret_cast<char *>(elements[0].ptr), elements[0].size); std::cout << "receive: " << actualData0 << std::endl;
- Parameters:
elementId – The ID of the element to acknowledge.
- Throws:
YR::Exception –
4299: failed to ack.
-
virtual void Receive(uint32_t expectNum, uint32_t timeoutMs, std::vector<Element> &outElements)#
-
void YR::DeleteStream(const std::string &streamName)#
Deletes a stream.
When the global count of producers and consumers for the stream reaches zero, this stream is no longer in use, and all related metadata on workers and master nodes will be cleaned up. This function can be called on any Host node.
// delete stream YR::DeleteStream(streamName);
- Parameters:
streamName – The name of the stream. Must be less than 256 characters and contain only the following characters: (a-zA-Z0-9.-\/_!#%\^&*()+=\:;).
- Throws:
Exception – 4006: not support local mode.