区块链技术博客
www.b2bchain.cn

基础篇·第二章[2]·GO并发编程基础求职学习资料

本文介绍了基础篇·第二章[2]·GO并发编程基础求职学习资料,有助于帮助完成毕业设计以及求职,是一篇很好的资料。

对技术面试,学习经验等有一些体会,在此分享。

[TOC]

1. 定义

GO语言天生支持并发,这个并发的“基石”就是GO程(goroutine)。

goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine(单个goroutine开销小至2K)并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。相比于C、C++、Java等语言,程序员只能使用系统级线程或进程,且需要手动维护线程池,需要自己去调度线程执行任务并维护上下文切换,释放资源等等,心智负担极重。而GO在语言设计之初,就从语言层面本身内置了调度GMP机制,智能地将 goroutine 中的任务合理地分配给每个CPU线程。Go语言也被称为21世纪的C语言。
并发:是在单核的情况下,轮转时间片,来实现多任务的调度切换执行(同一时段); 并行:是在多核的情况下,在多核之间轮转时间片,来实现多任务的调度切换执行(同一时刻);

GO并发的思想是遵循“通过通信(channel)来共享内存”而不是“通过共享内存来通信”。
CSP:Communication Sequential Process

2. 使用

2.1 单goroutine

关键字go
GO语言中使用goroutine非常简单粗暴,即只需在调用的函数前面加上go关键字,主线程就开启了一个goroutine去执行这个函数任务。如

