[toc]
Go并发
goroutine
Go的并发是指让某个函数独立于其他函数运行的能力。当函数创建为goroutine时,Go将会将其视为一个独立的工作单元。这个单元会通过Go调度器被调度到逻辑处理器上执行。
Go调度器能管理被创建的所有goroutine并为其分配执行时间。调度器在操作系统之上,将操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器运行goroutine。它能够在任何给定的时间,全面控制哪个goroutine在哪个逻辑处理器上运行。每个逻辑处理器都分别绑定在单个操作系统线程上。
Go的并发模型来自于CSP(Communicatting Sequential Process) 通信顺序进程。它时一种消息模型,通过goroutine之间传递数据来传递消息。而不是通过对数据加锁来实现同步访问。
过程
执行系统IO
如果创建一个goroutine并准备运行,这个goroutine就会被放到调度器的全局运行队列中,之后,就会为这个goroutine分配一个逻辑处理器,并被放到逻辑处理对应的本地队列中等待执行。
正在运行的goroutine要执行一个阻塞的系统调用,如果读文件的IO操作。线程和goroutine会从逻辑处理器上分离。该线程会继续阻塞,等待系统调用的返回。
与此同时,逻辑处理器就失去了用来运行的线程。所以调度器会创建一个新线程,并将其绑定在该逻辑处理器上。之后,调度器会选择本地运行队列里的另外一个goroutine来运行。
一旦被阻塞的系统调用执行完并返回,对应的goroutine会放回到本地运行队列,而之前的线程会保存好,以便后续可以继续使用。
执行网络IO
如果goroutine要做一个网络IO的调用。goroutine会和逻辑处理器分离(注意线程和逻辑处理器不分离),并移到集成了网络轮询器的运行时。一旦轮询器指示某个网络读或者写操作就绪后,对应的goroutine就会重新分配到逻辑处理上完成操作。
调度器创建的逻辑处理器没有限制,但是语言运行时默认限制每个程序最多创建10000个线程。
一个goroutine在工作结束前,可以被停止并重新调度。调度器这么做的目的就是防止某个goroutine占用逻辑处理器的时间太长。此时,调度器会停止当前正在运行的goroutine,并给其他可运行的goroutine运行的机会.
go关键字修饰的函数,可以像普通的函数一样,传递参数进去,但是goroutine终止时,是无法获取函数的返回值的。
使用
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
// 分配1个逻辑处理器
runtime.GOMAXPROCS(2)
//wg 使用WaitGroup等待goroutine完成,类似于信号量
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("start goroutine")
go func() {
defer wg.Done() // 保证每个工作完成都调用wg.Done方法
for count := 0; count < 2; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}() // 后加括号表示立即调用
go func() {
defer wg.Done()
for count := 0; count < 2; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
fmt.Printf("wait for finish\n")
wg.Wait()
fmt.Printf("\n terminal")
}
锁机制
Go 提供了同步goroutine的机制,就是对共享资源加锁,可以使用atomic和sync包里面的函数实现顺序访问。
原子函数
原子函数能够以底层的加锁机制来同步访问整形变量和指针。
Atomic包提供类似于Java中Java中的原子操作。比如atomic.AddInt64等。
同时还提供原子读和原子写,LoadInt64和StoreInt64
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
counter int64
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go increate(1)
go increate(2)
wg.Wait()
fmt.Println("Final counter:", counter)
}
func increate(num int) {
defer wg.Done()
for count := 0; count < 2; count++ {
// 原子添加
atomic.AddInt64(&counter, 1)
// 当前goroutine 从线程退出,并放回队列
runtime.Gosched()
}
}
互斥锁
互斥锁用于在代码上创建一个临界区,保证同一时间只有一个goroutine可以执行这个临界区代码
package main
import (
"fmt"
"runtime"
"sync"
)
var (
counter int64
wg sync.WaitGroup
// 定义互斥锁
mutex sync.Mutex
)
func main() {
wg.Add(2)
go increate(1)
go increate(2)
wg.Wait()
fmt.Println("Final counter:", counter)
}
func increate(num int) {
defer wg.Done()
for count := 0; count < 2; count++ {
mutex.Lock() // 加锁
{
value := counter
value++
runtime.Gosched()
counter = value
}
mutex.Unlock() // 释放锁
}
}
通道Channel
Go中,可以使用原子函数和互斥锁来保证对共享资源的安全访问和消除竞争状态,还可以使用通过,通过发送和接收需要共享的资源,在goroutine之间做同步。
Channel又分有缓存的通道和无缓存的通道。
- 无缓存通道(unbuffered channel)要求发送goroutie和接收goroutine同时准备好,才能发送发送和接收操作。如果有两者没有准备好,通道会导致先执行发送或者接收操作的goroutine进行阻塞。这种交互行为时同步的
- 有缓存通道(buffered channel)是在被接受前能存储一个或者多个值的通道。它不强制要求goroutine之间完成发送和接收。只有通道中没有接收的值时,接收动作才会被阻塞。或者通道没有缓存区容纳被发送的值,发送动作才会阻塞。
对于有缓存的通道,当通道关闭后,goroutine 依旧可以从通道中接收数据,但是不允许向通道发送数据。这样能够保证通道关闭后,不会有数据丢失。如果通道内没有数据,则会返回立即返回通道类型的零值,或者获取值时加入可选标志,得到通道信息。
通道的使用
在使用通道过程中,箭头指向接收的变量
同时,当通道不再用的时候,需要对通道进行关闭
// 声明无缓存的通道
count := make(chan int)
// 声明有缓存的通道
count1 := make(chan int)
// 从通道中添加值
count <- 1
// 从通道中取值
value, ok := <-count
// 关闭通道,必须要记得关闭
clost(count)