// filter.go // Package pipefilter is to define the interfaces // and the structures for pipe-filter style implementation package pipefilter
// Request is the input of the filter type Request interface{}
// Response is the output of the filter type Response interface{}
// Filter interface is the definition of the data processing components // Pipe-Filter structure type Filter interface { Process(data Request) (Response, error) }
// split_filter.go package pipefilter
import ( "errors" "strings" )
var SplitFilterWrongFormatError = errors.New("input data should be string")
func(sf *SumFilter) Process(data Request) (Response, error) { elems, ok := data.([]int) if !ok { returnnil, SumFilterWrongFormatError } ret := 0 for _, elem := range elems { ret += elem } return ret, nil }
// straight_pipeline.go package pipefilter
// StraightPipeline is composed of the filters, and the filters are piled as a straight line. type StraightPipeline struct { Name string Filters *[]Filter }
// NewStraightPipeline create a new StraightPipelineWithWallTime funcNewStraightPipeline(name string, filters ...Filter) *StraightPipeline { return &StraightPipeline{ Name: name, Filters: &filters, } }
// Process is to process the coming data by the pipeline func(f *StraightPipeline) Process(data Request) (Response, error) { var ret interface{} var err error for _, filter := range *f.Filters { ret, err = filter.Process(data) if err != nil { return ret, err } data = ret } return ret, err }
type Agent struct { collectors map[string]Collector evtBuf chan Event cancel context.CancelFunc ctx context.Context state int }
func(agt *Agent) EventProcessGroutine() { var evtSeg [10]Event for { for i := 0; i < 10; i++ { select { case evtSeg[i] = <-agt.evtBuf: case <-agt.ctx.Done(): return } } fmt.Println(evtSeg) } }