Stateful Functions#
A stateful function is essentially a stateful worker process (or service). When you instantiate a stateful function, openYuanrong creates a new worker process and schedules method calls of the stateful function to that worker process, through which you can access and modify the state of the worker process.
Method calls to different stateful function instances execute in parallel, while method calls on the same stateful function instance execute sequentially and share state.
Note
openYuanrong does not actively recycle stateful function instances. You need to actively call the terminate method to destroy them when all tasks are completed, otherwise resource leaks will occur.
import yr
# Declare stateful function
@yr.instance
class Counter:
def __init__(self):
self.count = 0
def add(self, n):
self.count += n
return self.count
def get(self):
return self.count
if __name__ == "__main__":
# Initialize openYuanrong runtime environment
yr.init()
# Create stateful function instance
counter = Counter.invoke()
# Asynchronously (non-blocking) call stateful function methods to interact with it
result_add = counter.add.invoke(3)
result_get = counter.get.invoke()
# Synchronously (blocking) get results, output 3 3
# You can also use yr.wait() method, which only waits for call completion without directly getting results
print(yr.get(result_add), yr.get(result_get))
# Destroy function instance
counter.terminate()
# Release openYuanrong environment resources
yr.finalize()
#include <iostream>
#include "yr/yr.h"
class Counter {
public:
Counter() : count(0) {}
static Counter *FactoryCreate()
{
return new Counter();
}
int Add(int n)
{
count += n;
return count;
}
int Get()
{
return count;
}
YR_STATE(count);
private:
int count;
};
// Declare stateful function
YR_INVOKE(Counter::FactoryCreate, &Counter::Add, &Counter::Get)
int main(int argc, char *argv[])
{
// Initialize openYuanrong runtime environment
YR::Init(YR::Config{}, argc, argv);
// Create stateful function instance
auto counter = YR::Instance(Counter::FactoryCreate).Invoke();
// Asynchronously (non-blocking) call stateful function methods to interact with it
auto resultAdd = counter.Function(&Counter::Add).Invoke(3);
auto resultGet = counter.Function(&Counter::Get).Invoke();
// Synchronously (blocking) get results, output 3:3
// You can also use YR::Wait() method, which only waits for call completion without directly getting results
std::cout << *YR::Get(resultAdd) << ":" << *YR::Get(resultGet) << std::endl;
// Destroy function instance
counter.Terminate();
// Release openYuanrong environment resources
YR::Finalize();
return 0;
}
// Counter.java
package com.example;
// Define stateful function
public class Counter {
private int count;
public Counter() {
this.count = 0;
}
public int add(int n) {
this.count += n;
return this.count;
}
public int get() {
return this.count;
}
}
// Main.java
package com.example;
import org.yuanrong.Config;
import org.yuanrong.InvokeOptions;
import org.yuanrong.api.YR;
import org.yuanrong.call.InstanceHandler;
import org.yuanrong.exception.YRException;
import org.yuanrong.runtime.client.ObjectRef;
public class Main {
public static void main(String[] args) throws YRException {
// Initialize openYuanrong runtime environment
YR.init();
// Create stateful function instance
InstanceHandler counter = YR.instance(Counter::new).invoke();
// Asynchronously (non-blocking) call stateful function methods to interact with it
ObjectRef refAdd = counter.function(Counter::add).invoke(3);
ObjectRef refGet = counter.function(Counter::get).invoke();
// Synchronously (blocking) get results, output 3:3
// You can also use YR.wait() method, which only waits for call completion without directly getting results
System.out.println(YR.get(refAdd, 9) + ":" + YR.get(refGet, 9));
// Destroy function instance
counter.terminate();
// Release openYuanrong environment resources
YR.Finalize();
}
}
Specifying Resources Required by Stateful Functions#
When instantiating a stateful function, you can dynamically configure its resources. When not configured, the default resources are cpu 500 millicores and memory 500 MiB. Other custom resources (such as NPU, GPU, etc.) can be configured through the custom_resources field as key-value pairs. For more information about resources, please refer to the Resources chapter.
The types and total amounts of custom resources need to be specified when deploying openYuanrong. Except for GPU and NPU, openYuanrong does not detect other custom resources. Refer to the following example to specify custom resources on nodes. Custom resource amounts are only used for logical deduction during scheduling and do not limit openYuanrong functions’ usage of actual physical resources.
yr start --master -s 'function_agent.args.custom_resources="{\"ssd\":1}"'
The following is an example of configuring resources for stateful functions.
import yr
@yr.instance
class Counter:
def __init__(self):
self.count = 0
def add(self, n):
self.count += n
return self.count
def get(self):
return self.count
if __name__ == "__main__":
yr.init()
# Run Counter function on 1 CPU core, 1G memory, 1 custom ssd resource
opt = yr.InvokeOptions()
opt.cpu = 1000
opt.memory = 1024
opt.custom_resources = {"ssd": 1}
counter = Counter.options(opt).invoke()
result_add = counter.add.invoke(3)
print(yr.get(result_add))
counter.terminate()
yr.finalize()
#include <iostream>
#include "yr/yr.h"
class Counter {
public:
Counter() : count(0) {}
static Counter *FactoryCreate()
{
return new Counter();
}
int Add(int n)
{
count += n;
return count;
}
int Get()
{
return count;
}
YR_STATE(count);
private:
int count;
};
YR_INVOKE(Counter::FactoryCreate, &Counter::Add, &Counter::Get)
int main(int argc, char *argv[])
{
YR::Init(YR::Config{}, argc, argv);
// Run Counter function on 1 CPU core, 1G memory, 1 custom ssd resource
YR::InvokeOptions opt;
opt.cpu = 1000;
opt.memory = 1024;
opt.customResources["ssd"] = 1;
auto counter = YR::Instance(Counter::FactoryCreate).Options(opt).Invoke();
auto resultAdd = counter.Function(&Counter::Add).Invoke(3);
std::cout << *YR::Get(resultAdd) << std::endl;
counter.Terminate();
YR::Finalize();
return 0;
}
// Counter.java
package com.example;
// Define stateful function
public class Counter {
private int count;
public Counter() {
this.count = 0;
}
public int add(int n) {
this.count += n;
return this.count;
}
public int get() {
return this.count;
}
}
// Main.java
package com.example;
import org.yuanrong.Config;
import org.yuanrong.InvokeOptions;
import org.yuanrong.api.YR;
import org.yuanrong.call.InstanceHandler;
import org.yuanrong.call.InstanceCreator;
import org.yuanrong.exception.YRException;
import org.yuanrong.runtime.client.ObjectRef;
public class Main {
public static void main(String[] args) throws YRException {
YR.init();
// Run Counter function on 1 CPU core, 1G memory, 1 custom ssd resource
InvokeOptions opt = new InvokeOptions.Builder().addCustomResource("ssd", 1.0f).cpu(1000).memory(1024).build();
InstanceHandler counter = YR.instance(Counter::new).options(opt).invoke();
ObjectRef refAdd = counter.function(Counter::add).invoke(3);
System.out.println(YR.get(refAdd, 9));
counter.terminate();
YR.Finalize();
}
}
Passing Stateful Function Handles#
Stateful function handles can be passed as parameters to other stateless or stateful functions, and the handle can be used to call stateful function methods within those functions.
import yr
@yr.invoke
def get(counter):
return yr.get(counter.get.invoke())
@yr.instance
class Counter:
def __init__(self):
self.count = 0
def add(self, n):
self.count += n
return self.count
def get(self):
return self.count
if __name__ == "__main__":
yr.init()
counter = Counter.invoke()
result_add = counter.add.invoke(3)
print(yr.get(result_add))
result_get = get.invoke(counter)
# Output 3
print(yr.get(result_get))
counter.terminate()
yr.finalize()
#include <iostream>
#include "yr/yr.h"
class Counter {
public:
Counter() : count(0) {}
static Counter *FactoryCreate()
{
return new Counter();
}
int Add(int n)
{
count += n;
return count;
}
int Get()
{
return count;
}
YR_STATE(count);
private:
int count;
};
YR_INVOKE(Counter::FactoryCreate, &Counter::Add, &Counter::Get)
int Get(YR::NamedInstance<Counter> counter)
{
return *YR::Get(counter.Function(&Counter::Get).Invoke());
}
YR_INVOKE(Get)
int main(int argc, char *argv[])
{
YR::Init(YR::Config{}, argc, argv);
auto counter = YR::Instance(Counter::FactoryCreate).Invoke();
auto resultAdd = counter.Function(&Counter::Add).Invoke(3);
std::cout << *YR::Get(resultAdd) << std::endl;
auto resultGet = YR::Function(Get).Invoke(counter);
std::cout << *YR::Get(resultGet) << std::endl;
// Destroy function instance
counter.Terminate();
// Release openYuanrong environment resources
YR::Finalize();
return 0;
}
This feature is not currently supported.
Communication Between Stateful Functions#
Communication between stateful function instances can be completed through method calls within functions, and data can be shared through Data Objects or data streams to achieve coordination.
Scheduling#
openYuanrong will select appropriate nodes to run stateful functions based on the specified resources and configured scheduling policies. For details, please refer to the Scheduling chapter.