Go并发模式Actor还是CSP详细介绍

Posted by     "richie" on Monday, March 16, 2020

Go并发模式Actor还是CSP详细介绍

本文摘自Go语言中文网,原文中有些错误,已经修改,在此只为做记录学习.

基本概念

了解并发和并行

  • 并发: 强调一段时间做多件事
  • 并行: 强调同一时间做多件事

CSP vs Actor 模型

Actor

Actor

Actor 模型是一个通用的并发编程模型, 可以应用在几乎任何一种编程语言中, 典型的是Erlang. 多个Actor(进程)可以同时运行,不共享状态, 通过向与进程绑定的消息队列(也成为信箱)异步发送消息来进行通信。

图例中Actor-1Actor-2 进程通信依赖一个消息队列,而且消息队列与进程互相耦合绑定. Actor-1 在发送完消息之后, 在Actor-2没有处理该消息的情况下, 可以继续执行其他任务, 这说明Actor进程之间的通信是异步的.

  • 优点:

1: 消息传输和封装, 多个Actor可以同时运行, 但不共享状态, 而且单个Actor中的事件是串行执行(这归功于队列)
2: Actor 模型支持共享内存模型, 也支持分布式内存模型

  • 缺点:

1: 尽管Actor模型比使用线程和锁模型的程序更易debug, 但是也会存在死锁的问题,而且还需要担心绑定进程的队列溢出的问题
2: 没有对并行提供直接支持, 需要通过并发的技术来构造并行方案.

CSP

csp

CSP 即通信顺序进程(communicating sequential processes), 与Actor模型类似, 该模型也是由独立的, 并发执行的实体组成, 实体之间通过发送消息进行通信. go中的csp模型channel对于goroutine来说是匿名的, 不需要和gid绑定, 通过channel完成goroutine之间的通信.(channelcsp代表通道的概念, 这里只讨论Go相关,channel等价于Go中的channel)

  • 优点:

1: 与Actor 相比, CSP最大的优点是灵活.Actor模型,负责通信的媒介和执行单元是耦合的. 而csp 中, channel是第一类对象,可以被独立创造, 写入, 读取数据,也可以在不同的执行单元中传递.

  • 缺点:

CSP模型也易受死锁影响, 且没有提供直接的并行支持. 并行需要建立在并发的基础上,引入了不确定性.

区别

  • Actor 模型重在参与交流的实体(即进程), 而CSP重在交流的通道, 如Go中的channel
  • CSP 模型不关注发送消息的进程, 而是关注发送消息时使用的channel, 而channel 不像Actor模型那样进程和队列紧耦合. 而是可以单独创建和读写, 并在进程(goroutine) 之间传递.

Go 中的并发模型

Go是采用的CSP 的思想, channelgo在并发编程通信的推荐手段, Go 的设计者Rob Pike有一句经典的名言:

Do not communicate by sharing memory; instead, share memory by communicating.

这句话是说"不要使用共享内存通信, 而是应该使用通信去共享内存", Go语言推荐我们使用通信来进行进程间同步消息. 而这样做有三点好处, 来源于draveness的文章.

  • 1: 首先,使用发送消息来同步信息相比于直接使用内存和互斥锁是一种更高级的抽象, 使用更高级的抽象能够为我们在程序设计上提供更好的封装,让程序的逻辑更加清晰;

  • 2: 其次, 消息发送在解耦方面与共享内存相比也有优势, 我们可以将线程的职责分成生产者和消费者, 并通过消息传递的方式将他们解耦, 不需要再依赖共享内存.

  • 3: 最后, Go 语言选择消息发送的方式, 通过保证同一时间只有一个活跃的线程能够访问数据, 能够从设计上天然地避免线程竞争和数据冲突的问题;

并发设计模式

上文介绍了Go中使用的并发模型,而在这种并发模型下面channel 是一个重要的概念,而下面每一种模式的设计都依赖于channel,所以有必要了解一下.

Barrier模式

barrier 屏障模式故名思议 就是一种屏障, 用来阻塞到聚合所有goroutine返回结果. 可以使用channel来实现.

