避免过度并发

Contents

避免过度并发#

相比普通函数调用,并发或者分布式调用需要更多的工作负载。并发调用过于细粒度的作业可能会比普通函数调用更低效。为了避免上述问题,需谨慎设计函数并规划并发作业。如果需要并发调用一个细粒度的作业,可以使用批量处理,让每一次调用处理更多有效工作。

使用示例#

# demo.py
import time
import numpy as np
import yr

yr.init()


def process_ordinary(batch):
    time.sleep(0.00001)
    # 使用 numpy 向量化计算,极快
    return batch * 2


@yr.invoke
def process_batch(batch):
    # 使用 numpy 向量化计算,极快
    return process_ordinary(batch)


# 将数据切分为较大的块(Chunks)
datas = np.arange(1000)

start_time = time.time()
results = [process_ordinary(data) for data in datas]
end_time = time.time()
print(f"Ordinary function call takes {end_time - start_time} seconds")
# Ordinary function call takes 0.07148480415344238 seconds

start_time = time.time()
results = yr.get([process_batch.invoke(data) for data in datas])
end_time = time.time()
print(f"Parallelizing tasks takes {end_time - start_time} seconds")
# Parallelizing tasks takes 6.002047777175903 seconds

chunks = np.array_split(datas, 10)  # 拆分成 10 个大任务
start_time = time.time()
results = yr.get([process_batch.invoke(chunk) for chunk in chunks])  # 只有 10 次调度开销,计算效率极高
end_time = time.time()
print(f"Parallelizing tasks with batching takes {end_time - start_time} seconds")
# Parallelizing tasks with batching takes 0.016678571701049805 seconds

yr.finalize()

上述例子展示了并发调度的效率不如普通调用的效率。但是通过批量处理,使并发中的每一次调用处理了更多的作业,从而实现了预期的处理效率。