并发基础
多进程
多线程基于回调的非阻塞 /异步 IO协程
协程
与 传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不 会导致系统资源衰竭,
而线程和进程通常最多也不能超过 1万个。这也是协程也叫轻量级线程的 原因。 多数语言在语法层面并不直接支持协程,而是通过库的方式支持,但用库的方式支持的功能 也并不完整, 比如仅仅提供轻量级线程的创建、销毁与切换等能力。如果在这样的轻量级线程中 调用一个同步 IO 操作, 比如网络通信、本地文件读写,都会阻塞其他的并发执行轻量级线程, 从而无法真正达到轻量级线程本身期望达到的目标。
Go 语言在语言级别支持轻量级线程,叫goroutine。Go 语言标准库提供的所有系统调用操作 (当然也包括所有同步 IO 操作),都会出让 CPU 给其他goroutine 。这让事情变得非常简单,让轻 量级线程的切换管理不依赖于系统的线程和进程,也不依赖于CPU的核心数量。
goroutine
在一个函数调用前加上 go 关键字,这次调用就会在一个新的 goroutine中并发执行。当被调用 的函数返回时,这个 goroutine也自动结束了。需要注意的是,如果这个函数有返回值,那么这个 返回值会被丢弃。
package main
import"fmt"
func Add(x, y int){
z := x + y
fmt.Println(z)
}
func main(){
for i :=0; i <10; i++{
go Add(i, i)
}
}
Go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出, 且程序并不等待其他goroutine(非主goroutine)结束。 所以看不到输出结果
要让主函数等待所有goroutine退出后再返回,如何知道goroutine都退出了呢?这就引出了多个 goroutine之间通信的问题。下一节我们将主要解决这个问题。
并发通信
在工程上,有两种最常见的并发通信模型:共享数据和消息。
共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的 数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无 疑是内存了,也就是常说的共享内存。
下面是共享内存的实现
var count int
func Count(lock*sync.Mutex){
lock.Lock()
count++
fmt.Println(count)
lock.Unlock()
}
func main(){
lock:=&sync.Mutex{}
for i :=0; i <10; i++{
go Count(lock)
}
for{
lock.Lock()
c := count
lock.Unlock()
runtime.Gosched()
if c >10{
break
}
}
}
Go语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。
Go语言提供的消息通信机制被称为channel,接下来我们将详细介绍channel。现在,让我们 用Go 语言社区的那句著名的口号来结束这一小节: “ 不要通过共享内存来通信,而应该通过通信来共享内存 。”
channel
channel是Go语言在语言级别提供的goroutine间的通信方式。
//声明一个chan
varch chan int
var mch map[string]chan bool
//声明并初始化一个int类型的chan
chan1 := make(chan int,1)
//将一个数据写入channel中
chan1 <-1
getchan1 :=<-chan1
从channel中取数据与写入数据
chan1 := make(chan int,1)
//将1写入channel中
chan1 <-1
//将一个数据从channel中读取到getchan1中
getchan1 :=<-chan1
fmt.Println(getchan1) //输出1
select
通过调用select()函数来监控一系列的文件句 柄。 一旦其中一个文件句柄发生了 IO动作,该 select() 调用就会被返回
select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由 case 语句来描述。
与 switch 语句可以选择任何可使用相等比较的条件相比, select 有比较多的 限制,其中最大的一条限制就是每个 case 语句里必须是一个 IO 操作,大致的结构如下:
select{
case<-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <-1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
ch := make(chan int,1)
for{
select{
case ch <-0:
case ch <-1:
}
i :=<-ch
fmt.Println("Value received:", i)
}
缓冲机制
//创建一个带缓冲的channel
c := make(chan int,1024)
超时机制
在并发编程的通信过程中,最需要处理的就是超时问题,即向channel写数据时发现channel 已满,或者从 channel试图读取数据时发现 channel为空。
如果不正确处理这些情况,很可能会导 致整个 goroutine锁死。
因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。 所以可以使用select来避免goroutine阻塞问题
// 首先,我们实现并执行一个匿名的超时等待函数
timeout := make(chan bool,1)
go func(){
time.Sleep(1e9)// 等待1秒钟
timeout <-true
}()
// 然后我们把timeout这个channel利用起来
select{
case<-ch:
// 从ch中读取到数据
case<-timeout:
// 一直没有从ch中读取到数据,但从timeout中读取到了数据
}
这样使用select机制可以避免永久等待的问题
channel的传递
type PipeDatastruct{
value int
handler func(int)int
next chan int
}
func handle(queue chan *PipeData){
for data := range queue {
data.next<- data.handler(data.value)
}
}
单向channel
单向 channel只能用于发送或者接收数据 。
channel本身必然是同时支持读写的, 否则根本没法用。假如一个 channel真的只能读,那么肯定只会是空的,因为你没机会往里面写数 据。同理,如果一个channel只允许写,即使写进去了,也没有丝毫意义,因为没有机会读取里面 的数据。所谓的单向channel概念,其实只是对channel的一种使用限制。
我们在将一个channel变量传递到一个函数时,可以通过将其指定为单向channel变量,从
而限制该函数中可以对此channel的操作,比如只能往这个channel写,或者只能从这个 channel读。
var ch1 chan int// ch1是一个正常的channel,不是单向的
var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
var ch3 <-chan int// ch3是单向channel,只用于读取int数据
//单项channel初始化,ch4被转换为一个单项读channel和一个单向写channel
ch4 := make(chan int)
ch5 :=<-chan int(ch4)// ch5就是一个单向的读取channel
ch6 := chan<-int(ch4)// ch6 是一个单向的写入channel
关闭channel
close(ch)
如何判断一个channel是否已经被关 闭
x, ok :=<-ch //
返回值是false则表示ch已经被关闭。
多核并行化
type Vector[]float64
// 分配给每个CPU的计算任务
func (v Vector)DoSome(i, n int, u Vector, c chan int){
for; i < n; i++{
v[i]+= u.Op(v[i])
}
c <-1// 发信号告诉任务管理者我已经计算完成了
}
const NCPU =16// 假设总共有16核
func (v Vector)DoAll(u Vector){
c := make(chan int, NCPU)// 用于接收每个CPU的任务完成信号
for i :=0; i < NCPU; i++{
go v.DoSome(i*len(v)/NCPU,(i+1)*len(v)/NCPU, u, c)
}
// 等待所有CPU的任务完成
for i :=0; i < NCPU; i++{
<-c // 获取到一个数据,表示一个CPU计算完成了
}
// 到这里表示所有计算已经结束
}
在Go语言升级到默认支持多CPU的某个版本之前,我们可以先通过设置环境变量
GOMAXPROCS的值来控制使用多少个CPU核心。具体操作方法是通过直接设置环境变量
GOMAXPROCS的值,或者在代码中启动goroutine之前先调用以下这个语句以设置使用16个CPU
核心:
runtime.GOMAXPROCS(16)
到底应该设置多少个CPU核心呢,其实runtime包中还提供了另外一个函数NumCPU()来获
取核心数。可以看到,Go语言其实已经感知到所有的环境信息,下一版本中完全可以利用这些
信息将goroutine调度到所有CPU核心上,从而最大化地利用服务器的多核计算能力。抛弃
GOMAXPROCS只是个时间问题。 出让时间片
runtime.Gosched()
同步
同步锁
Go 语言包中的 sync 包提供了两种锁类型: sync.Mutex 和 sync.RWMutex
Mutex是最简单 的一种锁类型,同时也比较暴力,当一个 goroutine 获得了 Mutex 后,其他 goroutine 就只能乖乖等 到这个 goroutine 释放该 Mutex 。 RWMutex 相对友好些,是经典的单写多读模型。在读锁占用的情 况下,会阻止写,但不阻止读,也就是多个 goroutine 可同时获取读锁(调用 RLock() 方法;而写 锁(调用 Lock() 方法)会阻止任何其他 goroutine (无论读和写)进来,整个锁相当于由该 goroutine 独占。从 RWMutex 的实现看, RWMutex 类型其实组合了 Mutex :
对于这两种锁类型,任何一个 Lock() 或 RLock() 均需要保证对应有 Unlock() 或 RUnlock()
调用与之对应,否则可能导致等待该锁的所有goroutine处于饥饿状态,甚至可能导致死锁。锁的
典型使用模式如下: var l sync.Mutex
func foo(){
l.Lock()
defer l.Unlock()
//...
}
这里我们再一次见证了Go语言defer关键字带来的优雅。
全局唯一性操作
对于从全局的角度只需要运行一次的代码,比如全局初始化操作,Go语言提供了一个Once 类型来保证全局的唯一性操作,具体代码如下:
var a string
var once sync.Once
func setup(){
a ="hello, world"
}
func doprint(){
once.Do(setup)
print(a)
}
func twoprint(){
go doprint()
go doprint()
}
once的Do()方法可以保证在全局范围内只调用指定的函数一次(这里指
setup()函数),而且所有其他goroutine在调用到此语句时,将会先被阻塞,直至全局唯一的
once.Do()调用结束后才继续。