浅谈Rust和Golang协程设计

根据维基百科的定义,协程,是指在非抢占式地处理多任务场景下,用于生成子程序的计算机程序组件,它允许在执行过程中被暂停或恢复。

何为有栈协程——以goroutine为例

根据维基百科的定义,协程,是指在非抢占式地处理多任务场景下,用于生成子程序的计算机程序组件,它允许在执行过程中被暂停或恢复。

从逻辑上来说,协程和线程的主要区别,在于协程是协作式处理多任务,而线程是抢占式处理多任务。协程之间的切换一般不涉及系统调用,在用户态就可以完成。

在Golang中,使用go关键字,可以将函数立即创建为一个goroutine。例如main.go文件内容如下

package main

func add(a, b int64) (int64, int64) {
	var tmp int64 = 1
	tmp = tmp + a
	return a + b, a - b
}

func main() {
	var c int64 = 10
	var d int64 = 12
	go add(c, d)
}

使用go tool命令,可以其编译为汇编代码(部分前面的汇编与主题无关,略去),进一步查看go关键字底层的实现机制。

go tool compile -N -l -S main.go

SP是栈指针寄存器,一般指向局部调用栈的栈顶,也可以用来在函数调用时传参。SB是静态区寄存器,用来获取函数指针。函数调用或创建。通过SUBQ指令修改SP的值,分配新的栈空间,通过ADDQ指令修改SP的值,回收或释放栈空间。

        0x001d 00029 (main.go:10)       MOVQ    $10, "".c+56(SP)
        0x0026 00038 (main.go:11)       MOVQ    $12, "".d+48(SP)
        0x002f 00047 (main.go:12)       MOVL    $32, (SP);
        0x0036 00054 (main.go:12)       LEAQ    "".add·f(SB), AX;将函数指针保存到AX寄存器
        0x003d 00061 (main.go:12)       MOVQ    AX, 8(SP);将AX寄存器保存到新分配的8字节
        0x0042 00066 (main.go:12)       MOVQ    "".c+56(SP), AX;将参数c保存到AX寄存器
        0x0047 00071 (main.go:12)       MOVQ    AX, 16(SP);将AX寄存器保存到新分配的8字节
        0x004c 00076 (main.go:12)       MOVQ    $12, 24(SP);将参数d保存到新分配的8字节
        0x0055 00085 (main.go:12)       PCDATA  $1, $0;和GC相关,可忽略
        0x0055 00085 (main.go:12)       CALL    runtime.newproc(SB);!!此处创建goroutine
        0x005a 00090 (main.go:13)       MOVQ    64(SP), BP;一般是接收返回值
        0x005f 00095 (main.go:13)       ADDQ    $72, SP;回收main本地栈空间
        0x0063 00099 (main.go:13)       RET

可以看到源码12行的go关键字,实际调用了runtimenewproc函数。

而在newproc函数内部,先在stack上分配了一段连续的栈空间(通常是2KB,栈帧最小值),也可以叫做栈帧(stackframe),将通过SP传入的参数值拷贝到这个空间中,且这个空间仅属于这个goroutine(也就是GPM模型里的g)。然后把goroutine加入到执行队列中,供调度器去调度执行。

以上整个过程作为一个函数,运行在system stack上,可简化视作直接调用函数。

关于函数——func systemstack(fn func()) systemstack is being called from the limited stack of an ordinary goroutine. In this case, systemstack switches to the per-OS-thread stack, calls fn, and switches back.

systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})

如果goroutine执行过程中,预先分配的栈空间不足,那么会分配更大的一块栈空间,并将旧的栈内容完全拷贝到新的空间中去,栈里的内容不会被其他goroutine共享。

// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g

我们可以在add函数中,增加以下几行代码,可以打印goroutine调用堆栈的信息。

var (
		buf [256]byte
		n   = runtime.Stack(buf[:], false)
		stk = string(buf[:n])
	)
	println(stk)

输出内容内,可以看到包含了传入参数的值10和12,以及add函数指针的值,另外,也可以通过这种方法获取goroutine的ID。

