Stream#

std::shared_ptr<Producer> YR::CreateProducer(const std::string &streamName, ProducerConf producerConf = {})#

Creates a producer.

std::string streamName = "streamName";
// create stream producer
YR::ProducerConf producerConf{};
std::shared_ptr<YR::Producer> producer = YR::CreateProducer(streamName, producerConf);

Parameters:
  • streamName – The name of the stream.

  • producerConf – Configuration information for the producer.

Throws:

Exception

  • 4006: local mode is not supported.

Returns:

A pointer to the created producer.

The parameter structure is described below:

struct ProducerConf#

Public Members

int64_t delayFlushTime = 5#

After a send, the producer waits for the specified duration before triggering a flush.

A negative value disables automatic flushing, 0 flushes immediately, and a positive value is the delay in milliseconds. Default: 5.
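The three cases for delayFlushTime can be read as a small decision function. The sketch below is illustrative only: FlushPolicy and ClassifyFlush are hypothetical names, not part of the YR API.

```cpp
#include <cstdint>

// Hypothetical helper type, not part of the YR API.
enum class FlushPolicy { NoAutoFlush, Immediate, Delayed };

// Maps a delayFlushTime value to the behaviour described above.
FlushPolicy ClassifyFlush(int64_t delayFlushTime)
{
    if (delayFlushTime < 0) {
        return FlushPolicy::NoAutoFlush;  // no automatic flush after a send
    }
    if (delayFlushTime == 0) {
        return FlushPolicy::Immediate;    // flush right after sending
    }
    return FlushPolicy::Delayed;          // flush after delayFlushTime milliseconds
}
```

With the default value of 5, ClassifyFlush returns FlushPolicy::Delayed.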

int64_t pageSize = 1024 * 1024ul#

Specifies the buffer page size for the producer, in bytes (B).

When a page is full, it will trigger a flush. Default: 1 MB. Must be greater than 0 and a multiple of 4 KB.

uint64_t maxStreamSize = 100 * 1024 * 1024ul#

Specifies the maximum shared memory size that the stream can use on a worker, in bytes (B).

Default: 100 MB. Range: [64 KB, size of worker shared memory].

bool autoCleanup = false#

Specifies whether to enable automatic cleanup for the stream.

When enabled, the stream is cleaned up automatically after the last producer or consumer exits. Default: false (disabled).

bool encryptStream = false#

Specifies whether to enable content encryption for the stream.

Default: false (disabled).

uint64_t retainForNumConsumers = 0#

Specifies how many consumers should retain the producer’s data.

Default: 0. If set to 0, data is not retained when there are no consumers, so a consumer created after the producer may not receive data. This parameter takes effect only for the first consumer created. The current valid range is [0, 1]; multiple consumers are not supported.

uint64_t reserveSize = 0#

Specifies the reserved memory size, in bytes (B).

When creating a producer, it will attempt to reserve reserveSize bytes of memory. If reservation fails, an exception will be thrown. reserveSize must be an integer multiple of pageSize and within the range [0, maxStreamSize]. If reserveSize is 0, it will be set to pageSize by default. Default: 0.
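The size constraints on pageSize, maxStreamSize, and reserveSize can be checked before calling YR::CreateProducer. The following is a minimal sketch under stated assumptions: SizeConf is a local mirror of the three fields, CheckSizes and EffectiveReserveSize are hypothetical helpers, and the upper bound on maxStreamSize (the worker's shared memory size) is not checked because it is not known to the caller here.

```cpp
#include <cstdint>
#include <string>

// Local mirror of the size-related ProducerConf fields, so the sketch
// compiles without the YR headers. Not the real YR::ProducerConf.
struct SizeConf {
    int64_t pageSize = 1024 * 1024;
    uint64_t maxStreamSize = 100ULL * 1024 * 1024;
    uint64_t reserveSize = 0;
};

// Returns an empty string when the documented constraints hold,
// otherwise a short description of the first violation found.
std::string CheckSizes(const SizeConf &conf)
{
    constexpr int64_t kPageAlign = 4 * 1024;        // pageSize granularity: 4 KB
    constexpr uint64_t kMinStreamSize = 64 * 1024;  // lower bound: 64 KB
    if (conf.pageSize <= 0 || conf.pageSize % kPageAlign != 0) {
        return "pageSize must be greater than 0 and a multiple of 4 KB";
    }
    if (conf.maxStreamSize < kMinStreamSize) {
        return "maxStreamSize must be at least 64 KB";
    }
    const uint64_t page = static_cast<uint64_t>(conf.pageSize);
    if (conf.reserveSize % page != 0 || conf.reserveSize > conf.maxStreamSize) {
        return "reserveSize must be a multiple of pageSize within [0, maxStreamSize]";
    }
    return "";
}

// A reserveSize of 0 falls back to pageSize, as described above.
uint64_t EffectiveReserveSize(const SizeConf &conf)
{
    return conf.reserveSize == 0 ? static_cast<uint64_t>(conf.pageSize) : conf.reserveSize;
}
```

A caller could run CheckSizes on the intended values and only copy them into a YR::ProducerConf when the returned string is empty.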

std::unordered_map<std::string, std::string> extendConfig#

Extended configuration for the producer.

Common configuration items include:

  • "STREAM_MODE" – The stream mode: "MPMC" (multi-producer multi-consumer), "MPSC" (multi-producer single-consumer), or "SPSC" (single-producer single-consumer). Default: "MPMC". If an unsupported mode is specified, an exception will be thrown. If MPSC or SPSC is selected, the data system will enable multi-stream shared page functionality internally.
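The STREAM_MODE rules can be expressed as a few lookups on the extendConfig map. This is a sketch only: ResolveStreamMode, IsSupportedStreamMode, and UsesSharedPages are hypothetical helpers, not part of the YR API, and they operate on a plain std::unordered_map of the same type as extendConfig.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical helpers, not part of the YR API.

// Returns the configured STREAM_MODE, or the documented default "MPMC"
// when the key is absent.
std::string ResolveStreamMode(const std::unordered_map<std::string, std::string> &extendConfig)
{
    auto it = extendConfig.find("STREAM_MODE");
    return it == extendConfig.end() ? std::string("MPMC") : it->second;
}

// Only the three documented modes are accepted.
bool IsSupportedStreamMode(const std::string &mode)
{
    return mode == "MPMC" || mode == "MPSC" || mode == "SPSC";
}

// Per the description above, MPSC and SPSC enable multi-stream shared pages.
bool UsesSharedPages(const std::string &mode)
{
    return mode == "MPSC" || mode == "SPSC";
}
```

Checking the mode up front lets a caller report a configuration mistake before the producer is created, instead of handling the exception afterwards.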

std::string traceId#

Custom trace ID for troubleshooting and performance optimization.

Only supported in the cloud; settings outside the cloud will not take effect. Maximum length: 36. Valid characters must match the regular expression: ^[a-zA-Z0-9\~\.\-\/_!@#%\^\&\*\(\)\+\=\:;]*$.

class StreamProducer : public YR::Producer#

Public Functions

virtual void Send(const Element &element)#

Sends data through the producer.

The data is first placed in the buffer. It becomes available to consumers once the buffer is flushed, either by the configured automatic flush strategy (after a certain interval or when the buffer is full) or manually via Flush.

// producer send data
std::string str = "hello";
YR::Element element(reinterpret_cast<uint8_t *>(&str[0]), str.size());
producer->Send(element);

Parameters:

element – The Element data to be sent.

Throws:

Exception

  • 4299: failed to send element.
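The buffering behaviour of Send can be pictured with a toy model. The class below is a sketch for intuition only: ToyPageBuffer is a hypothetical name, it counts bytes instead of storing them, it leaves out the timer-based flush, and it does not reproduce the data system's real paging logic.

```cpp
#include <cstddef>

// Toy model only: counts flushes for a producer-like buffer.
class ToyPageBuffer {
public:
    explicit ToyPageBuffer(std::size_t pageSize) : pageSize_(pageSize) {}

    // Buffers `size` bytes and flushes automatically once the page is full.
    void Send(std::size_t size)
    {
        pending_ += size;
        if (pending_ >= pageSize_) {
            Flush();
        }
    }

    // Manual flush: publishes the buffered bytes and empties the buffer.
    // Flushing an empty buffer is a no-op.
    void Flush()
    {
        if (pending_ > 0) {
            ++flushCount_;
            pending_ = 0;
        }
    }

    std::size_t Pending() const { return pending_; }
    int FlushCount() const { return flushCount_; }

private:
    std::size_t pageSize_;
    std::size_t pending_ = 0;
    int flushCount_ = 0;
};
```

In this model, data sent before a flush stays in Pending() and is not yet visible, which mirrors why a consumer may not see an element immediately after Send returns.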

virtual void Send(const Element &element, int64_t timeoutMs)#

Sends data through the producer, with a timeout.

The data is first placed in the buffer. It becomes available to consumers once the buffer is flushed, either by the configured automatic flush strategy (after a certain interval or when the buffer is full) or manually via Flush.

Parameters:
  • element – The Element data to be sent.

  • timeoutMs – The timeout in milliseconds.

Throws:

Exception

  • 4299: failed to send element.

virtual void Close()#

Closes the producer, triggering an automatic flush of the buffer and indicating that the buffer will no longer be used.

Once closed, the producer cannot be used again.

producer->Close();

Throws:

Exception

  • 4299: failed to close producer.

std::shared_ptr<Consumer> YR::Subscribe(const std::string &streamName, const SubscriptionConfig &config, bool autoAck = false)#

Creates a consumer.

YR::SubscriptionConfig config("subName", YR::SubscriptionType::STREAM);
std::shared_ptr<YR::Consumer> consumer = YR::Subscribe(streamName, config);

Parameters:
  • streamName – The name of the stream. Must be less than 256 characters and contain only the following characters: (a-zA-Z0-9.-\/_!#%\^&*()+=\:;).

  • config – Configuration information for the consumer.

  • autoAck – If autoAck is true, the consumer will automatically send an Acknowledgment (Ack) for received messages to the data system. Default value: false.

Throws:

Exception

  • 4006: local mode is not supported.

Returns:

A pointer to the created consumer.
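The streamName constraint (fewer than 256 characters, restricted character set) can be checked on the caller's side before calling YR::Subscribe. The helper below is a sketch: IsValidStreamName is a hypothetical name, and it assumes the backslashes in the documented character set are escape characters rather than allowed characters. The documentation does not say whether an empty name is accepted, so the sketch does not reject it.

```cpp
#include <string>

// Hypothetical helper, not part of the YR API.
// Accepts ASCII letters, digits, and . - / _ ! # % ^ & * ( ) + = : ;
bool IsValidStreamName(const std::string &name)
{
    // Assumption: backslashes in the documented set are escapes only.
    static const std::string kSpecials = ".-/_!#%^&*()+=:;";
    if (name.size() >= 256) {
        return false;  // must be less than 256 characters
    }
    for (char c : name) {
        const bool alnum = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9');
        if (!alnum && kSpecials.find(c) == std::string::npos) {
            return false;
        }
    }
    return true;
}
```

For example, "orders/2024_q1" passes, while a name containing a space or "@" is rejected.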

The parameter structures are described below:

struct Element#

Public Members

uint8_t *ptr#

Pointer to the data.

uint64_t size#

Size of the data.

uint64_t id#

ID of the Element.
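Because an Element only points at its data, it helps to be explicit about who owns the bytes. The sketch below uses a local stand-in struct with the three members listed above, so it compiles without the YR headers; MakeElement and CopyPayload are hypothetical helpers, not part of the YR API.

```cpp
#include <cstdint>
#include <string>

// Local stand-in with the three documented members. Not the real YR::Element.
struct Element {
    uint8_t *ptr;
    uint64_t size;
    uint64_t id;
};

// Wraps a string's bytes without copying them.
// The string must outlive the returned Element.
Element MakeElement(std::string &payload, uint64_t id)
{
    return Element{reinterpret_cast<uint8_t *>(&payload[0]),
                   static_cast<uint64_t>(payload.size()), id};
}

// Copies the bytes out, so the result stays valid after the
// buffer behind `element.ptr` has been released.
std::string CopyPayload(const Element &element)
{
    if (element.ptr == nullptr || element.size == 0) {
        return std::string();
    }
    return std::string(reinterpret_cast<const char *>(element.ptr), element.size);
}
```

Copying the payload into a std::string is the conservative choice on the receiving side, since the pointer refers to memory the caller does not own.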

struct SubscriptionConfig#

Public Functions

inline SubscriptionConfig(std::string subName, const SubscriptionType subType)#

Constructor for SubscriptionConfig.

Parameters:
  • subName – The subscription name.

  • subType – The subscription type.

Public Members

std::string subscriptionName#

Subscription name.

SubscriptionType subscriptionType = SubscriptionType::STREAM#

Subscription type, including three types: STREAM, ROUND_ROBIN, and KEY_PARTITIONS.

  • STREAM – A single consumer in a subscription group consumes the stream.

  • ROUND_ROBIN – Multiple consumers in a subscription group consume the stream with round-robin load balancing.

  • KEY_PARTITIONS – Multiple consumers in a subscription group consume the stream with key-partitioned load balancing.

Currently, only the STREAM type is supported; the other types are temporarily unsupported. Default subscription type: STREAM.
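Since only STREAM is currently supported, a caller that takes the subscription type from configuration may want to check it first. The sketch below uses a local mirror of the enum so that it compiles without the YR headers; the enumerator order is an assumption, and IsSupportedSubscriptionType is a hypothetical helper.

```cpp
// Local mirror of the three documented subscription types.
// The real type is YR::SubscriptionType; the enumerator order here is assumed.
enum class SubscriptionType { STREAM, ROUND_ROBIN, KEY_PARTITIONS };

// Hypothetical pre-flight check: only STREAM is currently supported.
bool IsSupportedSubscriptionType(SubscriptionType type)
{
    return type == SubscriptionType::STREAM;
}
```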

std::unordered_map<std::string, std::string> extendConfig#

Extended configuration for SubscriptionConfig.

std::string traceId#

Custom trace ID for troubleshooting and performance optimization.

Only supported in the cloud; settings outside the cloud will not take effect. Maximum length: 36. Valid characters must match the regular expression: ^[a-zA-Z0-9\~\.\-\/_!@#%\^\&\*\(\)\+\=\:;]*$.

class StreamConsumer : public YR::Consumer#

Public Functions

virtual void Receive(uint32_t expectNum, uint32_t timeoutMs, std::vector<Element> &outElements)#

Receives data with subscription functionality.

The consumer waits for the expected number of elements (expectNum) to be received. The call returns when the timeout (timeoutMs) is reached or the expected number of elements is received.

// consumer receive data
std::vector<YR::Element> elements;
consumer->Receive(1, 6000, elements);  // timeout 6s
if (!elements.empty()) {
    // copy the payload first; Ack may allow the worker to reclaim the element's memory
    std::string actualData0(reinterpret_cast<char *>(elements[0].ptr), elements[0].size);
    std::cout << "receive: " << actualData0 << std::endl;
    consumer->Ack(elements[0].id);
}

Parameters:
  • expectNum – The expected number of elements to receive.

  • timeoutMs – The timeout in milliseconds.

  • outElements – The actual elements received.

Throws:

YR::Exception – Thrown in the following cases:

  • 3003: the total size exceeds the uint64_t maximum value or exceeds the limit.

  • 4299: failed to receive element with expectNum.

virtual void Receive(uint32_t timeoutMs, std::vector<Element> &outElements)#

Receives data with subscription functionality.

The consumer waits for elements without an expected count; this overload has no expectNum parameter. The call returns when elements are received or the timeout (timeoutMs) is reached.

Parameters:
  • timeoutMs – The timeout in milliseconds.

  • outElements – The actual elements received.

Throws:

YR::Exception – Thrown in the following cases:

  • 3003: the total size exceeds the uint64_t maximum value or exceeds the limit.

  • 4299: failed to receive element.

virtual void Ack(uint64_t elementId)#

Acknowledges that a specific element (identified by elementId) has been consumed.

This allows workers to determine whether all consumers have finished consuming the element, so that its memory can be reclaimed internally once every consumer has acknowledged it. An element that is not acknowledged is acknowledged automatically when the consumer exits.

// consumer receive data
std::vector<YR::Element> elements;
consumer->Receive(1, 6000, elements);  // timeout 6s
if (!elements.empty()) {
    // copy the payload first; Ack may allow the worker to reclaim the element's memory
    std::string actualData0(reinterpret_cast<char *>(elements[0].ptr), elements[0].size);
    std::cout << "receive: " << actualData0 << std::endl;
    consumer->Ack(elements[0].id);
}

Parameters:

elementId – The ID of the element to acknowledge.

Throws:

YR::Exception

  • 4299: failed to ack.
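The rule that an element can be reclaimed only after all consumers have acknowledged it can be illustrated with a toy tracker. This is a sketch for intuition only: ToyAckTracker is a hypothetical class, consumers are identified by plain integers, and it is not how the data system implements acknowledgement.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>

// Toy model only: records which consumers have acknowledged which elements.
class ToyAckTracker {
public:
    explicit ToyAckTracker(std::size_t consumerCount) : consumerCount_(consumerCount) {}

    // Records that `consumerId` has finished with `elementId`.
    // Repeated acknowledgements from the same consumer count once.
    void Ack(int consumerId, uint64_t elementId) { acks_[elementId].insert(consumerId); }

    // An element becomes reclaimable once every consumer has acknowledged it.
    bool Reclaimable(uint64_t elementId) const
    {
        auto it = acks_.find(elementId);
        return it != acks_.end() && it->second.size() == consumerCount_;
    }

private:
    std::size_t consumerCount_;
    std::map<uint64_t, std::set<int>> acks_;
};
```

With two consumers, an element stays unreclaimable after the first acknowledgement and becomes reclaimable after the second.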

virtual void Close()#

Closes the consumer.

Once closed, the consumer cannot be used again.

consumer->Close();

Throws:

YR::Exception

  • 4299: failed to close consumer.

void YR::DeleteStream(const std::string &streamName)#

Deletes a stream.

When the global count of producers and consumers for the stream reaches zero, the stream is no longer in use, and all related metadata on workers and master nodes is cleaned up. This function can be called on any Host node.

// delete stream
YR::DeleteStream(streamName);

Parameters:

streamName – The name of the stream. Must be less than 256 characters and contain only the following characters: (a-zA-Z0-9.-\/_!#%\^&*()+=\:;).

Throws:

Exception

  • 4006: local mode is not supported.