使用场景

  • 多个网络请求并发, 聚合结果
  • 粗粒度任务拆分并发执行, 聚合结果

barrier

代码实现

/*
Barrier
*/
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

type barrierResp struct {
    Err    error
    Resp   string
    Status int
}

// 构造请求
func makeRequest(out chan<- barrierResp, url string) {
    res := barrierResp{}

    client := http.Client{
        Timeout: time.Duration(20 * time.Second),
    }
    resp, err := client.Get(url)
    if resp != nil {
        res.Status = resp.StatusCode
    }
    if err != nil {
        res.Err = err
        out <- res
        return
    }
    byt, err := ioutil.ReadAll(resp.Body)
    defer resp.Body.Close()

    if err != nil {
        res.Err = err
        out <- res
        return
    }
    res.Resp = string(byt)
    out <- res
}

// 合并结果
func barrier(endpoints ...string) {
    requestNumber := len(endpoints)

    in := make(chan barrierResp, requestNumber)
    response := make([]barrierResp, requestNumber)
    defer close(in)

    for _, endpoint := range endpoints {
        go makeRequest(in, endpoint)
    }

    var hasError bool

    for i := 0; i < requestNumber; i++ {
        resp := <-in
        if resp.Err != nil {
            fmt.Println("ERROR:", resp.Err, resp.Status)
            hasError = true
        }
        response[i] = resp
    }
    if !hasError {
        for _, resp := range response {
            fmt.Println(resp.Status)
        }
}

}

func main() {
    barrier([]string{"https://www.baidu.com", "http://www.sina.com.cn", "https://segmentfault.com/"}...)
}

Tips

Barrier模式也可以使用errgroup扩展库来实现, 这样就更加简单明了.这个包有点类似于sync.WaitGroup, 但是区别是当其中一个任务发生错误时,可以返回该错误. 而这也满足我们Barrier模式的需求.


func barrier(endpoints ...string) {
    var g errgroup.Group
    var mu sync.Mutex

    response := make([]barrierResp, len(endpoints))

    for i, endpoint := range endpoints {
        i, endpoint := i, endpoint // create local for closure below
        g.Go(func() Error {
            res := barrierResp{}
            resp, err := http.Get(endpoint)
            if err != nil {
                return err
            }
             byt, err := ioutil.ReadAll(resp.Body)
             defer resp.Body.Close()
             if err != nil {
                return err
             }

             res.Resp = string(byt)
             mu.Lock()
             response[i] = res
             mu.Unlock()
             return err
            })
        if err := g.Wait(); err != nil {
            fmt.Println(err)
        }
        for _, resp := range response {
            fmt.Println(resp.Status)
        }
        }

}

Future 模式

future即未来, 来自未来的模式. 这个模式常用在异步处理也成为Promise模式,采用一种fire-and-forget 的方式,是指主goroutine 不等子goroutine执行完成直接返回了,然后等到未来执行完的时候再去取结果. 在Go中由于goroutine的存在,实现这种模式是挺简单的.

使用场景

异步

代码实现

/*
*Future
*/
package main

import "fmt"

type Function func(string) (string, error)

type Future interface {
    SuccessCallback() error
    FailCallback() error
    Execute(Function) (bool, chan struct{})
}

type AccountCache struct {
    Name string
}

func (a *AccountCache) SuccessCallback() error {
    fmt.Println("It's success~~")
    return nil
}

func (a *AccountCache) FailCallback() error {
    fmt.Println("It's fail!~")
    return nil
}

func (a *AccountCache) Execute(f Function) (bool, chan struct{}) {
    done := make(chan struct{})
    go func(a *AccountCache) {
        _, err := f(a.Name)
        if err != nil {
            _ = a.FailCallback()
        } else {
            _ = a.SuccessCallback()
        }
        done <- struct{}{}
    }(a)
    return true, done
}

func NewAccountCache(name string) *AccountCache {
    return &AccountCache{Name: name,}
}