goroutine 5 [running]:
main.add(0xa, 0xc, 0x1068800, 0xc00001c0b8)
        main.go:11 +0x69

有栈协程的好处,由于栈帧可以直接完全保存运行期上下文(主要是寄存器值),因此可以在任何时刻暂停协程的运行,这就很方便地支持了抢占式的调度器。而无栈协程的上下文是一般通过类似结构体的方式保存在内存中,它依赖使用者显式地切换协程,否则协程不会主动让出执行权。

另外,有栈协程更方便将同步代码改造为异步代码,就像我们的例子一样,只需改动一行,加上go关键字就可以了。而无栈协程,同步改造为异步则更为复杂,甚至会导致牵一发动全身(async关键字扩散问题)。

Rust无栈协程

既然已经有了有栈协程,那么无栈协程是否还有优势呢。答案肯定的!

通常,无栈协程在内存空间和协程上下文切换的效率更高。值得说明的是,无栈协程并不是说不需要运行时的栈空间,而是和协程的创建者共用同一块运行时的栈空间。

如果一定要用一句话概括无栈协程,那就是:无栈协程可以看做是有状态的函数(generator),每次执行时会根据当前的状态和输入参数,得到(generate)输出,但不一定为最终结果

Async-await

在Rust中,async fn用来定义一种可以在执行中暂停的函数,通过await将控制权转移给runtime,等一段时间之后被重新唤醒执行。

调用async fn所产生的返回值被包在Future中。但与其他语言不同,直接调用async fn,异步函数不会立即被调度器调度执行,只有调用方通过future.await才能实际触发async fn的执行,并拿到结果。

对于Rust普通业务开发来说,一般来说只需要使用底层库提供的异步API,结合asyncawait关键字,就可以实现程序整体的异步化。

浅谈Rust和Golang协程设计

如果要搞懂异步API的实现,不断优化程序性能,那么理解Rust异步的实现机制,就是必不可少的了。

运行前准备

运行测试代码前,先添加如下依赖

[dependencies]
futures = "0.3"
tokio = { version = "0.3", features = ["full"] }

以及导入依赖模块

use futures::Future;
use std::pin::Pin;
use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
    Arc,
};
use tokio::runtime::Runtime;
use futures::task::AtomicWaker;
use std::task::Context;
use std::task::Poll;
use std::{thread, time::Duration};

我们首先定义一个结构体Test,用来模拟计算的过程。

struct Test {
    waker: Arc<AtomicWaker>, // 用来通知Executor执行poll
    result: AtomicU64,       // 用来暂存每次运行的中间结果
    signal: Arc<AtomicBool>, // 用来模拟事件消息
}

Future

提到Rust协程,首先需要介绍的是Future Trait,它是理解协程整个执行过程的关键。对于Golang使用者来说,可以将Trait看做Golang里的interface,Trait包含可以被实现的方法。

Future Trait仅定义了一个方法,那就是poll,顾名思义,这个方法可以被调用很多次。Future Trait可以看做是计算过程的抽象。调用poll方法意味着:异步计算过程由于某种原因暂停,或满足执行条件继续执行,直至计算出最终的结果。

poll方法的签名如下,Self可以是用户自定义的结构体,用来保存每一次调用poll方法计算的结果。

Context目前仅仅是对Waker的封装,Waker是唤起再次执行任务的结构体,后面会详细介绍。

poll方法返回的Poll是一个枚举值,表示本次调用后,当前协程需要暂停等待(Pending),还是已经完成(Ready)。

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>

我们的Test实现了Future Trait,代码如下:

impl Future for Test {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let result = self.result.load(SeqCst);
        self.waker.register(cx.waker());
        println!("poll called {}", result);
        self.result.store(result + 1, SeqCst);
        if self.signal.load(SeqCst) {
            println!("poll ready");
            Poll::Ready(result)
        } else {
            println!("poll not ready");
            Poll::Pending
        }
    }
}

既然每次执行poll,返回值是个枚举值,那么把协程的执行,看做状态机的状态转换过程,是不是也是自然而然了呢?答案是肯定的。

