Running Distributed Jobs in Function Service#
Function services are commonly used for developing service-type applications, while stateless and stateful functions are often used for developing job-type applications. In practical business scenarios, a complex service request may need to complete multiple distributed jobs. A common approach is to use a function service to receive and parse external requests, then invoke stateless or stateful functions to process business logic. We introduce the implementation of such approach through an example of calculating π using the Monte Carlo method.
Prerequisites#
We use Python for development, defining a stateless function to handle point-counting tasks and a function service to receive external requests with customizable task counts. The example is deployed on an openYuanrong host cluster.
Deployed openYuanrong on hosts and configured to support function services.
Created the same code package directory on all nodes, e.g.,
/opt/code, to store the built executable function code.
Implementation Process#
Define Stateless Function to Handle Point-counting Tasks#
Create a new file monte_carlo.py in the code package directory with the following content. Define a stateless function compute_pi to handle point-counting tasks and count the number of points falling within the circle.
import yr
import random
@yr.invoke
def compute_pi(total_points: int) -> int:
circle_points = 0
for i in range(total_points):
rand_x = random.uniform(-1, 1)
rand_y = random.uniform(-1, 1)
origin_dist = rand_x**2 + rand_y**2
if origin_dist <= 1:
circle_points += 1
return circle_points
Use curl tool to register this stateless function. For parameter meanings, see API documentation:
# Replace /opt/code with your code package directory
META_SERVICE_ENDPOINT=<meta service component endpoint, default http://{master node ip}:31182>
curl -H "Content-type: application/json" -X POST -i ${META_SERVICE_ENDPOINT}/serverless/v1/functions -d '{"name":"0-baas-task","runtime":"python3.9","kind":"yrlib","cpu":600,"memory":512,"timeout":60,"storageType":"local","codePath":"/opt/code"}'
Record the value of the id field in the return format for use in the FaaS function, which corresponds to sn:cn:yrk:default:function:0-baas-task:$latest.
{"code":0,"message":"SUCCESS","function":{"id":"sn:cn:yrk:default:function:0-baas-task:$latest","createTime":"2025-04-28 11:31:20.986 UTC","updateTime":"","functionUrn":"sn:cn:yrk:default:function:0-baas-task","name":"0-baas-task","tenantId":"default","businessId":"yrk","productId":"","reversedConcurrency":0,"description":"","tag":null,"functionVersionUrn":"sn:cn:yrk:default:function:0-baas-task:$latest","revisionId":"20250428113120986","codeSize":0,"codeSha256":"","bucketId":"","objectId":"","handler":"","layers":null,"cpu":600,"memory":512,"runtime":"python3.9","timeout":60,"versionNumber":"$latest","versionDesc":"$latest","environment":{},"customResources":null,"statefulFlag":0,"lastModified":"","Published":"2025-04-28 11:31:20.986 UTC","minInstance":0,"maxInstance":100,"concurrentNum":100,"funcLayer":[],"status":"","instanceNum":0,"device":{},"created":""}}
Define Function Service to Receive External Requests#
Create a new file service_entry.py in the code package directory with the following content. The handler interface of this function service parses the tasksNumber parameter configured in external requests, sets the number of point-counting tasks, and invokes the stateless function compute_pi to run the tasks.
import yr
import monte_carlo
def handler(event, context):
tasks_number = event.get("tasksNumber")
POINTS_PER_TASK = 10000000
TOTAL_POINTS = tasks_number * POINTS_PER_TASK
# Dynamically specify resources used when invoking stateless function compute_pi. If not specified, the resources configured when registering the function metadata will be used (cpu 600 millicores, memory 512 MiB)
opt = yr.InvokeOptions(cpu=1000, memory=1000)
results = [
monte_carlo.compute_pi.options(opt).invoke(POINTS_PER_TASK)
for i in range(tasks_number)
]
# Calculate final result
circle_points = sum(yr.get(results))
pi = (circle_points * 4) / TOTAL_POINTS
print(f"π is: {pi}")
return pi
def init(context):
# Configure registration id of stateless function compute_pi
conf = yr.Config(function_id="sn:cn:yrk:default:function:0-baas-task:$latest")
yr.init(conf)
Use curl tool to register the function. For parameter meanings, see API documentation:
# Replace /opt/code with your code package directory
META_SERVICE_ENDPOINT=<meta service component endpoint, default http://{master node ip}:31182>
curl -H "Content-type: application/json" -X POST -i ${META_SERVICE_ENDPOINT}/serverless/v1/functions -d '{"name":"0@faas@demo","runtime":"python3.9","handler":"service_entry.handler","kind":"faas","cpu":600,"memory":512,"timeout":60,"extendedHandler":{"initializer":"service_entry.init"},"extendedTimeout":{"initializer":30},"storageType":"local","codePath":"/opt/code"}'
Record the value of the functionVersionUrn field in the return format for invocation, which corresponds to sn:cn:yrk:default:function:0@faas@demo:latest.
{"code":0,"message":"SUCCESS","function":{"id":"sn:cn:yrk:default:function:0@faas@demo:latest","createTime":"2025-04-28 12:02:21.930 UTC","updateTime":"","functionUrn":"sn:cn:yrk:default:function:0@faas@demo","name":"0@faas@demo","tenantId":"default","businessId":"yrk","productId":"","reversedConcurrency":0,"description":"","tag":null,"functionVersionUrn":"sn:cn:yrk:default:function:0@faas@demo:latest","revisionId":"2025042812022193","codeSize":0,"codeSha256":"","bucketId":"","objectId":"","handler":"service_entry.handler","layers":null,"cpu":600,"memory":512,"runtime":"python3.9","timeout":60,"versionNumber":"latest","versionDesc":"latest","environment":{},"customResources":null,"statefulFlag":0,"lastModified":"","Published":"2025-04-28 12:02:21.930 UTC","minInstance":0,"maxInstance":100,"concurrentNum":100,"funcLayer":[],"status":"","instanceNum":0,"device":{},"created":""}}
Test Application#
Use curl tool to invoke the function service 0@faas@demo. For parameter meanings, see API documentation:
FRONTEND_ENDPOINT=<frontend component endpoint, default http://{master node ip}:8888>
FUNCTION_VERSION_URN=<functionVersionUrn recorded in previous step>
curl -H "Content-type: application/json" -X POST -i ${FRONTEND_ENDPOINT}/serverless/v1/functions/${FUNCTION_VERSION_URN}/invocations -d '{"tasksNumber":2}'
Result output:
3.1412376