func testFuture() {
    var future Future
    future = NewAccountCache("Tom")
    updateFunc := func(name string) (string, error) {
        fmt.Println("cache update:", name)
        return name, nil
    }
    _, done := future.Execute(updateFunc)
    defer func() {
        <- done
    }()

}

func main() {
    var future Future
    future = NewAccountCache("Tom")
    updateFunc := func(name string) (string, error) {
        fmt.Println("cache update:", name)
        return name, nil
    }
    _, done := future.Execute(updateFunc)
    defer func() {
        <- done
    }()
    fmt.Println("balbalbalba")
// do something
}

这里有一个技巧: 为什么使用struct类型作为channel的通知?
很多开源代码都是使用这种方式来作为信号通知机制, 主要是因为空struct的在Go中占的内存是最少的.

Pipline 模式

使用场景

  • 可以利用多核的优势把一段粗粒度逻辑分解成多个goroutine执行, Pipline本身翻译过来就是管道的意思,注意和Barrier模式不同的是,它是按顺序的,类似于流水线.

  • pipline 可以分simple, fan in And fan out 模式

simple pipline

fan-out-fan-in

上面两个图不是很能表达并行的概念, 其实三个goroutine 是同时执行的,通过buffer channel 将三者串起来,只要前面groutine 处理完一部分数据,就往下传递, 达到并行的目的.

simple 模式代码实现

实现一个功能, 给定一个切片,然后求它的子项的平方和. 例如:

[1,2,3] -> 1^2 + 2^2 + 3^2 = 14

正常的逻辑, 遍历切片, 然后求平方累加. 使用pipline模式, 可以把求和和求方平 拆分出来并行.

package main

import (
    "fmt"
)

/*
* Pipeline mode
 */

func generator(max int) <-chan int {
    out := make(chan int, 100)
    go func() {
        defer close(out)
        for i := 1; i <= max; i++ {
            out <- i
        }
        }()
    return out
}

func power(in <-chan int) <-chan int {
    out := make(chan int, 100)
    go func() {
        defer close(out)
        for v := range in {
            out <- v * v
        }
    }()
    return out
}

func sum(in <-chan int) <-chan int {
    out := make(chan int, 100)
    go func() {
        var sum int
        defer close(out)
        for v := range in {
            sum += v
        }
        out <- sum
        }()
    return out
}

func main() {
    fmt.Println(<-sum(power(generator(5))))

}

Workers Pool模式

使用场景

  • 高并发任务

Gogoroutine已经足够轻量, 甚至net/http server 的处理方式也是goroutine-per-connection 的, 所以比其他语言来说可能场景稍微少一些. 每个goroutine的初始内存消耗在2~8kb, 当我们有大批量任务的时候, 需要起很多goroutine 来处理, 这会给系统带来很大的内存开销和GC压力, 这个时候就可以考虑一下协程池.

代码实现

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

/*
* Worker Pool
 */

type TaskHandler func(interface{})

type Task struct {
    Param   interface{}
    Handler TaskHandler
}

type WorkerPoolImpl interface {
    AddWorker()    // 增加worker
    SendTask(Task) // 发送任务
    Release()      //释放
}

type WorkerPool struct {
    wg sync.WaitGroup
    inCh chan Task
}

func (d *WorkerPool) AddWorker() {
    d.wg.Add(1)
    go func() {
        for task := range d.inCh {
            task.Handler(task.Param)
        }
        d.wg.Done()
    }()
}

func (d *WorkerPool) Release() {
    close(d.inCh)
    d.wg.Wait()
}
func (d *WorkerPool) SendTask(t Task) {
    d.inCh <- t

}

func NewWorkerPool(buffer int) WorkerPoolImpl {
    return &WorkerPool{
        wg:   sync.WaitGroup{},
        inCh: make(chan Task, buffer),
    }
}

func main() {
    bufferSize := 100
    var workerPool = NewWorkerPool(bufferSize)
    workers :=4
    for i :=0; i < workers; i++ {
        workerPool.AddWorker()
    }

    var sum int32
    testFunc := func(i interface{}) {
        n := i.(int32)
        atomic.AddInt32(&sum, n)
    }
    var i, n int32
    n = 1000
    for ; i < n; i++ {
        task := Task{
            i,
            testFunc,
        }
        workerPool.SendTask(task)

    }
    workerPool.Release()
    fmt.Println(sum)
}