package main import (     "fmt"     "time" )  func Show(){     fmt.Println("one goroutine called the func Show") }  func main() {     go Show()     time.Sleep(time.Second) //此处让主线程等待1s好让goroutine有机会执行到,否则主线程退出子进程也将被结束。 }

2.2 多goroutine

package main  import (     "fmt"     "runtime"     "time" )  func main() {     for i := 0; i < 10; i++ {         // 不传参数,直接使用外部i变量的情形         //go func() {         //  fmt.Printf("goroutine %d doing biz ...n", i)         //}()          // 传参数,使用外部i变量的拷贝的情形         go func(j int) {             fmt.Printf("goroutine %d doing biz ...n", j)         }(i)     }     runtime.Gosched()       // 此处只是让main稍微让出CPU一次,不确保所有子goroutine都能执行完     time.Sleep(time.Second) //此处让主线程等待1s好让所有goroutine有机会执行到,否则主线程退出子进程也将被结束。     fmt.Println("main exit") }

3. GO程间通信

上面示例中,主线程和子goroutine之间并无通信,主线程通过runtime.Gosched()或time.Sleep(time.Second)的方式都不能友好的知道是否所有子goroutine都已完成了任务。

3.1 通道

为了goroutine之间能方便的通信和调度控制,GO提供了通道技术。

3.1.1 通道的声明

关键字chan
var 通道变量名 chan 通道内值类型T

如 var noBufChan chan struct{} fmt.Println(noBufChan) // <nil>

3.1.2 初始化

格式make(chan 值类型 [, 缓冲大小])
通过make进行通道初始化,初始化后的通道才能使用,否则编译通过,但运行时将报错(不易发现要注意)。
基础篇·第二章[2]·GO并发编程基础

通道有三种操作,发送(ch<-)、接收(<-ch)和关闭(close(ch))。
对于关闭有话要说:
一般只在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。
通道是可以被垃圾回收机制回收的,区别于文件等资源必须是在结束操作后关闭,通道则不然。

对一个【关闭的】通道,特点如下:
– 再次关闭panic;
– 再次发送panic;
– 再次接收会取得值至通道为空最终取得元素零值;
试想想如果不是关闭的,又分别会是什么情况?

3.1.3 无缓冲通道

所谓无缓冲通道,也可理解为阻塞式通道或同步通道,即读写必须同时具备,否则将死锁。
基础篇·第二章[2]·GO并发编程基础

package main  import (     "fmt" )  func main() {     var noBufChan chan struct{}     noBufChan = make(chan struct{}) //必须先初始化才能使用, 否则编译通过但运行时会报错      go func() {         fmt.Println("one goroutine snd struct{} to noBufChan")         <-noBufChan // 无缓冲通道如果只有写入,没有相应的读取,将报错     }()      noBufChan <- struct{}{}      fmt.Println("all goroutine finished bizs!")     fmt.Println("main exit") }

检查是否有无死锁或资源竞争,可以在运行时加选项-race,如: go run -race main.go

3.1.4 有缓冲通道

相对无缓冲通道而言,有缓存通道必须在初始化时指定通道的容量数。
使用中,如果通道满了,则将出现写被阻塞;通道空了,则出现读被阻塞;

package main  import "fmt"  func main() {     var noBufChan chan struct{}      //无缓冲通道     //noBufChan = make(chan struct{})     //无缓冲通道,如果没有一个读取者,将panic     //go func() {     //  <-noBufChan     //}()      //有缓冲通道     noBufChan = make(chan struct{}, 1) //容量为1(只要大于0)     //有缓冲通道,即使没有读取者,不会阻塞,也不会panic     noBufChan <- struct{}{}      fmt.Println("main exit") }

3.1.4.1 通道的遍历

  • fori + 条件判断
package main  import (     "fmt"     "time" )  func main() {     bufChan := make(chan rune, 3)     done := make(chan struct{})      go func() {         for {             v, ok := <-bufChan //此处一定是要对应bufChan有一个关闭动作             if !ok {                 println("sub rcved all !!!")                 <-done                 break             }             fmt.Printf("sub goroutine got: %cn", v)             time.Sleep(time.Millisecond * 200)         }      }()      for _, v := range "collection" {         bufChan <- v         fmt.Printf("main snd: %cn", v)     }     // 此处一定要close,否则上面子goroutine的for遍历通道将一直阻塞,     // 进而导致done通道未被读取而引发panic     close(bufChan) // 此处重要     println("main closed the bufChan")      println("main waitting done signal ...")     done <- struct{}{}// 同步通道,等待子goroutine完成任务  }
  • range 【推荐】
package main  import (     "fmt"     "time" )  func main() {     bufChan := make(chan rune, 3)     done := make(chan struct{})      go func() {                 // for range形式遍历通道,底层已判断通道是否被关闭         for v := range bufChan {             fmt.Printf("sub goroutine got: %cn", v)             time.Sleep(time.Millisecond * 200)         }          println("sub rcved all !!!")         <-done     }()      for _, v := range "collection" {         bufChan <- v         fmt.Printf("main snd: %cn", v)     }      // 此处一定要close,否则上面子goroutine的for遍历通道将一直阻塞,     // 进而导致done通道未被读取而引发panic     close(bufChan) // 此处重要     println("main closed the bufChan")      println("main waitting done signal ...")     done <- struct{}{} // 同步通道,等待子goroutine完成任务 }

3.1.5 单向通道

单向通道,是对通道进行更加精细的操作的一种粒度,可以区分通道只能读或只能写或都可以;特别是作为参数进行传递,让代码更具可读性,同时更加健壮和安全。

package main  import (     "fmt" )  var bufChan = make(chan rune, 3) var done = make(chan struct{})  func snd(inChan chan<- rune) {     for _, c := range "collection" {         inChan <- c         fmt.Printf("snd %cn", c)     }     fmt.Println("snd ok !")     close(inChan) //此处重要,否则rcv中的for range将阻塞,进而导致done通道只有写入却无法走到读取而出现死锁panic }  func rcv(outChan <-chan rune) {     for v := range outChan {         fmt.Printf("rcv %cn", v)     }     fmt.Println("rcv all !")     <-done }  func main() {     go snd(bufChan) //更细粒度的通道控制 只写     go rcv(bufChan) //更细粒度的通道控制 只读     done <- struct{}{}     fmt.Println("main exit") }

3.2 简单的work-pool和sync.WaitGroup

在代码中使用time.Sleep是无法预期任务执行完毕耗时的,使用sync.WaitGroup可以优雅的等待并发任务全部完成。
本质上sync.WaitGroup内部维护了一个计数器,计数器Add()的值随子goroutine的创建而加1,Done()随之退出而减1。而Wait()则等待计数器值为0时,表示所有并发任务已经完成。

package main  import (     "fmt"     "sync"     "time" )  var bufChan = make(chan int, 100) var done = make(chan int)  var wg1 sync.WaitGroup var wg2 sync.WaitGroup  func product(pid int, inChan chan<- int) {     defer wg1.Done()     for i := 1; i <= 5; i++ {         inChan <- pid*10 + i         time.Sleep(time.Millisecond * 10)         fmt.Printf("pid %d create new request: %dn", pid, pid*10+i)     }      time.Sleep(time.Second) }  func consume(outChan <-chan int) {     defer wg2.Done()     for v := range outChan {         time.Sleep(time.Millisecond * 100)         fmt.Printf("done respone: %dn", v)     } }  func main() {     // 假设4个生产者,每个生产者请求10次(每次间隔10毫秒)     for i := 1; i <= 4; i++ {         wg1.Add(1)         go product(i, bufChan)     }      // 假设2个消费者,每100毫秒响应一个应答     for i := 1; i <= 2; i++ {         wg2.Add(1)         go consume(bufChan)     }      wg1.Wait()     close(bufChan)     wg2.Wait()     fmt.Printf("Both product and consume works Done !") }

3.3 select多路复用

对于一些异步业务场景,使用多通道,那么通道的先后执行顺序可以交给select多路复用的随机调度去自由处理。
select 同时响应多个通道的操作。
如果所有case条件都未达到,且没有default的话,select就会一直阻塞等待;
如果所有case条件都未达到,但是有default的话,select就会立即执行default;
某个case的通信操作完成时,就会执行case分支对应的语句;
多个case的通信操作完成时,select会随机选择一个;

package main  import (     "fmt"     "sync" )  var wg sync.WaitGroup  func main() {     ch1 := make(chan string, 10)     ch2 := make(chan string, 10)      wg.Add(1)     go func() {         defer wg.Done()         for i := 1; i <= 10; i++ {             ch1 <- fmt.Sprintf("request %d", i)         }         close(ch1)     }()      wg.Add(1)     go func() {         defer wg.Done()         for i := 101; i <= 110; i++ {             ch2 <- fmt.Sprintf("response %d", i)         }         close(ch2)     }()      wg.Add(1)     go func() {         defer wg.Done()         flag1, flag2 := false, false         for {             select {             case s1, ok := <-ch1:                 if !ok {                     fmt.Println("ch1 read all !")                     flag1 = true                 }                 fmt.Println("select from ch1:", s1)             case s2, ok := <-ch2:                 if !ok {                     fmt.Println("ch2 read all !")                     flag2 = true                 }                 fmt.Println("select from ch2:", s2)             default:                 if flag1 && flag2 {                     break                 }             }             if flag1 && flag2 {                 break             }         }     }()      wg.Wait() }

3.4 goroutine的并发安全和锁机制

并发给我们带来优越性能的同时,也引入了安全方面的问题,特别是并发“争抢”资源时,一不小心就可能出现“竟态”问题。经常被用来举例的就是“售票”例子。

package main  import (     "fmt"     "sync" )  func main() {     var wg sync.WaitGroup     ticketTotal := 100      for i := 0; i < 100; i++ {         wg.Add(1)         go func() {                         defer wg.Done()             ticketTotal--         }()     }      wg.Wait()     fmt.Println("Final result: ", ticketTotal) //结果并非预期的0 }

3.4.1 互斥锁

关键字sync.Metux
售票的例子中,并发的goroutine可能同时修改ticketTotal,而此时ticketTotal值可能相同,那么相当于在同一时刻做了重复的事情,所以ticketTotal最终“少卖”,结果值大于0的概率极高。
解决“竞争”问题,可以用“锁”机制,对资源进行加锁和解锁操作,来达到结果的准确性。
有一种锁叫“互斥锁”,是完全互斥的锁,即一个时刻,不管是读还是写,只能有一个goroutine在操作资源。

package main  import (     "fmt"     "sync"     "time" )  func main() {     var lock sync.Mutex     var wg sync.WaitGroup     ticketTotal := 100 //100张票      now := time.Now()      // 100个售票员     for i := 1; i <= 100; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             lock.Lock()             fmt.Printf("%02d 售票员,正在售票... %dn", i, ticketTotal)             ticketTotal--             lock.Unlock()         }(i)      }      // 10000个购票者     for i := 1; i <= 10000; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             fmt.Printf("%03d 购票者,正在查询... %dn", i, ticketTotal)         }(i)     }      wg.Wait()     fmt.Printf("Final result: %d, Cost time: %vn", ticketTotal, time.Since(now)) }

3.4.2 读写锁

关键字sync.RWMetux
上面通过互斥锁,虽然能让结果正确。但是,一个时刻只能有一个goroutine操作资源(不管是读还是写),而现实的场景多数是“读多写少”,因为一个写而影响多数的读是有很大问题的。为此,可以通过读写锁来适配“读多写少”的场景。(性能问题另外讨论)

读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

package main  import (     "fmt"     "sync"     "time" )  func main() {     var rwlock sync.RWMutex     var wg sync.WaitGroup     ticketTotal := 100      now := time.Now()      // 100个售票员     for i := 1; i <= 100; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             rwlock.Lock()             fmt.Printf("%02d 售票员,正在售票... %dn", i, ticketTotal)             ticketTotal--             rwlock.Unlock()         }(i)      }      // 10000个购票者     for i := 1; i <= 10000; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             rwlock.RLock()             fmt.Printf("%03d 购票者,正在查询... %dn", i, ticketTotal)             rwlock.RUnlock()         }(i)     }      wg.Wait()     fmt.Printf("Final result: %d, Cost time: %vn", ticketTotal, time.Since(now)) }

3.5 sync.Once

3.6 sync.Map

3.7 原子操作

[TOC]

1. 定义

GO语言天生支持并发,这个并发的“基石”就是GO程(goroutine)。

goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine(单个goroutine开销小至2K)并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。相比于C、C++、Java等语言,程序员只能使用系统级线程或进程,且需要手动维护线程池,需要自己去调度线程执行任务并维护上下文切换,释放资源等等,心智负担极重。而GO在语言设计之初,就从语言层面本身内置了调度GMP机制,智能地将 goroutine 中的任务合理地分配给每个CPU线程。Go语言也被称为21世纪的C语言。
并发:是在单核的情况下,轮转时间片,来实现多任务的调度切换执行(同一时段); 并行:是在多核的情况下,在多核之间轮转时间片,来实现多任务的调度切换执行(同一时刻);

GO并发的思想是遵循“通过通信(channel)来共享内存”而不是“通过共享内存来通信”。
CSP:Communication Sequential Process

2. 使用

2.1 单goroutine

关键字go
GO语言中使用goroutine非常简单粗暴,即只需在调用的函数前面加上go关键字,主线程就开启了一个goroutine去执行这个函数任务。如

package main import (     "fmt"     "time" )  func Show(){     fmt.Println("one goroutine called the func Show") }  func main() {     go Show()     time.Sleep(time.Second) //此处让主线程等待1s好让goroutine有机会执行到,否则主线程退出子进程也将被结束。 }

2.2 多goroutine

package main  import (     "fmt"     "runtime"     "time" )  func main() {     for i := 0; i < 10; i++ {         // 不传参数,直接使用外部i变量的情形         //go func() {         //  fmt.Printf("goroutine %d doing biz ...n", i)         //}()          // 传参数,使用外部i变量的拷贝的情形         go func(j int) {             fmt.Printf("goroutine %d doing biz ...n", j)         }(i)     }     runtime.Gosched()       // 此处只是让main稍微让出CPU一次,不确保所有子goroutine都能执行完     time.Sleep(time.Second) //此处让主线程等待1s好让所有goroutine有机会执行到,否则主线程退出子进程也将被结束。     fmt.Println("main exit") }

3. GO程间通信

上面示例中,主线程和子goroutine之间并无通信,主线程通过runtime.Gosched()或time.Sleep(time.Second)的方式都不能友好的知道是否所有子goroutine都已完成了任务。

3.1 通道

为了goroutine之间能方便的通信和调度控制,GO提供了通道技术。

3.1.1 通道的声明

关键字chan
var 通道变量名 chan 通道内值类型T

如 var noBufChan chan struct{} fmt.Println(noBufChan) // <nil>

3.1.2 初始化

格式make(chan 值类型 [, 缓冲大小])
通过make进行通道初始化,初始化后的通道才能使用,否则编译通过,但运行时将报错(不易发现要注意)。
基础篇·第二章[2]·GO并发编程基础

通道有三种操作,发送(ch<-)、接收(<-ch)和关闭(close(ch))。
对于关闭有话要说:
一般只在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。
通道是可以被垃圾回收机制回收的,区别于文件等资源必须是在结束操作后关闭,通道则不然。

对一个【关闭的】通道,特点如下:
– 再次关闭panic;
– 再次发送panic;
– 再次接收会取得值至通道为空最终取得元素零值;
试想想如果不是关闭的,又分别会是什么情况?

3.1.3 无缓冲通道

所谓无缓冲通道,也可理解为阻塞式通道或同步通道,即读写必须同时具备,否则将死锁。
基础篇·第二章[2]·GO并发编程基础

package main  import (     "fmt" )  func main() {     var noBufChan chan struct{}     noBufChan = make(chan struct{}) //必须先初始化才能使用, 否则编译通过但运行时会报错      go func() {         fmt.Println("one goroutine snd struct{} to noBufChan")         <-noBufChan // 无缓冲通道如果只有写入,没有相应的读取,将报错     }()      noBufChan <- struct{}{}      fmt.Println("all goroutine finished bizs!")     fmt.Println("main exit") }

检查是否有无死锁或资源竞争,可以在运行时加选项-race,如: go run -race main.go

3.1.4 有缓冲通道

相对无缓冲通道而言,有缓存通道必须在初始化时指定通道的容量数。
使用中,如果通道满了,则将出现写被阻塞;通道空了,则出现读被阻塞;

package main  import "fmt"  func main() {     var noBufChan chan struct{}      //无缓冲通道     //noBufChan = make(chan struct{})     //无缓冲通道,如果没有一个读取者,将panic     //go func() {     //  <-noBufChan     //}()      //有缓冲通道     noBufChan = make(chan struct{}, 1) //容量为1(只要大于0)     //有缓冲通道,即使没有读取者,不会阻塞,也不会panic     noBufChan <- struct{}{}      fmt.Println("main exit") }

3.1.4.1 通道的遍历

  • fori + 条件判断
package main  import (     "fmt"     "time" )  func main() {     bufChan := make(chan rune, 3)     done := make(chan struct{})      go func() {         for {             v, ok := <-bufChan //此处一定是要对应bufChan有一个关闭动作             if !ok {                 println("sub rcved all !!!")                 <-done                 break             }             fmt.Printf("sub goroutine got: %cn", v)             time.Sleep(time.Millisecond * 200)         }      }()      for _, v := range "collection" {         bufChan <- v         fmt.Printf("main snd: %cn", v)     }     // 此处一定要close,否则上面子goroutine的for遍历通道将一直阻塞,     // 进而导致done通道未被读取而引发panic     close(bufChan) // 此处重要     println("main closed the bufChan")      println("main waitting done signal ...")     done <- struct{}{}// 同步通道,等待子goroutine完成任务  }
  • range 【推荐】
package main  import (     "fmt"     "time" )  func main() {     bufChan := make(chan rune, 3)     done := make(chan struct{})      go func() {                 // for range形式遍历通道,底层已判断通道是否被关闭         for v := range bufChan {             fmt.Printf("sub goroutine got: %cn", v)             time.Sleep(time.Millisecond * 200)         }          println("sub rcved all !!!")         <-done     }()      for _, v := range "collection" {         bufChan <- v         fmt.Printf("main snd: %cn", v)     }      // 此处一定要close,否则上面子goroutine的for遍历通道将一直阻塞,     // 进而导致done通道未被读取而引发panic     close(bufChan) // 此处重要     println("main closed the bufChan")      println("main waitting done signal ...")     done <- struct{}{} // 同步通道,等待子goroutine完成任务 }

3.1.5 单向通道

单向通道,是对通道进行更加精细的操作的一种粒度,可以区分通道只能读或只能写或都可以;特别是作为参数进行传递,让代码更具可读性,同时更加健壮和安全。

package main  import (     "fmt" )  var bufChan = make(chan rune, 3) var done = make(chan struct{})  func snd(inChan chan<- rune) {     for _, c := range "collection" {         inChan <- c         fmt.Printf("snd %cn", c)     }     fmt.Println("snd ok !")     close(inChan) //此处重要,否则rcv中的for range将阻塞,进而导致done通道只有写入却无法走到读取而出现死锁panic }  func rcv(outChan <-chan rune) {     for v := range outChan {         fmt.Printf("rcv %cn", v)     }     fmt.Println("rcv all !")     <-done }  func main() {     go snd(bufChan) //更细粒度的通道控制 只写     go rcv(bufChan) //更细粒度的通道控制 只读     done <- struct{}{}     fmt.Println("main exit") }

3.2 简单的work-pool和sync.WaitGroup

在代码中使用time.Sleep是无法预期任务执行完毕耗时的,使用sync.WaitGroup可以优雅的等待并发任务全部完成。
本质上sync.WaitGroup内部维护了一个计数器,计数器Add()的值随子goroutine的创建而加1,Done()随之退出而减1。而Wait()则等待计数器值为0时,表示所有并发任务已经完成。

package main  import (     "fmt"     "sync"     "time" )  var bufChan = make(chan int, 100) var done = make(chan int)  var wg1 sync.WaitGroup var wg2 sync.WaitGroup  func product(pid int, inChan chan<- int) {     defer wg1.Done()     for i := 1; i <= 5; i++ {         inChan <- pid*10 + i         time.Sleep(time.Millisecond * 10)         fmt.Printf("pid %d create new request: %dn", pid, pid*10+i)     }      time.Sleep(time.Second) }  func consume(outChan <-chan int) {     defer wg2.Done()     for v := range outChan {         time.Sleep(time.Millisecond * 100)         fmt.Printf("done respone: %dn", v)     } }  func main() {     // 假设4个生产者,每个生产者请求10次(每次间隔10毫秒)     for i := 1; i <= 4; i++ {         wg1.Add(1)         go product(i, bufChan)     }      // 假设2个消费者,每100毫秒响应一个应答     for i := 1; i <= 2; i++ {         wg2.Add(1)         go consume(bufChan)     }      wg1.Wait()     close(bufChan)     wg2.Wait()     fmt.Printf("Both product and consume works Done !") }

3.3 select多路复用

对于一些异步业务场景,使用多通道,那么通道的先后执行顺序可以交给select多路复用的随机调度去自由处理。
select 同时响应多个通道的操作。
如果所有case条件都未达到,且没有default的话,select就会一直阻塞等待;
如果所有case条件都未达到,但是有default的话,select就会立即执行default;
某个case的通信操作完成时,就会执行case分支对应的语句;
多个case的通信操作完成时,select会随机选择一个;

package main  import (     "fmt"     "sync" )  var wg sync.WaitGroup  func main() {     ch1 := make(chan string, 10)     ch2 := make(chan string, 10)      wg.Add(1)     go func() {         defer wg.Done()         for i := 1; i <= 10; i++ {             ch1 <- fmt.Sprintf("request %d", i)         }         close(ch1)     }()      wg.Add(1)     go func() {         defer wg.Done()         for i := 101; i <= 110; i++ {             ch2 <- fmt.Sprintf("response %d", i)         }         close(ch2)     }()      wg.Add(1)     go func() {         defer wg.Done()         flag1, flag2 := false, false         for {             select {             case s1, ok := <-ch1:                 if !ok {                     fmt.Println("ch1 read all !")                     flag1 = true                 }                 fmt.Println("select from ch1:", s1)             case s2, ok := <-ch2:                 if !ok {                     fmt.Println("ch2 read all !")                     flag2 = true                 }                 fmt.Println("select from ch2:", s2)             default:                 if flag1 && flag2 {                     break                 }             }             if flag1 && flag2 {                 break             }         }     }()      wg.Wait() }

3.4 goroutine的并发安全和锁机制

并发给我们带来优越性能的同时,也引入了安全方面的问题,特别是并发“争抢”资源时,一不小心就可能出现“竟态”问题。经常被用来举例的就是“售票”例子。

package main  import (     "fmt"     "sync" )  func main() {     var wg sync.WaitGroup     ticketTotal := 100      for i := 0; i < 100; i++ {         wg.Add(1)         go func() {                         defer wg.Done()             ticketTotal--         }()     }      wg.Wait()     fmt.Println("Final result: ", ticketTotal) //结果并非预期的0 }

3.4.1 互斥锁

关键字sync.Metux
售票的例子中,并发的goroutine可能同时修改ticketTotal,而此时ticketTotal值可能相同,那么相当于在同一时刻做了重复的事情,所以ticketTotal最终“少卖”,结果值大于0的概率极高。
解决“竞争”问题,可以用“锁”机制,对资源进行加锁和解锁操作,来达到结果的准确性。
有一种锁叫“互斥锁”,是完全互斥的锁,即一个时刻,不管是读还是写,只能有一个goroutine在操作资源。

package main  import (     "fmt"     "sync"     "time" )  func main() {     var lock sync.Mutex     var wg sync.WaitGroup     ticketTotal := 100 //100张票      now := time.Now()      // 100个售票员     for i := 1; i <= 100; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             lock.Lock()             fmt.Printf("%02d 售票员,正在售票... %dn", i, ticketTotal)             ticketTotal--             lock.Unlock()         }(i)      }      // 10000个购票者     for i := 1; i <= 10000; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             fmt.Printf("%03d 购票者,正在查询... %dn", i, ticketTotal)         }(i)     }      wg.Wait()     fmt.Printf("Final result: %d, Cost time: %vn", ticketTotal, time.Since(now)) }

3.4.2 读写锁

关键字sync.RWMetux
上面通过互斥锁,虽然能让结果正确。但是,一个时刻只能有一个goroutine操作资源(不管是读还是写),而现实的场景多数是“读多写少”,因为一个写而影响多数的读是有很大问题的。为此,可以通过读写锁来适配“读多写少”的场景。(性能问题另外讨论)

读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

package main  import (     "fmt"     "sync"     "time" )  func main() {     var rwlock sync.RWMutex     var wg sync.WaitGroup     ticketTotal := 100      now := time.Now()      // 100个售票员     for i := 1; i <= 100; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             rwlock.Lock()             fmt.Printf("%02d 售票员,正在售票... %dn", i, ticketTotal)             ticketTotal--             rwlock.Unlock()         }(i)      }      // 10000个购票者     for i := 1; i <= 10000; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             rwlock.RLock()             fmt.Printf("%03d 购票者,正在查询... %dn", i, ticketTotal)             rwlock.RUnlock()         }(i)     }      wg.Wait()     fmt.Printf("Final result: %d, Cost time: %vn", ticketTotal, time.Since(now)) }

3.5 sync.Once

3.6 sync.Map

3.7 原子操作

[TOC]

1. 定义

GO语言天生支持并发,这个并发的“基石”就是GO程(goroutine)。

goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine(单个goroutine开销小至2K)并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。相比于C、C++、Java等语言,程序员只能使用系统级线程或进程,且需要手动维护线程池,需要自己去调度线程执行任务并维护上下文切换,释放资源等等,心智负担极重。而GO在语言设计之初,就从语言层面本身内置了调度GMP机制,智能地将 goroutine 中的任务合理地分配给每个CPU线程。Go语言也被称为21世纪的C语言。
并发:是在单核的情况下,轮转时间片,来实现多任务的调度切换执行(同一时段); 并行:是在多核的情况下,在多核之间轮转时间片,来实现多任务的调度切换执行(同一时刻);

GO并发的思想是遵循“通过通信(channel)来共享内存”而不是“通过共享内存来通信”。
CSP:Communication Sequential Process

2. 使用

2.1 单goroutine

关键字go
GO语言中使用goroutine非常简单粗暴,即只需在调用的函数前面加上go关键字,主线程就开启了一个goroutine去执行这个函数任务。如

package main import (     "fmt"     "time" )  func Show(){     fmt.Println("one goroutine called the func Show") }  func main() {     go Show()     time.Sleep(time.Second) //此处让主线程等待1s好让goroutine有机会执行到,否则主线程退出子进程也将被结束。 }

2.2 多goroutine

package main  import (     "fmt"     "runtime"     "time" )  func main() {     for i := 0; i < 10; i++ {         // 不传参数,直接使用外部i变量的情形         //go func() {         //  fmt.Printf("goroutine %d doing biz ...n", i)         //}()          // 传参数,使用外部i变量的拷贝的情形         go func(j int) {             fmt.Printf("goroutine %d doing biz ...n", j)         }(i)     }     runtime.Gosched()       // 此处只是让main稍微让出CPU一次,不确保所有子goroutine都能执行完     time.Sleep(time.Second) //此处让主线程等待1s好让所有goroutine有机会执行到,否则主线程退出子进程也将被结束。     fmt.Println("main exit") }

3. GO程间通信

上面示例中,主线程和子goroutine之间并无通信,主线程通过runtime.Gosched()或time.Sleep(time.Second)的方式都不能友好的知道是否所有子goroutine都已完成了任务。

3.1 通道

为了goroutine之间能方便的通信和调度控制,GO提供了通道技术。

3.1.1 通道的声明

关键字chan
var 通道变量名 chan 通道内值类型T

如 var noBufChan chan struct{} fmt.Println(noBufChan) // <nil>

3.1.2 初始化

格式make(chan 值类型 [, 缓冲大小])
通过make进行通道初始化,初始化后的通道才能使用,否则编译通过,但运行时将报错(不易发现要注意)。
基础篇·第二章[2]·GO并发编程基础

通道有三种操作,发送(ch<-)、接收(<-ch)和关闭(close(ch))。
对于关闭有话要说:
一般只在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。
通道是可以被垃圾回收机制回收的,区别于文件等资源必须是在结束操作后关闭,通道则不然。

对一个【关闭的】通道,特点如下:
– 再次关闭panic;
– 再次发送panic;
– 再次接收会取得值至通道为空最终取得元素零值;
试想想如果不是关闭的,又分别会是什么情况?

3.1.3 无缓冲通道

所谓无缓冲通道,也可理解为阻塞式通道或同步通道,即读写必须同时具备,否则将死锁。
基础篇·第二章[2]·GO并发编程基础

package main  import (     "fmt" )  func main() {     var noBufChan chan struct{}     noBufChan = make(chan struct{}) //必须先初始化才能使用, 否则编译通过但运行时会报错      go func() {         fmt.Println("one goroutine snd struct{} to noBufChan")         <-noBufChan // 无缓冲通道如果只有写入,没有相应的读取,将报错     }()      noBufChan <- struct{}{}      fmt.Println("all goroutine finished bizs!")     fmt.Println("main exit") }

检查是否有无死锁或资源竞争,可以在运行时加选项-race,如: go run -race main.go

3.1.4 有缓冲通道

相对无缓冲通道而言,有缓存通道必须在初始化时指定通道的容量数。
使用中,如果通道满了,则将出现写被阻塞;通道空了,则出现读被阻塞;

package main  import "fmt"  func main() {     var noBufChan chan struct{}      //无缓冲通道     //noBufChan = make(chan struct{})     //无缓冲通道,如果没有一个读取者,将panic     //go func() {     //  <-noBufChan     //}()      //有缓冲通道     noBufChan = make(chan struct{}, 1) //容量为1(只要大于0)     //有缓冲通道,即使没有读取者,不会阻塞,也不会panic     noBufChan <- struct{}{}      fmt.Println("main exit") }

3.1.4.1 通道的遍历

  • fori + 条件判断
package main  import (     "fmt"     "time" )  func main() {     bufChan := make(chan rune, 3)     done := make(chan struct{})      go func() {         for {             v, ok := <-bufChan //此处一定是要对应bufChan有一个关闭动作             if !ok {                 println("sub rcved all !!!")                 <-done                 break             }             fmt.Printf("sub goroutine got: %cn", v)             time.Sleep(time.Millisecond * 200)         }      }()      for _, v := range "collection" {         bufChan <- v         fmt.Printf("main snd: %cn", v)     }     // 此处一定要close,否则上面子goroutine的for遍历通道将一直阻塞,     // 进而导致done通道未被读取而引发panic     close(bufChan) // 此处重要     println("main closed the bufChan")      println("main waitting done signal ...")     done <- struct{}{}// 同步通道,等待子goroutine完成任务  }
  • range 【推荐】
package main  import (     "fmt"     "time" )  func main() {     bufChan := make(chan rune, 3)     done := make(chan struct{})      go func() {                 // for range形式遍历通道,底层已判断通道是否被关闭         for v := range bufChan {             fmt.Printf("sub goroutine got: %cn", v)             time.Sleep(time.Millisecond * 200)         }          println("sub rcved all !!!")         <-done     }()      for _, v := range "collection" {         bufChan <- v         fmt.Printf("main snd: %cn", v)     }      // 此处一定要close,否则上面子goroutine的for遍历通道将一直阻塞,     // 进而导致done通道未被读取而引发panic     close(bufChan) // 此处重要     println("main closed the bufChan")      println("main waitting done signal ...")     done <- struct{}{} // 同步通道,等待子goroutine完成任务 }

3.1.5 单向通道

单向通道,是对通道进行更加精细的操作的一种粒度,可以区分通道只能读或只能写或都可以;特别是作为参数进行传递,让代码更具可读性,同时更加健壮和安全。

package main  import (     "fmt" )  var bufChan = make(chan rune, 3) var done = make(chan struct{})  func snd(inChan chan<- rune) {     for _, c := range "collection" {         inChan <- c         fmt.Printf("snd %cn", c)     }     fmt.Println("snd ok !")     close(inChan) //此处重要,否则rcv中的for range将阻塞,进而导致done通道只有写入却无法走到读取而出现死锁panic }  func rcv(outChan <-chan rune) {     for v := range outChan {         fmt.Printf("rcv %cn", v)     }     fmt.Println("rcv all !")     <-done }  func main() {     go snd(bufChan) //更细粒度的通道控制 只写     go rcv(bufChan) //更细粒度的通道控制 只读     done <- struct{}{}     fmt.Println("main exit") }

3.2 简单的work-pool和sync.WaitGroup

在代码中使用time.Sleep是无法预期任务执行完毕耗时的,使用sync.WaitGroup可以优雅的等待并发任务全部完成。
本质上sync.WaitGroup内部维护了一个计数器,计数器Add()的值随子goroutine的创建而加1,Done()随之退出而减1。而Wait()则等待计数器值为0时,表示所有并发任务已经完成。

package main  import (     "fmt"     "sync"     "time" )  var bufChan = make(chan int, 100) var done = make(chan int)  var wg1 sync.WaitGroup var wg2 sync.WaitGroup  func product(pid int, inChan chan<- int) {     defer wg1.Done()     for i := 1; i <= 5; i++ {         inChan <- pid*10 + i         time.Sleep(time.Millisecond * 10)         fmt.Printf("pid %d create new request: %dn", pid, pid*10+i)     }      time.Sleep(time.Second) }  func consume(outChan <-chan int) {     defer wg2.Done()     for v := range outChan {         time.Sleep(time.Millisecond * 100)         fmt.Printf("done respone: %dn", v)     } }  func main() {     // 假设4个生产者,每个生产者请求10次(每次间隔10毫秒)     for i := 1; i <= 4; i++ {         wg1.Add(1)         go product(i, bufChan)     }      // 假设2个消费者,每100毫秒响应一个应答     for i := 1; i <= 2; i++ {         wg2.Add(1)         go consume(bufChan)     }      wg1.Wait()     close(bufChan)     wg2.Wait()     fmt.Printf("Both product and consume works Done !") }

3.3 select多路复用

对于一些异步业务场景,使用多通道,那么通道的先后执行顺序可以交给select多路复用的随机调度去自由处理。
select 同时响应多个通道的操作。
如果所有case条件都未达到,且没有default的话,select就会一直阻塞等待;
如果所有case条件都未达到,但是有default的话,select就会立即执行default;
某个case的通信操作完成时,就会执行case分支对应的语句;
多个case的通信操作完成时,select会随机选择一个;

package main  import (     "fmt"     "sync" )  var wg sync.WaitGroup  func main() {     ch1 := make(chan string, 10)     ch2 := make(chan string, 10)      wg.Add(1)     go func() {         defer wg.Done()         for i := 1; i <= 10; i++ {             ch1 <- fmt.Sprintf("request %d", i)         }         close(ch1)     }()      wg.Add(1)     go func() {         defer wg.Done()         for i := 101; i <= 110; i++ {             ch2 <- fmt.Sprintf("response %d", i)         }         close(ch2)     }()      wg.Add(1)     go func() {         defer wg.Done()         flag1, flag2 := false, false         for {             select {             case s1, ok := <-ch1:                 if !ok {                     fmt.Println("ch1 read all !")                     flag1 = true                 }                 fmt.Println("select from ch1:", s1)             case s2, ok := <-ch2:                 if !ok {                     fmt.Println("ch2 read all !")                     flag2 = true                 }                 fmt.Println("select from ch2:", s2)             default:                 if flag1 && flag2 {                     break                 }             }             if flag1 && flag2 {                 break             }         }     }()      wg.Wait() }

3.4 goroutine的并发安全和锁机制

并发给我们带来优越性能的同时,也引入了安全方面的问题,特别是并发“争抢”资源时,一不小心就可能出现“竟态”问题。经常被用来举例的就是“售票”例子。

package main  import (     "fmt"     "sync" )  func main() {     var wg sync.WaitGroup     ticketTotal := 100      for i := 0; i < 100; i++ {         wg.Add(1)         go func() {                         defer wg.Done()             ticketTotal--         }()     }      wg.Wait()     fmt.Println("Final result: ", ticketTotal) //结果并非预期的0 }

3.4.1 互斥锁

关键字sync.Metux
售票的例子中,并发的goroutine可能同时修改ticketTotal,而此时ticketTotal值可能相同,那么相当于在同一时刻做了重复的事情,所以ticketTotal最终“少卖”,结果值大于0的概率极高。
解决“竞争”问题,可以用“锁”机制,对资源进行加锁和解锁操作,来达到结果的准确性。
有一种锁叫“互斥锁”,是完全互斥的锁,即一个时刻,不管是读还是写,只能有一个goroutine在操作资源。

package main  import (     "fmt"     "sync"     "time" )  func main() {     var lock sync.Mutex     var wg sync.WaitGroup     ticketTotal := 100 //100张票      now := time.Now()      // 100个售票员     for i := 1; i <= 100; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             lock.Lock()             fmt.Printf("%02d 售票员,正在售票... %dn", i, ticketTotal)             ticketTotal--             lock.Unlock()         }(i)      }      // 10000个购票者     for i := 1; i <= 10000; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             fmt.Printf("%03d 购票者,正在查询... %dn", i, ticketTotal)         }(i)     }      wg.Wait()     fmt.Printf("Final result: %d, Cost time: %vn", ticketTotal, time.Since(now)) }

3.4.2 读写锁

关键字sync.RWMetux
上面通过互斥锁,虽然能让结果正确。但是,一个时刻只能有一个goroutine操作资源(不管是读还是写),而现实的场景多数是“读多写少”,因为一个写而影响多数的读是有很大问题的。为此,可以通过读写锁来适配“读多写少”的场景。(性能问题另外讨论)

读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

package main  import (     "fmt"     "sync"     "time" )  func main() {     var rwlock sync.RWMutex     var wg sync.WaitGroup     ticketTotal := 100      now := time.Now()      // 100个售票员     for i := 1; i <= 100; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             rwlock.Lock()             fmt.Printf("%02d 售票员,正在售票... %dn", i, ticketTotal)             ticketTotal--             rwlock.Unlock()         }(i)      }      // 10000个购票者     for i := 1; i <= 10000; i++ {         wg.Add(1)         go func(i int) {             defer wg.Done()             rwlock.RLock()             fmt.Printf("%03d 购票者,正在查询... %dn", i, ticketTotal)             rwlock.RUnlock()         }(i)     }      wg.Wait()     fmt.Printf("Final result: %d, Cost time: %vn", ticketTotal, time.Since(now)) }

3.5 sync.Once

3.6 sync.Map

3.7 原子操作

部分转自互联网,侵权删除联系

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » 基础篇·第二章[2]·GO并发编程基础求职学习资料
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们