Collective

CollectiveGroup

The configuration structure for a collective communication group is described below:

struct CollectiveGroupSpec

Configuration specification for collective communication groups.

Public Members

int worldSize

Total number of processes in the group (world size).

std::string groupName = "default"

Name of the group. Default is “default”. Must match the regex ^[a-zA-Z0-9_!#%^*()+=:;-]+$

Backend backend = Backend::GLOO

Backend type to use. Default is GLOO.

int timeout = DEFAULT_COLLECTIVE_TIMEOUT

Operation timeout in milliseconds. Default is DEFAULT_COLLECTIVE_TIMEOUT (60000 ms, i.e. 60 seconds).
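An invalid groupName is rejected with an exception, so a caller may want to check names up front. The sketch below is a hypothetical helper (IsValidGroupName is not part of the YR API). It tests a name against the pattern documented for groupName using std::regex, and it assumes that this pattern matches what the runtime actually enforces.

```cpp
#include <cassert>
#include <regex>
#include <string>

// Hypothetical client-side helper (not part of the YR API): checks a group
// name against the pattern documented for CollectiveGroupSpec::groupName.
bool IsValidGroupName(const std::string &name)
{
    // Raw string with a custom delimiter, so the ')' inside the pattern is safe.
    // Inside the bracket, '^' (not first) and '-' (last) are literal characters.
    static const std::regex pattern(R"re(^[a-zA-Z0-9_!#%^*()+=:;-]+$)re");
    return std::regex_match(name, pattern);
}
```

With this pattern, names such as "my_group" and "test-group" pass, while names containing spaces or slashes, and the empty string, are rejected.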

Type and enumeration definitions:

enum YR::Collective::Backend

Backend type for collective communication.

Values:

enumerator GLOO

Use the GLOO backend for collective communication.

enumerator INVALID

Invalid backend type.

enum YR::DataType

Data type enumeration for collective communication operations.

Values:

enumerator INT

Integer type (int).

enumerator DOUBLE

Double precision floating point type (double).

enumerator LONG

Long integer type (long).

enumerator FLOAT

Single precision floating point type (float).

enumerator INVALID

Invalid data type.
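The collective APIs take count as a number of elements, not bytes. The following rough sketch maps each data type to the size of the C++ type listed for it, which is how raw buffers can be sized. The sketch::DataType enum is a local stand-in for YR::DataType (its underlying values may differ from the real enum), and ElementSize and BufferBytes are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

namespace sketch {
// Local mirror of YR::DataType for illustration only.
enum class DataType { INT, DOUBLE, LONG, FLOAT, INVALID };

// Size in bytes of one element, following the C++ type listed for each enumerator.
std::size_t ElementSize(DataType dtype)
{
    switch (dtype) {
        case DataType::INT: return sizeof(int);
        case DataType::DOUBLE: return sizeof(double);
        case DataType::LONG: return sizeof(long);
        case DataType::FLOAT: return sizeof(float);
        default: throw std::invalid_argument("invalid data type");
    }
}

// `count` is in elements, so a buffer must hold count * ElementSize(dtype) bytes.
std::size_t BufferBytes(int count, DataType dtype)
{
    assert(count >= 0);
    return static_cast<std::size_t>(count) * ElementSize(dtype);
}
}  // namespace sketch
```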

enum YR::ReduceOp

Reduction operator enumeration for collective communication operations.

Values:

enumerator SUM

Sum operation.

enumerator PRODUCT

Product operation.

enumerator MIN

Minimum operation.

enumerator MAX

Maximum operation.
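The sketch below spells out what each reduction operator computes for a pair of elements. It uses a local sketch::ReduceOp enum that mirrors YR::ReduceOp, and Combine is a hypothetical helper. The real backend applies the chosen operator elementwise across the buffers of all ranks.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>

namespace sketch {
// Local mirror of YR::ReduceOp for illustration only.
enum class ReduceOp { SUM, PRODUCT, MIN, MAX };

// Combines two values the way each reduction operator is described above.
template <typename T>
T Combine(ReduceOp op, T a, T b)
{
    switch (op) {
        case ReduceOp::SUM: return a + b;
        case ReduceOp::PRODUCT: return a * b;
        case ReduceOp::MIN: return std::min(a, b);
        case ReduceOp::MAX: return std::max(a, b);
    }
    throw std::invalid_argument("unknown reduce op");
}
}  // namespace sketch
```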

Constants:

const int YR::Collective::DEFAULT_COLLECTIVE_TIMEOUT = 60 * 1000

Default timeout for collective communication operations, in milliseconds (60 seconds).

const std::string YR::Collective::DEFAULT_GROUP_NAME = "default"

Default name for collective communication groups.

Collective-GroupOps

Collective communication group management interfaces for creating, initializing, and destroying collective communication groups.

void YR::Collective::InitCollectiveGroup(const CollectiveGroupSpec &groupSpec, int rank, const std::string &prefix = "")

Initializes a collective communication group in an actor instance.

This function initializes a collective communication group in the current actor instance. In distributed training or parallel computing scenarios, each participating process typically calls it to join the group.

        YR::Collective::CollectiveGroupSpec spec;
        spec.worldSize = 4;
        spec.groupName = "my_group";
        spec.backend = YR::Collective::Backend::GLOO;
        spec.timeout = 60000;

        int rank = 0;  // Current process rank
        YR::Collective::InitCollectiveGroup(spec, rank);

        // Use collective communication operations...

Note

Must be called after YR::Init(). Initializing the same groupName more than once throws an exception. groupName must match the regex ^[a-zA-Z0-9-_!#%\^*()+=\:;]*$

Mixing CreateCollectiveGroup (in the driver) and InitCollectiveGroup (in actors) for the same group is not supported. All members of a group must use either CreateCollectiveGroup or InitCollectiveGroup exclusively.

Dynamic addition or removal of group members is not supported. Once a group is created, its member count is fixed.

Parameters:
  • groupSpec – Configuration specification for the collective communication group, including worldSize, groupName, backend, and timeout.

  • rank – Rank of the current process in the group, should be in the range [0, worldSize-1].

  • prefix – Storage prefix for key-value storage used by backend communication. Default is an empty string.

Throws:

Exception – Thrown if called before initialization, if groupName is invalid, or if the collective group already exists.

void YR::Collective::CreateCollectiveGroup(const CollectiveGroupSpec &groupSpec, const std::vector<std::string> &instanceIDs, const std::vector<int> &ranks)

Creates a collective communication group in the driver using actor instance IDs.

This function creates a collective communication group from the driver process, specifying the participating actor instances and their ranks. In distributed training scenarios, the driver typically creates the group with this function and then invokes actor methods that run collective operations on it. Because mixing the two creation paths is not supported, those actors must not also call InitCollectiveGroup for the same group.

    YR::Config conf;
    YR::Init(conf);

    // Create collective communication group with 4 instances
    std::vector<YR::NamedInstance<CollectiveActor>> instances;
    std::vector<std::string> instanceIDs;
    for (int i = 0; i < 4; ++i) {
        auto ins = YR::Instance(CollectiveActor::FactoryCreate).Invoke();
        instances.push_back(ins);
        instanceIDs.push_back(ins.GetInstanceId());
    }

    std::string groupName = "test-group";
    YR::Collective::CollectiveGroupSpec spec{
        .worldSize = 4,
        .groupName = groupName,
        .backend = YR::Collective::Backend::GLOO,
        .timeout = YR::Collective::DEFAULT_COLLECTIVE_TIMEOUT,
    };
    YR::Collective::CreateCollectiveGroup(spec, instanceIDs, {0, 1, 2, 3});

    // Invoke collective operations on all instances
    std::vector<int> input = {1, 2, 3, 4};
    std::vector<YR::ObjectRef<int>> res;
    for (int i = 0; i < 4; ++i) {
        res.push_back(instances[i]
                          .Function(&CollectiveActor::Compute)
                          .Invoke(input, groupName, static_cast<uint8_t>(YR::ReduceOp::SUM)));
    }

    // Get results
    auto res0 = *YR::Get(res[0]);  // allreduce result
    auto res1 = *YR::Get(res[1]);  // send recv result(input)
    std::cout << "AllReduce result: " << res0 << ", Recv result: " << res1 << std::endl;

    // Destroy the collective group
    YR::Collective::DestroyCollectiveGroup(groupName);
    YR::Finalize();
    return 0;

Note

The sizes of instanceIDs and ranks must both equal worldSize. If groupName already exists, an exception is thrown; call DestroyCollectiveGroup first to destroy the existing group.

Mixing CreateCollectiveGroup (in the driver) and InitCollectiveGroup (in actors) for the same group is not supported. All members of a group must use either CreateCollectiveGroup or InitCollectiveGroup exclusively.

Dynamic addition or removal of group members is not supported. Once a group is created, its member count is fixed.

Parameters:
  • groupSpec – Configuration specification for the collective communication group.

  • instanceIDs – List of actor instance IDs, size must equal worldSize.

  • ranks – List of ranks corresponding to each instance, size must equal worldSize, and rank values should be in the range [0, worldSize-1].

Throws:

Exception – Thrown if instanceIDs, ranks, and worldSize don’t match, if groupName already exists, or if groupName is invalid.
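Several of the exceptions listed above can be avoided by checking the arguments in the driver before calling CreateCollectiveGroup. The following is a minimal sketch. ValidateGroupArgs is a hypothetical helper, and its duplicate-rank check is an extra assumption rather than a documented rule.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical pre-flight check (not part of the YR API) for the arguments of
// CreateCollectiveGroup: documented size and range rules, plus an assumed
// requirement that no rank is assigned twice.
bool ValidateGroupArgs(int worldSize, const std::vector<std::string> &instanceIDs,
                       const std::vector<int> &ranks)
{
    if (worldSize <= 0) {
        return false;
    }
    const auto n = static_cast<std::size_t>(worldSize);
    if (instanceIDs.size() != n || ranks.size() != n) {
        return false;  // both sizes must equal worldSize
    }
    std::vector<bool> seen(n, false);
    for (int r : ranks) {
        if (r < 0 || r >= worldSize) {
            return false;  // documented range [0, worldSize-1]
        }
        if (seen[static_cast<std::size_t>(r)]) {
            return false;  // assumption: each rank is used once
        }
        seen[static_cast<std::size_t>(r)] = true;
    }
    return true;
}
```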

void YR::Collective::DestroyCollectiveGroup(const std::string &groupName)

Destroys the specified collective communication group.

This function destroys a previously created collective communication group and releases its resources.

Note

If the group does not exist, this function returns silently without throwing an exception.

Parameters:

groupName – Name of the group to destroy.
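DestroyCollectiveGroup does not throw for a missing group, so it is safe to call from cleanup code. A small scope guard can guarantee cleanup even when a collective operation throws. The ScopeExit class below is an illustrative C++17 helper, not part of YR, and its callback should not throw because it runs in a destructor.

```cpp
#include <cassert>
#include <utility>

// Minimal scope guard: runs a callback when the guard goes out of scope,
// including during stack unwinding after an exception.
template <typename F>
class ScopeExit {
public:
    explicit ScopeExit(F f) : f_(std::move(f)) {}
    ScopeExit(const ScopeExit &) = delete;
    ScopeExit &operator=(const ScopeExit &) = delete;
    ~ScopeExit() { f_(); }  // the callback must not throw

private:
    F f_;
};
```

In the driver example, one could write ScopeExit destroyGroup([&] { YR::Collective::DestroyCollectiveGroup(groupName); }); right after CreateCollectiveGroup, inside a scope that ends before YR::Finalize() so that, as in the example, the group is destroyed first.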

int YR::Collective::GetWorldSize(const std::string &groupName = DEFAULT_GROUP_NAME)

Gets the total number of processes in the specified group.

This function returns the total number of processes (world size) in the collective communication group.

        // Initialize collective communication group...
        std::string groupName = "my_group";
        int worldSize = YR::Collective::GetWorldSize(groupName);
        std::cout << "World size: " << worldSize << std::endl;

Parameters:

groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.

Returns:

The total number of processes (world size) in the group.

int YR::Collective::GetRank(const std::string &groupName = DEFAULT_GROUP_NAME)

Gets the rank of the current process in the specified group.

This function returns the rank of the current process in the collective communication group.

        // Initialize collective communication group...
        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);
        std::cout << "My rank: " << rank << std::endl;

Parameters:

groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.

Returns:

The rank of the current process in the group, in the range [0, worldSize-1].
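Rank and world size are commonly used to split work across a group. The sketch below computes the contiguous shard that one rank owns. ShardRange is a hypothetical helper; in a real actor, its rank and worldSize arguments would come from GetRank and GetWorldSize.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Splits `total` items into worldSize contiguous shards and returns the
// half-open range [begin, end) owned by `rank`. The first total % worldSize
// ranks each receive one extra item.
std::pair<std::size_t, std::size_t> ShardRange(std::size_t total, int rank, int worldSize)
{
    assert(worldSize > 0 && rank >= 0 && rank < worldSize);
    const auto n = static_cast<std::size_t>(worldSize);
    const auto r = static_cast<std::size_t>(rank);
    const std::size_t base = total / n;
    const std::size_t extra = total % n;
    const std::size_t begin = r * base + (r < extra ? r : extra);
    const std::size_t end = begin + base + (r < extra ? 1 : 0);
    return {begin, end};
}
```

For example, 10 items over 4 ranks give the ranges [0, 3), [3, 6), [6, 8), and [8, 10).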

Collective-CommOps

Collective communication operation interfaces, providing distributed communication primitives: AllReduce, Reduce, AllGather, Broadcast, Scatter, and Barrier, plus point-to-point Send and Recv.

void YR::Collective::AllReduce(const void *sendbuf, void *recvbuf, int count, DataType dtype, const ReduceOp &op, const std::string &groupName = DEFAULT_GROUP_NAME)

Performs a reduction operation across all processes and broadcasts the result to all processes.

This function performs a reduction operation (such as sum, max, etc.) across all processes and writes the result back to all processes’ recvbuf. All processes’ recvbuf will eventually contain the same result.

        // Body of an actor method, e.g. CollectiveActor::Compute in the
        // CreateCollectiveGroup example; in, groupName, and op are its arguments.
        std::vector<int> output(in.size());

        // AllReduce: reduce across all processes and write the result to every process
        YR::Collective::AllReduce(in.data(), output.data(), static_cast<int>(in.size()), YR::DataType::INT,
                                  static_cast<YR::ReduceOp>(op), groupName);

        // Barrier: synchronize all processes before destroying the group
        YR::Collective::Barrier(groupName);
        YR::Collective::DestroyCollectiveGroup(groupName);
        int result = 0;
        for (std::size_t i = 0; i < output.size(); ++i) {
            result += output[i];
        }
        return result;

Note

sendbuf and recvbuf can point to the same memory (in-place operation). All processes must call this function with the same count, dtype, op, and groupName. Must be called after the group has been created and initialized.

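Parameters:
  • sendbuf – Send buffer containing local input data.

  • recvbuf – Receive buffer for storing the reduction result. After the call, every process's recvbuf contains the same result.

  • count – Number of data elements.

  • dtype – Data type.

  • op – Reduction operator.

  • groupName – Name of the group, default is “default”.

Throws: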

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.
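A single-process reference model can help when reasoning about AllReduce results, for example when writing tests. The sketch below models AllReduce with ReduceOp::SUM on int data. inputs[r] stands for the sendbuf of rank r, and the return value stands for every rank's recvbuf. SimulateAllReduceSum is hypothetical and does not call YR.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Single-process reference model of AllReduce with SUM.
std::vector<std::vector<int>> SimulateAllReduceSum(const std::vector<std::vector<int>> &inputs)
{
    assert(!inputs.empty());
    const std::size_t count = inputs[0].size();
    std::vector<int> reduced(count, 0);
    for (const auto &in : inputs) {
        assert(in.size() == count);  // every rank must pass the same count
        for (std::size_t i = 0; i < count; ++i) {
            reduced[i] += in[i];
        }
    }
    // Every rank receives an identical copy of the reduced vector.
    return std::vector<std::vector<int>>(inputs.size(), reduced);
}
```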

void YR::Collective::Reduce(const void *sendbuf, void *recvbuf, int count, DataType dtype, const ReduceOp &op, int dstRank, const std::string &groupName = DEFAULT_GROUP_NAME)

Performs a reduction operation across all processes and sends the result to the specified destination process.

This function performs a reduction operation across all processes, but only the dstRank process’s recvbuf will contain the reduction result. The recvbuf content of other processes is undefined.

        std::string groupName = "my_group";
        std::vector<int> localData(100, 1);
        std::vector<int> result(100);

        int rootRank = 0;  // Result sent to rank 0
        YR::Collective::Reduce(localData.data(), result.data(), localData.size(), YR::DataType::INT, YR::ReduceOp::SUM,
                               rootRank, groupName);

        int rank = YR::Collective::GetRank(groupName);
        if (rank == rootRank) {
            // Only rootRank's result contains valid result
            std::cout << "Reduced result: " << result[0] << std::endl;
        }

Note

The recvbuf content of ranks other than dstRank is undefined and must not be used. Only the dstRank process's recvbuf contains the valid reduction result.

Parameters:
  • sendbuf – Send buffer containing local input data.

  • recvbuf – Receive buffer, only used in the dstRank process, for storing the reduction result. Other processes’ recvbuf content is undefined.

  • count – Number of data elements.

  • dtype – Data type.

  • op – Reduction operator.

  • dstRank – Rank of the destination process where the reduction result will be sent.

  • groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.
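A similar single-process model of Reduce shows that only dstRank receives a result. Here the reduction operator is passed as a function object, for example std::plus<int>{} for SUM. SimulateReduce is a hypothetical C++17 helper, and std::nullopt stands in for the undefined recvbuf content on the other ranks.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

// Single-process reference model of Reduce: inputs[r] is rank r's sendbuf.
// Only dstRank gets a value; other ranks get std::nullopt.
template <typename T, typename BinaryOp>
std::vector<std::optional<std::vector<T>>> SimulateReduce(const std::vector<std::vector<T>> &inputs,
                                                          int dstRank, BinaryOp combine)
{
    assert(!inputs.empty());
    assert(dstRank >= 0 && static_cast<std::size_t>(dstRank) < inputs.size());
    std::vector<T> acc = inputs[0];
    for (std::size_t r = 1; r < inputs.size(); ++r) {
        assert(inputs[r].size() == acc.size());
        for (std::size_t i = 0; i < acc.size(); ++i) {
            acc[i] = combine(acc[i], inputs[r][i]);  // elementwise reduction
        }
    }
    std::vector<std::optional<std::vector<T>>> out(inputs.size());
    out[static_cast<std::size_t>(dstRank)] = acc;
    return out;
}
```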

void YR::Collective::AllGather(const void *sendbuf, void *recvbuf, int count, DataType dtype, const std::string &groupName = DEFAULT_GROUP_NAME)

Gathers data from all processes and distributes the gathered result to all processes.

This function gathers each process's sendbuf and writes the results, concatenated in rank order, to the recvbuf of every process. recvbuf must hold count * worldSize elements.

        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);
        int worldSize = YR::Collective::GetWorldSize(groupName);

        // Each process sends different data
        std::vector<float> localData(10, static_cast<float>(rank));
        std::vector<float> gatheredData(10 * worldSize);

        YR::Collective::AllGather(localData.data(), gatheredData.data(), localData.size(), YR::DataType::FLOAT,
                                  groupName);

Note

The size of recvbuf must be at least count * worldSize. Gathered data is arranged in rank order in recvbuf.

Parameters:
  • sendbuf – Send buffer containing local data to send.

  • recvbuf – Receive buffer for storing data gathered from all processes. Size should be count * worldSize.

  • count – Number of data elements sent by each process.

  • dtype – Data type.

  • groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.
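The rank-order layout of recvbuf can be stated precisely: the count elements from rank r occupy positions r * count through r * count + count - 1. The hypothetical GatherInRankOrder function below models that layout in a single process without calling YR.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Reference model of AllGather's output layout: rank r's `count` elements land
// at offset r * count of every rank's recvbuf.
template <typename T>
std::vector<T> GatherInRankOrder(const std::vector<std::vector<T>> &sendbufs)
{
    assert(!sendbufs.empty());
    const std::size_t count = sendbufs[0].size();
    std::vector<T> recv(count * sendbufs.size());  // count * worldSize elements
    for (std::size_t r = 0; r < sendbufs.size(); ++r) {
        assert(sendbufs[r].size() == count);
        for (std::size_t i = 0; i < count; ++i) {
            recv[r * count + i] = sendbufs[r][i];
        }
    }
    return recv;
}
```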

void YR::Collective::Broadcast(const void *sendbuf, void *recvbuf, int count, DataType dtype, int srcRank, const std::string &groupName = DEFAULT_GROUP_NAME)

Broadcasts data from the source process to all processes.

This function broadcasts data from the srcRank process to all processes in the group. All processes’ recvbuf will eventually contain the same data (from srcRank’s sendbuf).

        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);
        int srcRank = 0;

        std::vector<int> data(100);
        if (rank == srcRank) {
            // Source process initializes data
            for (int i = 0; i < 100; i++) {
                data[i] = i;
            }
        }

        YR::Collective::Broadcast(data.data(), data.data(), data.size(), YR::DataType::INT, srcRank, groupName);
        // All processes' data now contains the same content (from srcRank)

Note

sendbuf and recvbuf can point to the same memory (in-place operation). All processes must call this function with the same count, dtype, srcRank, and groupName.

Parameters:
  • sendbuf – Send buffer, only used in the srcRank process, containing data to broadcast.

  • recvbuf – Receive buffer for storing the broadcast data. All processes’ recvbuf will eventually contain the same data.

  • count – Number of data elements.

  • dtype – Data type.

  • srcRank – Rank of the source process.

  • groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.

void YR::Collective::Scatter(const std::vector<void*> sendbuf, void *recvbuf, int count, DataType dtype, int srcRank, const std::string &groupName = DEFAULT_GROUP_NAME)

Scatters data from the source process to all processes.

This function scatters data from the srcRank process’s sendbuf vector to all processes. The sendbuf vector of the srcRank process should contain worldSize buffers, each buffer corresponding to a target rank.

        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);
        int worldSize = YR::Collective::GetWorldSize(groupName);
        int srcRank = 0;

        std::vector<int> recvData(10);

        if (rank == srcRank) {
            // Source process prepares data to scatter
            std::vector<std::vector<int>> sendData(worldSize, std::vector<int>(10));
            for (int i = 0; i < worldSize; i++) {
                for (int j = 0; j < 10; j++) {
                    sendData[i][j] = i * 10 + j;
                }
            }

            std::vector<void *> sendbuf;
            for (auto &vec : sendData) {
                sendbuf.push_back(vec.data());
            }

            YR::Collective::Scatter(sendbuf, recvData.data(), 10, YR::DataType::INT, srcRank, groupName);
        } else {
            // Non-source processes
            std::vector<void *> sendbuf;  // Can be empty
            YR::Collective::Scatter(sendbuf, recvData.data(), 10, YR::DataType::INT, srcRank, groupName);
        }

        // Each process's recvData now contains corresponding data from srcRank

Note

The sendbuf vector is used only in the srcRank process; other processes can pass an empty vector. On srcRank, the size of the sendbuf vector must equal worldSize.

Parameters:
  • sendbuf – Send buffer vector, only used in the srcRank process. Size should be worldSize, each element points to data to be sent to the corresponding rank.

  • recvbuf – Receive buffer for storing data received from srcRank.

  • count – Number of data elements received by each process.

  • dtype – Data type.

  • srcRank – Rank of the source process.

  • groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.
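The Scatter example allocates a separate std::vector for each target rank. When the data for all ranks already sits in one contiguous array in rank order, the pointer vector for sendbuf can instead be built from offsets into that array. MakeScatterBuffers is a hypothetical helper. The pointers it returns stay valid only while the source vector is neither resized nor destroyed.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Builds the per-rank pointer list for Scatter from one contiguous array:
// element r points at the `count` elements destined for rank r.
template <typename T>
std::vector<void *> MakeScatterBuffers(std::vector<T> &flat, int count, int worldSize)
{
    assert(count >= 0 && worldSize > 0);
    assert(flat.size() == static_cast<std::size_t>(count) * static_cast<std::size_t>(worldSize));
    std::vector<void *> bufs;
    bufs.reserve(static_cast<std::size_t>(worldSize));
    for (int r = 0; r < worldSize; ++r) {
        bufs.push_back(flat.data() + static_cast<std::size_t>(r) * static_cast<std::size_t>(count));
    }
    return bufs;
}
```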

void YR::Collective::Barrier(const std::string &groupName = DEFAULT_GROUP_NAME)

Synchronization barrier that blocks until all processes in the group reach this point.

This function synchronizes all processes in the group: no process returns from Barrier until every process has called it.

        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);

        // Do some per-rank work...
        std::cout << "Rank " << rank << " before barrier" << std::endl;

        // Each process blocks here until every process in the group has arrived
        YR::Collective::Barrier(groupName);

        // Runs only after all processes have passed the barrier
        std::cout << "Rank " << rank << " after barrier" << std::endl;

Note

Every process in the group must call this function. If any process skips it, the processes that did call it cannot proceed (deadlock). The call returns only after all processes have reached it.

Parameters:

groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.

void YR::Collective::Send(const void *sendbuf, int count, DataType dtype, int dstRank, int tag = 0, const std::string &groupName = DEFAULT_GROUP_NAME)

Sends data to the specified process.

This function performs point-to-point communication, sending data to the destination process dstRank. Each Send must be paired with a Recv on dstRank that uses the same tag.

        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);
        int worldSize = YR::Collective::GetWorldSize(groupName);

        if (rank == 0) {
            // Rank 0 sends data to Rank 1
            std::vector<float> data(100, 1.0f);
            YR::Collective::Send(data.data(), data.size(), YR::DataType::FLOAT, 1, 0, groupName);
        } else if (rank == 1) {
            // Rank 1 receives data from Rank 0
            std::vector<float> recvData(100);
            YR::Collective::Recv(recvData.data(), recvData.size(), YR::DataType::FLOAT, 0, 0, groupName);
        }

Note

Send and Recv must be paired, with matching tags. Send blocks until the corresponding Recv is called.

Parameters:
  • sendbuf – Send buffer containing data to send.

  • count – Number of data elements to send.

  • dtype – Data type.

  • dstRank – Rank of the destination process.

  • tag – Message tag for matching the corresponding Recv operation, default is 0.

  • groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.

void YR::Collective::Recv(void *recvbuf, int count, DataType dtype, int srcRank, int tag = 0, const std::string &groupName = DEFAULT_GROUP_NAME)

Receives data from the specified process.

This function performs point-to-point communication, receiving data from the source process srcRank. Each Recv must be paired with a Send on srcRank that uses the same tag.

        std::string groupName = "my_group";
        int rank = YR::Collective::GetRank(groupName);
        int worldSize = YR::Collective::GetWorldSize(groupName);

        if (rank == 0) {
            // Rank 0 sends data to Rank 1
            std::vector<float> data(100, 1.0f);
            YR::Collective::Send(data.data(), data.size(), YR::DataType::FLOAT, 1, 0, groupName);
        } else if (rank == 1) {
            // Rank 1 receives data from Rank 0
            std::vector<float> recvData(100);
            YR::Collective::Recv(recvData.data(), recvData.size(), YR::DataType::FLOAT, 0, 0, groupName);
        }

Note

Recv and Send must be paired, with matching tags. Recv blocks until the corresponding Send is called. count and dtype must match those of the corresponding Send.

Parameters:
  • recvbuf – Receive buffer for storing received data.

  • count – Number of data elements to receive.

  • dtype – Data type.

  • srcRank – Rank of the source process.

  • tag – Message tag for matching the corresponding Send operation, default is 0.

  • groupName – Name of the group, default is “default”.

Throws:

Exception – Thrown if the group doesn’t exist or hasn’t been created yet.
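Tag matching can be illustrated with a single-process mailbox model, in which a message is delivered only to a Recv that names the same source rank and tag. The Mailbox class is hypothetical and does not call YR. It does not block; it throws where a real Recv would wait. It also assumes that messages with the same source, destination, and tag are delivered in send order.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <stdexcept>
#include <tuple>
#include <utility>
#include <vector>

// Single-process model of point-to-point matching keyed by (src, dst, tag).
class Mailbox {
public:
    void Send(int src, int dst, int tag, std::vector<float> data)
    {
        queues_[std::make_tuple(src, dst, tag)].push_back(std::move(data));
    }

    // `self` is the receiving rank. Returns the oldest message from `src`
    // with `tag`, and throws if none is pending (a real Recv would block).
    std::vector<float> Recv(int self, int src, int tag)
    {
        auto it = queues_.find(std::make_tuple(src, self, tag));
        if (it == queues_.end() || it->second.empty()) {
            throw std::runtime_error("no matching message; a real Recv would block here");
        }
        std::vector<float> data = std::move(it->second.front());
        it->second.pop_front();
        return data;
    }

private:
    std::map<std::tuple<int, int, int>, std::deque<std::vector<float>>> queues_;
};
```

Because matching is keyed on the tag, rank 1 can receive the tag 0 message before the tag 7 message even though rank 0 sent them in the opposite order.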