Go语言学习08-并发编程

Thread vs. Groutine

  1. 创建时默认的stack的大小

    • JDK5 以后 Java Thread stack 默认为 1M
    • Groutine 的 Stack 初始化大小为 2K
  2. 和 KSE (Kernel Space Entity) 的对应关系

    • Java Thread 是 1:1
    • Groutine 是 M:N

1
2
3
4
5
6
7
8
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(i)
}(i)
}
time.Sleep(time.Millisecond * 50)
}

共享内存并发机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
}()
}
time.Sleep(1 * time.Second)
t.Logf("counter = %d", counter)
}

func TestCounterWaitGroup(t *testing.T) {
var mut sync.Mutex
var wg sync.WaitGroup
counter := 0
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
wg.Done()
}()
}
wg.Wait()
t.Logf("counter = %d", counter)
}

CSP并发机制

CSP vs. Actor

  • 和Actor的直接通讯不不同,CSP模式则是通过Channel进行通讯的,更松耦合⼀些

  • Go中channel是有容量限制并且独立于处理Groutine,而如Erlang,Actor模式中的mailbox容量是无限的,接收进程也总是被动地处理消息。

    Channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}

func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("Task is done.")
}

func TestService(t *testing.T) {
fmt.Println(service())
otherTask()
}

func AsyncService() chan string {
//retCh := make(chan string)
retCh := make(chan string, 1)
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret
fmt.Println("service exited.")
}()
return retCh
}

func TestAsynService(t *testing.T) {
retCh := AsyncService()
otherTask()
fmt.Println(<-retCh)
time.Sleep(time.Second * 1)
}

多路选择和超时控制

select

多渠道的选择
1
2
3
4
5
6
7
8
select {
case ret := <-retCh1:
t.Logf("result %s", ret)
case ret := <-retCh2:
t.Logf("result %s", ret)
default:
t.Error("No one returned")
}
超时控制
1
2
3
4
5
6
select {
case ret := <-retCh:
t.Logf("result %s", ret)
case <-time.After(time.Second * 1):
t.Error("time out")
}

channel的关闭和广播

channel的关闭

  • 向 关闭的channel发送数据, 会导致 panic
  • v, ok <-ch; ok 为bool值, true 表示正常接受, false 表示通道关闭
  • 所有的 channel 接收者都会在channel关闭时, 立刻从阻塞等待中返回且上述ok值为false. 这个广播机制常被利用, 进行向多个订阅者同时发送信号. 如: 退出信号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
//ch <- 11
wg.Done()
}()
}

func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 11; i++ {
//data := <-ch
//fmt.Println(data)
if data, ok := <-ch; ok {
fmt.Println(data)
} else {
break
}
}
wg.Done()
}()
}

func TestCloseChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataReceiver(ch, &wg)
//wg.Add(1)
//dataReceiver(ch, &wg)
wg.Wait()
}

context处理复杂场景任务的取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func isCancelled(cancelChan chan struct{}) bool {
select {
case <-cancelChan:
return true
default:
return false
}
}

func cancel_1(cancelChan chan struct{}) {
cancelChan <- struct{}{}
}

func cancel_2(cancelChan chan struct{}) {
close(cancelChan)
}

func TestCancel(t *testing.T) {
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5; i++ {
go func(i int, cancelCh chan struct{}) {
for {
if isCancelled(cancelCh) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
//cancel_1(cancelChan)
cancel_2(cancelChan)
time.Sleep(time.Second * 1)
}

Context与关联任务取消

  • 根Context: 通过 context.Background() 创建
  • 子Context: context.WithCancel(parentContext) 创建
    • ctx, cancel := context.WithCancel(context.Background())
  • 当前Context 被取消时, 基于他的子 context 都会被取消
  • 接收取消通知 <-ctx.Done()