Go Channel Practice II
给一个超大文件,一次读 4k 的 chunk,每块求 sha1 值,输出 sha1,尽可能用到 CPU。
为了处理 goroutine 退出的问题,使用 sync.WaitGroup。并且处理一个 chunk 开一个 goroutine。
wg.Add(1) go func(index int) { defer wg.Done() worker(&ChunkInput{Index: index, Content: copied}, output) }(index) index++ ... ... wg.Wait()
index 需要当作参数传进去,否则就是老生常谈的错误。
collect 还是需要一个退出 channel 来控制
func collect(output chan *ChunkOutput, quit chan bool) { for { select { case o := <-output: fmt.Println("O: ", o) case <-quit: return } } }
worker 则只需要考虑处理数据并写入 output 的 channel 即可,不需要 quit 来控制,都交给 wg 来处理。
func worker(i *ChunkInput, output chan *ChunkOutput) { chunk := getSha(i.Index, i.Content) output <- chunk }
另外可以考虑限制 goroutine 的数量,可以创建一个 limit 的 buffer channel,一开始写满,开一次 worker 的 goroutine 之前读出来一个,worker 结束之后再写回。
limit := make(chan bool, 3) for i := 0; i < 3; i++ { limit <- true } ... ... wg.Add(1) <-limit go func(index int) { defer wg.Done() defer func() { limit <- true }() worker(&ChunkInput{Index: index, Content: copied}, output) }(index) index++ } wg.Wait()
完整实现
package main import ( "crypto/sha1" "fmt" "io" "os" "sync" ) const BufferSize = 40 type ChunkOutput struct { Index int SHA1 string } type ChunkInput struct { Index int Content []byte } func main() { result := Read() fmt.Println("result: ", result) } func Read() []string { file, err := os.Open("file_read") if err != nil { panic(fmt.Sprintf("cannot open file, err: ", err.Error())) } defer file.Close() var wg sync.WaitGroup buf := make([]byte, BufferSize) output := make(chan *ChunkOutput, 40) quit := make(chan bool) limit := make(chan bool, 3) for i := 0; i < 3; i++ { limit <- true } go collect(output, quit) index := 0 var n = BufferSize for n == BufferSize { n, err = file.Read(buf) if err != nil { if err != io.EOF { panic(fmt.Sprintf("failed to read file, err: ", err.Error())) } break } copied := make([]byte, BufferSize) copy(copied, buf) wg.Add(1) <-limit go func(index int) { defer wg.Done() defer func() { limit <- true }() worker(&ChunkInput{Index: index, Content: copied}, output) }(index) index++ } wg.Wait() return []string{} } func collect(output chan *ChunkOutput, quit chan bool) { for { select { case o := <-output: fmt.Println("O: ", o) case <-quit: return } } } func getSha(index int, buf []byte) *ChunkOutput { h := sha1.New() return &ChunkOutput{ Index: index, SHA1: fmt.Sprintf("%x", h.Sum(buf)), } } func worker(i *ChunkInput, output chan *ChunkOutput) { chunk := getSha(i.Index, i.Content) output <- chunk }