协程池使用了反射来获取执行的函数及参数,在Go中可能有点让人不舒服. 但是如果批量执行的函数是已知的, 可以优化成一种只执行指定函数的协程池,能够提升性能.

Pub/Sub 模式

发布订阅是一种消息通知模式, 发布者发送消息, 订阅者接受消息.

使用场景

消息队列

代码实现

package main

import (
    "fmt"
    "time"
)

/*
* Pub/Sub
 */

// 订阅者需要实现的方法
type SubscriberImpl interface {
    Notify(interface{}) error
    Close()
}
type Subscriber struct {
    in    chan interface{}
    id    int
    topic string
    stop  chan struct{}
}

func (s *Subscriber) Close() {
    s.stop <- struct{}{}
    close(s.in)
}

func (s *Subscriber) Notify(msg interface{}) (err error) {
    defer func() {
        if rec := recover(); rec != nil {
            err = fmt.Errorf("%#v", rec)
        }
    }()

    select {
    case s.in <- msg:
    case <-time.After(time.Second):
        err = fmt.Errorf("Timeout \n")
    }
    return
}

func NewSubscriber(id int) SubscriberImpl {
    s := &Subscriber{
        in:   make(chan interface{}),
        id:   id,
        stop: make(chan struct{}),
    }
    go func() {
        for {
            select {
            case <-s.stop:
                close(s.stop)
                return
            default:
                for msg := range s.in {
                    fmt.Printf("(W%d):%v\n", s.id, msg)
                }
        }

        }
    }()
    return s
}

type publisher struct {
    subscribers []SubscriberImpl
    addSubCh    chan SubscriberImpl
    removeSubCh chan SubscriberImpl
    in          chan interface{}
    stop        chan struct{}
}

// sub 订阅pub
func Register(sub *SubscriberImpl, pub *publisher) {
    pub.addSubCh <- *sub
    return
}

// 实例化
func NewPublisher() *publisher {
    return &publisher{
        addSubCh:    make(chan SubscriberImpl),
        removeSubCh: make(chan SubscriberImpl),
        in:          make(chan interface{}),
        stop:        make(chan struct{}),
    }
}

// 监听
func (p *publisher) start() {
    for {
        select {
        // pub 发送消息
        case msg := <-p.in:
            for _, sub := range p.subscribers {
                _ = sub.Notify(msg)
            }
        // 移除指定sub
 
        case sub := <-p.removeSubCh:
            for i, candidate := range p.subscribers {
                if candidate == sub {
                    p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
                    candidate.Close()
                    break
                }
            }
            // 增加一个sub
            case sub := <- p.addSubCh:
                p.subscribers = append(p.subscribers, sub)
 
                // 关闭 pub
                case <- p.stop:
                    for _, sub := range p.subscribers {
                        sub.Close()
                    }
                    close(p.addSubCh)
                    close(p.in)
                    close(p.removeSubCh)
                    return
            }
    }
}

func main() {
    //测试代码
    pub := NewPublisher()
    go pub.start()

    sub1 := NewSubscriber(1)
    Register(&sub1, pub)

    sub2 := NewSubscriber(2)
    Register(&sub2, pub)

    commands := []int{1,2,3,4,5,6,7,8,9}

    for _, c := range commands{
        pub.in <- c
    }

    pub.stop <- struct{}{}
    time.Sleep(time.Second * 2)
}

注意事项

1: 同步问题,尤其是同步原语和channel一起使用时, 容易出现死锁 2: goroutine 崩溃问题, 如果子goroutine panic 没有recover 会引起主goroutine 异常退出 3: goroutine 泄露问题, 确保goroutine 能正常关闭.

「真诚赞赏,手留余香」

Richie Time

真诚赞赏,手留余香

使用微信扫描二维码完成支付