浅谈Rust和Golang协程设计

现在,我们对于Future有了明确的概念 ,那么一个协程是如何在暂停后被唤醒的?该Waker发挥它的作用了。

Waker

Future不同,Future是一个Trait,抽象了异步的计算过程,对不同的异步执行场景,它的实现不同。而Waker的功能就简单多了,对一些系统资源(fs, timer, channel, socket),它可以作为回调会被注册到系统的事件循环(event loop),每次事件产生后,Waker负责告诉调度器:我收到了一个事件通知,这个Future此时需要执行下(poll)。也可以使用代码驱动协程执行,我们的例子中就是这样实现的。

fn main() {
    let a = AtomicU64::new(0);
    let b = Arc::new(AtomicBool::new(false));
    let waker = Arc::new(AtomicWaker::new());
    let waker_clone = waker.clone();
    let c = b.clone();
    thread::spawn(move || {
        let t = Test {
            waker: waker,
            result: a,
            signal: b,
        };
        let rt = Runtime::new().unwrap();
        rt.spawn(async {
            let result = t.await;
            println!("finally result is {}", result);
        });
        thread::sleep(Duration::from_secs(10));
        println!("thread runtime exited");
    });
    thread::sleep(Duration::from_secs(2));
    c.store(true, SeqCst);
    println!("notify to poll");
    waker_clone.wake();
    thread::sleep(Duration::from_secs(2));
}

首先,我们要在实现Future的结构体中创建一个AtomicWaker,它的内部其实是一个自旋锁,每次调用register其实都会更新内部Waker

waker: Arc<AtomicWaker>

每次调用poll时,我们都要调用register将Context入参中的Waker取出,保存在waker字段中。

self.waker.register(cx.waker());

这样,我们在main thread中,就可以通过waker字段保存的值,唤醒Executor再次执行任务。

let waker_clone = waker.clone();
waker_clone.wake();

我们不难发现,wake方法执行的操作是和Executor实现有关的,而RawWaker是Rust标准库中就包括的结构体,且不是Trait,这是怎么做到的?答案是:通过虚拟函数指针表实现(Virtual function pointer table),简而言之就是结构体内保存的不是具体实现,而是函数指针,由不同的Executor进行初始化创建。

// RawWakerVTable用来定义函数指针表的字段
clone: unsafe fn(*const ()) -> RawWaker
wake: unsafe fn(*const ())
wake_by_ref: unsafe fn(*const ())
drop: unsafe fn(*const ())

在tokio库中,初始化这个函数指针表,而当我们实际调用wake的时候,调用的其实是tokio库中的实现,

RawWakerVTable::new(
    clone_arc_raw::<W>,
    wake_arc_raw::<W>,
    wake_by_ref_arc_raw::<W>,
    drop_arc_raw::<W>,
)
浅谈Rust和Golang协程设计

当事件产生后或满足执行条件,wake会导致Executor去尝试执行这个异步函数(poll)。Executor一般来说有如下几种实现思路:

  1. Waker修改一个全局的原子布尔值(AtomicBool)表示Executor当前是否可执行,这种方法的缺点是Executor同时只能运行最多一个异步函数,一般用于嵌入式平台上。
  2. Executor使用一个map保存全部待执行的异步函数,每个函数对应一个Task IDWaker将可执行的Task ID发送给ExecutorExecutor就可以执行对应的异步函数了。
  3. Executor使用一个或多个队列保存待执行的任务,Waker同时是指向某个任务的带引用计数功能的指针,当Waker发现任务可执行时,将自身放入Executor的执行队列中即可。

通常采用第三种方式实现。例如tokio库,实现是这样的:

//harness.rs
pub(super) fn wake_by_ref(&self) {
    if self.header().state.transition_to_notified() {
        self.core().schedule(Notified(self.to_task()));
    }
}

可以看到,wake调用后,任务便进入了调度,Scheduler分配线程后,由Executor执行。

Executor

