Go语言学习13-常见软件架构的实现

架构模式

An architectural pattern is a general, reusable solution to a commonly occurring problem in software architecture within a given context. ——Wikipedia

Pipe-Filter 架构

Pipe-Filter 模式

  • 非常适合于数据处理及数据分析系统

  • Filter 封装数据处理的功能

  • 松耦合: Filter只跟数据(格式) 耦合

  • Pipe 用于连接 Filter, 传递数据, 或者在异步处理过程中缓冲数据流

    进程内同步调用时, pipe 演变为数据在方法调用间传递

Filter和组合模式

示例

// filter.go
// Package pipefilter defines the interfaces and structures
// for a pipe-filter style implementation.
package pipefilter

// Request is the input handed to a Filter. It is an empty interface, so any
// value can be passed; the filters in this package type-assert the concrete
// type they expect inside Process.
type Request interface{}

// Response is the output produced by a Filter. It is an empty interface, so
// the concrete type depends on the filter that produced it.
type Response interface{}

// Filter is the data-processing component of the Pipe-Filter structure.
type Filter interface {
// Process consumes data and returns the resulting Response, or an error
// when the data cannot be handled.
Process(data Request) (Response, error)
}

// split_filter.go
package pipefilter

import (
"errors"
"strings"
)

// SplitFilterWrongFormatError is returned by SplitFilter.Process when the
// input data is not a string.
var SplitFilterWrongFormatError = errors.New("input data should be string")

// SplitFilter is a Filter that splits a string into substrings.
type SplitFilter struct {
// delimiter is the separator passed to strings.Split in Process.
delimiter string
}

// NewSplitFilter returns a SplitFilter that splits its input on delimiter.
func NewSplitFilter(delimiter string) *SplitFilter {
	filter := new(SplitFilter)
	filter.delimiter = delimiter
	return filter
}

// Process splits the input string on the filter's delimiter.
//
// data must hold a string; otherwise SplitFilterWrongFormatError is returned
// together with a nil Response. On success the Response is the []string
// produced by strings.Split.
func (sf *SplitFilter) Process(data Request) (Response, error) {
	// Verify the payload type first: only strings can be handled here.
	if text, isString := data.(string); isString {
		return strings.Split(text, sf.delimiter), nil
	}
	return nil, SplitFilterWrongFormatError
}

// to_int_filter.go
package pipefilter

import (
"errors"
"strconv"
)

// ToIntFilterWrongFormatError is returned by ToIntFilter.Process when the
// input data is not a []string.
var ToIntFilterWrongFormatError = errors.New("input data should be []string")

// ToIntFilter is a Filter that converts a []string into a []int.
// It carries no state.
type ToIntFilter struct {
}

// NewToIntFilter returns a ready-to-use ToIntFilter.
func NewToIntFilter() *ToIntFilter {
	return new(ToIntFilter)
}

// Process converts every element of the input []string to an int.
//
// data must hold a []string; otherwise ToIntFilterWrongFormatError is
// returned. If any element fails strconv.Atoi, that error is returned as-is
// with a nil Response. On success the Response is a []int with one entry per
// input element, in the same order.
func (tif *ToIntFilter) Process(data Request) (Response, error) {
	fields, isStrings := data.([]string)
	if !isStrings {
		return nil, ToIntFilterWrongFormatError
	}

	// The output length is known up front, so fill it by index.
	nums := make([]int, len(fields))
	for i, field := range fields {
		n, convErr := strconv.Atoi(field)
		if convErr != nil {
			return nil, convErr
		}
		nums[i] = n
	}
	return nums, nil
}

// sum_filter.go
package pipefilter

import "errors"

// SumFilterWrongFormatError is returned by SumFilter.Process when the input
// data is not a []int.
var SumFilterWrongFormatError = errors.New("input data should be []int")

// SumFilter is a Filter that adds up a []int. It carries no state.
type SumFilter struct {
}

// NewSumFilter returns a ready-to-use SumFilter.
func NewSumFilter() *SumFilter {
	return new(SumFilter)
}

// Process adds up all elements of the input []int.
//
// data must hold a []int; otherwise SumFilterWrongFormatError is returned
// with a nil Response. On success the Response is the int total (0 for an
// empty slice).
func (sf *SumFilter) Process(data Request) (Response, error) {
	nums, isInts := data.([]int)
	if !isInts {
		return nil, SumFilterWrongFormatError
	}

	total := 0
	for i := 0; i < len(nums); i++ {
		total += nums[i]
	}
	return total, nil
}

// straight_pipeline.go
package pipefilter

// StraightPipeline is a Filter composed of other filters arranged in a
// straight line: Process feeds each filter's output to the next one.
type StraightPipeline struct {
// Name is the label given to NewStraightPipeline; Process does not read it.
Name string
// Filters points at the ordered list of filters that Process runs.
Filters *[]Filter
}

