Go语言学习09-典型并发任务

单例模式(懒汉式, 线程安全)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Singleton is the type whose single shared instance is handed out by
// GetSingletonObj.
type Singleton struct{}

var (
	// singleInstance holds the shared instance; it stays nil until the first
	// call to GetSingletonObj.
	singleInstance *Singleton
	// once guards the creation of singleInstance.
	once sync.Once
)

// GetSingletonObj returns the shared *Singleton, creating it lazily on the
// first call. Creation goes through sync.Once, so the "Create Obj" message is
// printed and the instance is allocated only one time, no matter how many
// goroutines call this function.
func GetSingletonObj() *Singleton {
	create := func() {
		fmt.Println("Create Obj")
		singleInstance = &Singleton{}
	}
	once.Do(create)
	return singleInstance
}

// TestGetSingletonObj calls GetSingletonObj from ten goroutines and prints the
// address each one receives, then waits for all of them to finish. It makes no
// assertions; the printed addresses are meant to be compared by eye.
func TestGetSingletonObj(t *testing.T) {
	var pending sync.WaitGroup
	for n := 0; n < 10; n++ {
		pending.Add(1)
		go func() {
			defer pending.Done()
			instance := GetSingletonObj()
			fmt.Println(unsafe.Pointer(instance))
		}()
	}
	pending.Wait()
}

场景01: 仅需任意任务完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// runTask simulates a unit of work: it sleeps for 10 milliseconds and then
// returns a message that embeds the given task id.
func runTask(id int) string {
	const workDuration = 10 * time.Millisecond
	time.Sleep(workDuration)
	result := fmt.Sprintf("The result is from %d", id)
	return result
}

// FirstResponse starts ten runTask goroutines and returns whichever result
// arrives first.
//
// The result channel is buffered with one slot per task, so the tasks that
// finish after the first result has been taken can still send without
// blocking.
func FirstResponse() string {
	const numOfRunner = 10
	results := make(chan string, numOfRunner)
	for id := 0; id < numOfRunner; id++ {
		go func(id int) {
			results <- runTask(id)
		}(id)
	}
	first := <-results
	return first
}

// TestFirstResponse logs the goroutine count, logs the value returned by
// FirstResponse, sleeps for one second and logs the goroutine count again.
// The pause is presumably there to let the remaining task goroutines finish
// before the second count is taken.
func TestFirstResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine())
	first := FirstResponse()
	t.Log(first)
	time.Sleep(time.Second)
	t.Log("After:", runtime.NumGoroutine())
}

场景02: 等待所有任务完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// runTask stands in for real work: it pauses for 10 milliseconds, then
// reports which task id produced the result.
func runTask(id int) string {
	const pause = 10 * time.Millisecond
	time.Sleep(pause)
	msg := fmt.Sprintf("The result is from %d", id)
	return msg
}

// AllResponse starts ten runTask goroutines, waits until every one of them has
// delivered its result, and returns the results joined together, each
// followed by a newline. Results appear in the order they arrive on the
// channel, not in task-id order.
func AllResponse() string {
	const numOfRunner = 10
	results := make(chan string, numOfRunner)
	for id := 0; id < numOfRunner; id++ {
		go func(id int) {
			results <- runTask(id)
		}(id)
	}

	var finalRet string
	// Receive exactly one result per started task.
	for received := 0; received < numOfRunner; received++ {
		line := <-results
		finalRet += line + "\n"
	}
	return finalRet
}

// TestFirstResponse logs the goroutine count, logs the combined output of
// AllResponse, sleeps for one second and logs the goroutine count again.
//
// NOTE(review): despite its name this test exercises AllResponse, and it
// shares its name with the scenario-01 test of FirstResponse; the two cannot
// coexist in one package. Consider renaming it to TestAllResponse.
func TestFirstResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine())
	all := AllResponse()
	t.Log(all)
	time.Sleep(time.Second)
	t.Log("After:", runtime.NumGoroutine())
}

实现池化

使用 buffered channel 实现对象池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
var (
	// ErrPoolTimeout is returned by ObjPool.GetObj when no object becomes
	// available before the timeout elapses.
	ErrPoolTimeout = errors.New("time out")
	// ErrPoolOverflow is returned by ObjPool.ReleaseObj when the pool is
	// already full.
	ErrPoolOverflow = errors.New("overflow")
)

// ReusableObj is the placeholder type for the objects managed by ObjPool.
type ReusableObj struct {
}

// ObjPool is a fixed-capacity object pool backed by a buffered channel.
type ObjPool struct {
	bufChan chan *ReusableObj // buffers the reusable objects that are currently available
}

// NewObjPool creates a pool whose capacity is numOfObj and fills it with
// numOfObj freshly allocated objects. numOfObj is passed to make unchecked,
// so a negative value panics.
func NewObjPool(numOfObj int) *ObjPool {
	pool := &ObjPool{bufChan: make(chan *ReusableObj, numOfObj)}
	for i := 0; i < numOfObj; i++ {
		pool.bufChan <- &ReusableObj{}
	}
	return pool
}

// GetObj takes an object out of the pool, waiting up to timeout for one to
// become available. It returns ErrPoolTimeout if none arrives in time.
//
// An object that is already available is always returned, even when timeout
// is zero or negative; in that case the call never blocks.
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	// Fast path: select picks randomly among ready cases, so racing an
	// available object against an already-expired timer could report a
	// timeout although the pool is not empty.
	select {
	case obj := <-p.bufChan:
		return obj, nil
	default:
	}
	if timeout <= 0 {
		return nil, ErrPoolTimeout
	}

	// Use an explicit timer so it can be stopped as soon as an object arrives
	// instead of lingering until the timeout fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case obj := <-p.bufChan:
		return obj, nil
	case <-timer.C: // timeout control
		return nil, ErrPoolTimeout
	}
}

// ReleaseObj puts obj back into the pool without blocking. It returns
// ErrPoolOverflow when the pool is already at capacity. obj is not validated;
// a nil obj is stored as-is.
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return ErrPoolOverflow
	}
}