Rust的Future仅仅定义了异步计算过程,还需要一个调度执行的角色——它实际驱动着整个异步计算流程进行。这个角色就叫做Executor

Executor每时每刻可能要执行成千上万个异步任务 ,因此它需要队列来管理这些任务,使用一个/多个线程执行这些就绪的任务。

当调用Runtime传入Future时,tokio库会先将Future封装成一个Task,然后放入内部的全局队列。

// 向Runtime提交一个新的Future
let rt = Runtime::new().unwrap();
rt.spawn(async {
    let result = t.await;
    println!("finally result is {}", result);
});
// Runtime全局队列字段
inject: queue::Inject<Arc<Worker>>

然后,查看当前有几个空闲线程,找出一个空闲线程执行

// 查找空闲线程的过程
fn notify_parked(&self) {
    if let Some(index) = self.idle.worker_to_notify() {
        self.remotes[index].unpark.unpark();
    }
}

unpark调用的实际上是parking_lot这个库,这个库会唤醒等待的线程

fn unpark_condvar(&self) {
    drop(self.mutex.lock());
    self.condvar.notify_one()
}

线程启动后,从本地的工作队列获取任务,开始执行,如果本地工作队列没有任务,那么可以从其他线程的执行队列中偷取任务。

fn run( & self, mut core: Box < Core > ) - > RunResult {
    while !core.is_shutdown {
        // Increment the tick
        core.tick();

        // Run maintenance, if needed
        core = self.maintenance(core);

        // First, check work available to the current worker.
        if let Some(task) = core.next_task( & self.worker) {
            core = self.run_task(task, core) ? ;
            continue;
        }

        // There is no more **local** work to process, try to steal work
        // from other workers.
        if let Some(task) = core.steal_work( & self.worker) {
            core = self.run_task(task, core) ? ;
        } else {
            // Wait for work
            core = self.park(core);
        }
    }

    // Signal shutdown
    self.worker.shared.shutdown(core, self.worker.clone());
    Err(())
}
浅谈Rust和Golang协程设计

可以看到,Executor核心逻辑其实并不复杂,单纯从队列中获取Task然后执行,不过加入了一些从其他线程队列中偷取任务的逻辑。用户也可以实现自己的Executor。

小结

三年前开始接触Golang开发,很快就被Golang简洁、清晰的开发模式所吸引。曾经Java里并发编程的繁琐过程,此刻只需要一个go关键字就能代替。goroutine这种有栈协程的设计,最大程度模仿了OS Thread的执行过程,对开发人员非常友好。美中不足的是,runtime为goroutine的实现封装了大量实现细节,而这些细节对于不了解runtime的使用者来说,是很难修改和调试的。

Rust和Golang类似,也是一门非常年轻的语言,但它遵循的原则是zero-cost abstraction,目的是最终生成安全高效的程序。为了实现这个目的,Rust既有编译器的严格检查、无runtime、无GC的设计,又有对于标准库范围的严格限制。Rust并发编程的实现思路,也充分体现着上述的特点:

  1. 标准库直接支持Native Thread。
  2. 语言支持关键字Async-await和Future Trait,但Executor Reactor Scheduler交给第三方库实现。
  3. 第三库对协程的实现细节用户完全可见,因为无栈协程的结构简单,代码可读性要远好于golang runtime相关的代码。

Golang和Rust现在都在快速发展当中,它们虽然设计思想上大相径庭,但追求更易用、更安全、更高性能的初心,是不会变的。

参考资料

https://www.programmersought.com/article/8537662156/

http://www.mit.edu/afs.new/sipb/project/golang/doc/asm.html

https://golang.org/src/runtime/stubs.go

https://mthli.xyz/stackful-stackless/

https://blog.aloni.org/posts/a-stack-less-rust-coroutine-100-loc/

https://samsartor.dev/coroutines-1/

https://rust-lang.github.io/async-book/02_execution/01_chapter.html

https://boats.gitlab.io/blog/post/wakers-i/

本文来自腾讯云计算社区,转载请注明出处:https://computeinit.com/archives/2904

发表评论

登录后才能评论
交流群