// NewStraightPipeline creates a StraightPipeline called name that runs the
// given filters in the order they are passed.
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {
	pipeline := new(StraightPipeline)
	pipeline.Name = name
	// Filters is a pointer to the variadic slice itself, not to a copy.
	pipeline.Filters = &filters
	return pipeline
}

// Process runs data through every filter of the pipeline in order, feeding
// each filter's Response to the next filter as its Request.
//
// It stops at the first filter that fails and returns that filter's Response
// and error unchanged. When all filters succeed it returns the last filter's
// Response. A pipeline with no filters, or with a nil Filters field (for
// example a StraightPipeline built as a literal rather than through
// NewStraightPipeline), returns (nil, nil) instead of panicking.
func (f *StraightPipeline) Process(data Request) (Response, error) {
	// Filters is an exported pointer field, so it may legitimately be nil.
	if f.Filters == nil {
		return nil, nil
	}

	var ret Response
	for _, filter := range *f.Filters {
		out, err := filter.Process(data)
		if err != nil {
			return out, err
		}
		// The output of this stage becomes the input of the next one.
		ret, data = out, out
	}
	return ret, nil
}

Micro Kernel架构

  • 特点
    • 易于扩展
    • 错误隔离
    • 保持架构一致性
  • 要点
    • 内核包含公共流程或通用逻辑
    • 将可变或可扩展部分规划为扩展点
    • 抽象扩展点行为, 定义接口
    • 利用插件进行扩展

示例

package microkernel

import (
"context"
"errors"
"fmt"
"strings"
"sync"
)

// Lifecycle states of an Agent, stored in Agent.state.
const (
// Waiting is the state set by NewAgent and restored by Stop.
Waiting = iota
// Running is the state set by Start.
Running
)

// WrongStateError is returned by Agent methods that are called while the
// agent is not in the state the operation requires.
var WrongStateError = errors.New("can not take the operation in the current state")

// CollectorsError aggregates the errors reported by several collectors into
// a single error value.
type CollectorsError struct {
	// CollectorsErrors holds the individual errors, one per failure.
	CollectorsErrors []error
}

// Error implements the error interface by joining the messages of all
// collected errors with ";". It returns "" when there are none.
func (ce CollectorsError) Error() string {
	msgs := make([]string, len(ce.CollectorsErrors))
	for i := range ce.CollectorsErrors {
		msgs[i] = ce.CollectorsErrors[i].Error()
	}
	return strings.Join(msgs, ";")
}

// Event is the unit of data a collector hands to an EventReceiver.
type Event struct {
// Source identifies where the event came from.
// NOTE(review): presumably the collector's name — confirm against collectors.
Source string
// Content is the event payload.
Content string
}

// EventReceiver is the callback interface passed to Collector.Init.
// Agent implements it.
type EventReceiver interface {
// OnEvent delivers one event to the receiver.
OnEvent(evt Event)
}

// Collector is the extension point of the micro kernel: each plugin
// implements it and is registered with Agent.RegisterCollector.
type Collector interface {
// Init is called once at registration with the receiver the collector
// should deliver its events to.
Init(evtReceiver EventReceiver) error
// Start is called by the agent in a dedicated goroutine with the agent's
// context, which is cancelled when the agent stops.
// NOTE(review): because it runs in its own goroutine, Start is presumably
// allowed to block until agtCtx is done — confirm with implementations.
Start(agtCtx context.Context) error
// Stop is called by Agent.Stop after the agent context has been cancelled.
Stop() error
// Destroy is called by Agent.Destroy.
Destroy() error
}

// Agent is the kernel: it owns the registered collectors, receives their
// events through OnEvent and drives their lifecycle.
type Agent struct {
// collectors maps a registration name to its Collector.
collectors map[string]Collector
// evtBuf carries events from OnEvent to EventProcessGroutine.
evtBuf chan Event
// cancel cancels ctx; it is set by Start and invoked by Stop.
cancel context.CancelFunc
// ctx is created by Start and handed to every collector's Start.
ctx context.Context
// state is Waiting or Running.
state int
}

// EventProcessGroutine drains the agent's event buffer in batches of ten and
// prints each full batch with fmt.Println. It loops until the agent context
// is done and then returns; a batch that is only partially filled at that
// point is not printed.
func (agt *Agent) EventProcessGroutine() {
	const batchSize = 10
	var batch [batchSize]Event

	for {
		for slot := range batch {
			select {
			case <-agt.ctx.Done():
				return
			case batch[slot] = <-agt.evtBuf:
			}
		}
		fmt.Println(batch)
	}
}

// NewAgent returns an Agent in the Waiting state with no collectors and an
// event channel whose buffer holds sizeEvtBuf events.
func NewAgent(sizeEvtBuf int) *Agent {
	return &Agent{
		state:      Waiting,
		evtBuf:     make(chan Event, sizeEvtBuf),
		collectors: make(map[string]Collector),
	}
}

