Go Channel Practice I
Go Channel Practice III - Leaky Bucket

Go Channel Practice II

violet posted @ Jul 06, 2020 05:50:39 AM in 笔记 with tags Golang GoRoutine , 241 阅读

给一个超大文件,一次读 4k 的 chunk,每块求 sha1 值,输出 sha1,尽可能用到 CPU。

为了处理 goroutine 退出的问题,使用 sync.WaitGroup。并且处理一个 chunk 开一个 goroutine。

		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			worker(&ChunkInput{Index: index, Content: copied}, output)
		}(index)
		index++
		...
		...
		wg.Wait()

index 需要当作参数传进去,否则就是老生常谈的错误。

collect 还是需要一个退出 channel 来控制

func collect(output chan *ChunkOutput, quit chan bool) {
	for {
		select {
		case o := <-output:
			fmt.Println("O: ", o)
		case <-quit:
			return
		}
	}
}

worker 则只需要考虑处理数据并写入 output 的 channel 即可,不需要 quit 来控制,都交给 wg 来处理。

func worker(i *ChunkInput, output chan *ChunkOutput) {
	chunk := getSha(i.Index, i.Content)
	output <- chunk
}

另外可以考虑限制 goroutine 的数量,可以创建一个 limit 的 buffer channel,一开始写满,开一次 worker 的 goroutine 之前读出来一个,worker 结束之后再写回。

		limit := make(chan bool, 3)
		for i := 0; i < 3; i++ {
			limit <- true
		}
		...
		...
		wg.Add(1)
		<-limit
		go func(index int) {
			defer wg.Done()
			defer func() {
				limit <- true
			}()
			worker(&ChunkInput{Index: index, Content: copied}, output)
		}(index)
		index++
	}
	wg.Wait()

完整实现

package main

import (
	"crypto/sha1"
	"fmt"
	"io"
	"os"
	"sync"
)

const BufferSize = 40

type ChunkOutput struct {
	Index int
	SHA1  string
}

type ChunkInput struct {
	Index   int
	Content []byte
}

func main() {
	result := Read()
	fmt.Println("result: ", result)
}

func Read() []string {
	file, err := os.Open("file_read")
	if err != nil {
		panic(fmt.Sprintf("cannot open file, err: ", err.Error()))
	}
	defer file.Close()
	var wg sync.WaitGroup
	buf := make([]byte, BufferSize)

	output := make(chan *ChunkOutput, 40)
	quit := make(chan bool)
	limit := make(chan bool, 3)
	for i := 0; i < 3; i++ {
		limit <- true
	}

	go collect(output, quit)

	index := 0
	var n = BufferSize

	for n == BufferSize {
		n, err = file.Read(buf)
		if err != nil {
			if err != io.EOF {
				panic(fmt.Sprintf("failed to read file, err: ", err.Error()))
			}
			break
		}
		copied := make([]byte, BufferSize)
		copy(copied, buf)
		wg.Add(1)
		<-limit
		go func(index int) {
			defer wg.Done()
			defer func() {
				limit <- true
			}()
			worker(&ChunkInput{Index: index, Content: copied}, output)
		}(index)
		index++
	}
	wg.Wait()

	return []string{}
}

func collect(output chan *ChunkOutput, quit chan bool) {
	for {
		select {
		case o := <-output:
			fmt.Println("O: ", o)
		case <-quit:
			return
		}
	}
}

func getSha(index int, buf []byte) *ChunkOutput {
	h := sha1.New()
	return &ChunkOutput{
		Index: index,
		SHA1:  fmt.Sprintf("%x", h.Sum(buf)),
	}
}

func worker(i *ChunkInput, output chan *ChunkOutput) {
	chunk := getSha(i.Index, i.Content)
	output <- chunk
}

登录 *


loading captcha image...
(输入验证码)
or Ctrl+Enter