Go Channel Practice I
给定一个超大文件,每次读取 4 KB 的 chunk,对每个 chunk 计算 SHA-1 并输出结果,要求尽可能充分地利用 CPU。
定义两个 struct:ChunkInput 表示待处理的 chunk,ChunkOutput 表示该 chunk 的 SHA-1 结果;二者都带有 Index,用来标识 chunk 在文件中的先后位置。
// ChunkInput is one chunk read from the file. Index records the order in
// which the chunk was read, and Content holds its bytes.
type ChunkInput struct {
	Index   int
	Content []byte
}

// ChunkOutput carries the SHA-1 result (as a string) for the chunk with the
// same Index.
type ChunkOutput struct {
	Index int
	SHA1  string
}
按 chunk 读取文件
file, err := os.Open("file_read") if err != nil { panic(fmt.Sprintf("cannot open file, err: ", err.Error())) } defer file.Close() for { n, err = file.Read(buf) if err != nil { if err != io.EOF { panic(fmt.Sprintf("failed to read file, err: ", err.Error())) } break } }
worker:计算每个 chunk 的 SHA-1
func getSha(index int, buf []byte) *ChunkOutput { h := sha1.New() return &ChunkOutput{ Index: index, SHA1: fmt.Sprintf("%x", h.Sum(buf)), } } func worker(input chan *ChunkInput, output chan *ChunkOutput, quit chan bool) { for { select { case i := <-input: chunk := getSha(i.Index, i.Content) output <- chunk case <-quit: return } } }
collector 收集通过 channel 传来的结果
func collect(output chan *ChunkOutput, count int, quit chan bool, quit2 chan bool) { for i := 0; i <= count; i++ { select { case o := <-output: fmt.Println("o: ", o) } } for i := 0; i <= count; i++ { quit <- true } quit2 <- true }
在 Read(由 main 调用)中,先启动 worker,再启动 collector,然后读取文件,把每个 chunk 发送到 input channel;worker 把计算结果写入 output channel,由 collector 接收。注意 quit 的处理方式。
// Command main reads the file "file_read" in fixed-size chunks, computes the
// SHA-1 of every chunk on a pool of worker goroutines (one per CPU), and
// prints the hex digests in chunk order.
package main

import (
	"crypto/sha1"
	"fmt"
	"io"
	"os"
	"runtime"
	"sync"
)

// BufferSize is the size in bytes of each chunk; the task asks for 4 KiB.
const BufferSize = 4096

// ChunkOutput holds the lowercase hex SHA-1 digest of the chunk at Index.
type ChunkOutput struct {
	Index int
	SHA1  string
}

// ChunkInput is one chunk of the file and its zero-based position in it.
type ChunkInput struct {
	Index   int
	Content []byte
}

func main() {
	result := Read()
	fmt.Println("result: ", result)
}

// Read hashes "file_read" (relative to the working directory) in BufferSize
// chunks using runtime.NumCPU() workers and returns the digests ordered by
// chunk index. It panics if the file cannot be opened or read.
func Read() []string {
	file, err := os.Open("file_read")
	if err != nil {
		panic(fmt.Sprintf("cannot open file, err: %s", err.Error()))
	}
	defer file.Close()

	result, err := hashChunks(file, BufferSize, runtime.NumCPU())
	if err != nil {
		panic(fmt.Sprintf("failed to read file, err: %s", err.Error()))
	}
	return result
}

// hashChunks splits r into chunkSize-byte chunks (only the last may be
// shorter) and hashes them concurrently on `workers` goroutines (at least
// one). It returns the hex digests in chunk order; an empty reader yields an
// empty result. If reading fails with anything other than end-of-file, that
// error is returned together with a nil result.
//
// Shutdown is driven by closing channels, so nobody needs to know the number
// of chunks in advance:
//   - the reader closes input when the data is exhausted;
//   - each worker returns once input is drained;
//   - output is closed after the last worker returns;
//   - collect finishes when output is closed.
//
// quit is closed only on a read error, so the workers drop remaining chunks.
func hashChunks(r io.Reader, chunkSize, workers int) ([]string, error) {
	if chunkSize <= 0 {
		return nil, fmt.Errorf("chunk size must be positive, got %d", chunkSize)
	}
	if workers < 1 {
		workers = 1
	}

	input := make(chan *ChunkInput, workers)
	output := make(chan *ChunkOutput, workers)
	quit := make(chan bool)
	done := make(chan []string, 1)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			worker(input, output, quit)
		}()
	}
	go func() {
		wg.Wait()
		close(output)
	}()
	go collect(output, done)

	err := produce(r, chunkSize, input)
	if err != nil {
		close(quit)
	}
	close(input)

	// Always wait for collect so no goroutine outlives this call.
	result := <-done
	if err != nil {
		return nil, err
	}
	return result, nil
}

// produce reads r in chunkSize pieces and sends each non-empty piece on input
// together with its index. io.ReadFull keeps reading until a chunk is full,
// so short reads (pipes, network streams) still yield full-size chunks. It
// returns nil at end of input and the read error otherwise.
func produce(r io.Reader, chunkSize int, input chan<- *ChunkInput) error {
	for index := 0; ; index++ {
		// A fresh slice per chunk: the receiving worker owns it after the send.
		chunk := make([]byte, chunkSize)
		n, err := io.ReadFull(r, chunk)
		if n > 0 {
			// Only the first n bytes are data; never hash the unused tail.
			input <- &ChunkInput{Index: index, Content: chunk[:n]}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// collect receives digests from output until it is closed, stores each one at
// its chunk index, and sends the ordered slice on done.
func collect(output <-chan *ChunkOutput, done chan<- []string) {
	var result []string
	for o := range output {
		if o.Index >= len(result) {
			result = append(result, make([]string, o.Index+1-len(result))...)
		}
		result[o.Index] = o.SHA1
	}
	done <- result
}

// getSha returns the lowercase hex SHA-1 digest of buf, tagged with index.
func getSha(index int, buf []byte) *ChunkOutput {
	// sha1.Sum hashes buf itself. hash.Hash.Sum(b) would NOT hash b; it
	// appends the digest of the data written so far to b.
	sum := sha1.Sum(buf)
	return &ChunkOutput{Index: index, SHA1: fmt.Sprintf("%x", sum)}
}

// worker hashes chunks from input and sends the results on output. It returns
// when input is closed and drained, or as soon as quit is closed, including
// while it is blocked trying to deliver a result.
func worker(input <-chan *ChunkInput, output chan<- *ChunkOutput, quit <-chan bool) {
	for {
		select {
		case in, ok := <-input:
			if !ok {
				return
			}
			select {
			case output <- getSha(in.Index, in.Content):
			case <-quit:
				return
			}
		case <-quit:
			return
		}
	}
}