// RegisterCollector initialises collector with the agent as its event
// receiver and, if that succeeds, stores it under name.
//
// It returns WrongStateError unless the agent is in the Waiting state, and
// the error from collector.Init if initialisation fails. A collector whose
// Init fails is NOT registered, so it will not be started, stopped or
// destroyed by the agent later. Registering a second collector under an
// existing name replaces the earlier one.
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
	if agt.state != Waiting {
		return WrongStateError
	}
	// Initialise first: only collectors that are usable end up in the map.
	if err := collector.Init(agt); err != nil {
		return err
	}
	agt.collectors[name] = collector
	return nil
}

// startCollectors launches every registered collector's Start in its own
// goroutine, passing the agent context.
//
// It does not wait for the goroutines: Start may block for the collector's
// whole lifetime. The returned CollectorsError therefore only contains
// failures that were already recorded when this function returns; later
// failures are collected but not reported to the caller. It returns nil when
// no failure has been recorded yet.
func (agt *Agent) startCollectors() error {
	var (
		mu   sync.Mutex // guards errs
		errs CollectorsError
	)

	for name, collector := range agt.collectors {
		go func(name string, collector Collector, ctx context.Context) {
			// err is local to this goroutine so concurrent collectors never
			// share (and race on) a single error variable.
			err := collector.Start(ctx)
			if err == nil {
				return
			}
			// Lock only after Start returns, and unlock only what was locked.
			mu.Lock()
			defer mu.Unlock()
			errs.CollectorsErrors = append(errs.CollectorsErrors,
				errors.New(name+":"+err.Error()))
		}(name, collector, agt.ctx)
	}

	// Read the shared slice under the same lock the goroutines write with.
	mu.Lock()
	defer mu.Unlock()
	if len(errs.CollectorsErrors) == 0 {
		return nil
	}
	// Return a snapshot so later appends cannot touch the caller's value.
	return CollectorsError{
		CollectorsErrors: append([]error(nil), errs.CollectorsErrors...),
	}
}

// stopCollectors calls Stop on every registered collector, one after the
// other. Each failure is recorded as "<name>:<message>"; if any collector
// failed, the failures are returned together as a CollectorsError, otherwise
// nil is returned.
func (agt *Agent) stopCollectors() error {
	var failures []error
	for name, collector := range agt.collectors {
		stopErr := collector.Stop()
		if stopErr == nil {
			continue
		}
		failures = append(failures, errors.New(name+":"+stopErr.Error()))
	}

	if len(failures) > 0 {
		return CollectorsError{CollectorsErrors: failures}
	}
	return nil
}

// destroyCollectors calls Destroy on every registered collector, one after
// the other. Each failure is recorded as "<name>:<message>"; if any
// collector failed, the failures are returned together as a CollectorsError,
// otherwise nil is returned.
func (agt *Agent) destroyCollectors() error {
	var failures []error
	for name, collector := range agt.collectors {
		destroyErr := collector.Destroy()
		if destroyErr == nil {
			continue
		}
		failures = append(failures, errors.New(name+":"+destroyErr.Error()))
	}

	if len(failures) > 0 {
		return CollectorsError{CollectorsErrors: failures}
	}
	return nil
}

// Start moves the agent from Waiting to Running: it creates a fresh
// cancellable context, launches the event-processing goroutine and then
// starts all collectors. It returns WrongStateError if the agent is not
// Waiting, otherwise whatever startCollectors returns.
func (agt *Agent) Start() error {
	if agt.state != Waiting {
		return WrongStateError
	}

	ctx, cancel := context.WithCancel(context.Background())
	agt.state, agt.ctx, agt.cancel = Running, ctx, cancel

	go agt.EventProcessGroutine()
	return agt.startCollectors()
}

// Stop moves the agent from Running back to Waiting: it cancels the agent
// context and then stops all collectors. It returns WrongStateError if the
// agent is not Running, otherwise whatever stopCollectors returns.
func (agt *Agent) Stop() error {
	switch agt.state {
	case Running:
		agt.state = Waiting
		agt.cancel()
		return agt.stopCollectors()
	default:
		return WrongStateError
	}
}

// Destroy destroys all registered collectors. It is only allowed while the
// agent is Waiting; in any other state it returns WrongStateError.
func (agt *Agent) Destroy() error {
	if agt.state == Waiting {
		return agt.destroyCollectors()
	}
	return WrongStateError
}

// OnEvent implements EventReceiver by sending evt on the agent's event
// channel. The send is a plain channel send, so it blocks while the channel
// buffer is full and nothing is receiving from it.
func (agt *Agent) OnEvent(evt Event) {
agt.evtBuf <- evt
}