Golang ( 4 ) - 并发编程

一、Go协程

只需要在方法前加一个go关键字就可以让一个普通方法协程化。以下面的代码为例,一般同步阻塞的编码方式下会按顺序打印012然后再输出Finish. 示例中启动了3个协程之后主进程会继续往下执行,不会等待函数返回,大概率会先看到Finish输出,然后看到012或者210。

即加上go关键字之后程序不会同步阻塞主进程,协程的执行速度跟程序复杂性关系,无法保证先启动的协程先执行完毕。

func main() {
	for i := 0; i < 3; i++ {
		go func(v int) {
			fmt.Println(v)
		}(i)
	}
	fmt.Println("Finish.")
	time.Sleep(time.Second)
}

通常情况下同步的逻辑方式书写是最方便的,无须考虑程序逻辑的先后关系,上面示例最后一行休眠1秒,确保协程可以执行完毕,但正常逻辑下无法确保1秒内协程能执行完毕,也不会只执行一个打印这么简单,通常还需要能获取到函数的返回,所以这里面存在两个主要问题:

Channel正是为协程间通信而产生,接下来分开看看同步等待、超时、通信的问题。

二、 Channel简介

通道(channel)提供了协程之间的通信方式以及运行同步机制

2.1 通道定义

Channel是Go中的一个核心类型,Goroutine通过它发送或者接收数据。就像mapslice数据类型一样,channel必须先创建再使用:

ch := make(chan int)

操作符是箭头 <-(箭头的指向就是数据的流向)。

ch <- v    // 发送值v到Channel ch中
v := <-ch  // 从Channel ch中接收数据,并将数据赋值给v

chan按读写方向可以分为双向Channel和单向Channel(只读Channel和只写Channel),根据是否缓冲可以分为带缓冲的Channel和无缓冲Channel。

2.2 select语句

select 是Go中的一个控制结构,类似于用于通信的switch语句。每个case必须是一个通信操作,要么是发送要么是接收。select随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有 case可运行。一个默认的子句应该总是可运行的。

基本用法

select {
case <- chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}

以下描述了 select 语句的语法:

示例

func main() {
	ch1 := make(chan int)
	ch2 := make(chan string)

	go func() {
		time.Sleep(1 * time.Second)
		ch1 <- 1
	}()

	go func() {
		time.Sleep(2 * time.Second)
		ch2 <- "Hello"
	}()

	fmt.Println("Start")
	select {
	case <-ch1:
		fmt.Println("Read From ch1")
	case <-ch2:
		fmt.Println("Read From ch2")
	default:
		fmt.Println("Read From Default")
	}
	fmt.Println("END")
}

如果select里啥都没有 select{},则会等待,达到阻塞Goroutine的目的。如果当前运行环境没有新的协程而使用该语句则会抛错。

三、协程同步

3.1 通过Channel同步

创建了一个存储10个bool类型的通道,函数执行成功向通道里写入true,执行失败向通道里写入false。启动一个循环从通道读取数据,读取10次之后程序在打印最后的结果:

true true true true true true true true true true [0 1 2 3 4 5 6 7 8 9]

func main() {
	ch := make(chan bool, 10)

	data := make([]int, 10)
	for i := 0; i < 10; i++ {
		go func(idx int) {
			defer func() {
				if err := recover(); err != nil {
					ch <- false
				}
			}()
			data[idx] = idx
			ch <- true
		}(i)
	}
	for j := 0; j < 10; j++ {
		fmt.Print(<-ch, " ")
	}
	fmt.Println(data)
}

3.2 通过sync同步

控制流程同步等待也可以通过sync.WaitGroup来实现,WaitGroup对象内部有一个计数器,最初从0开始,它有三个方法:

func main() {
	var wg sync.WaitGroup

	data := make([]int, 10)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(idx int, w *sync.WaitGroup) {
			defer func() {
				w.Done()
			}()
			data[idx] = idx
		}(i, &wg)
	}
	wg.Wait()
	fmt.Println(data)
}

通过这两种方法可以实现多协程的同步等待,但这会涉及到另一个超时问题,如果多个协程时间较长,主线程就一直等待了,所以还得考虑同步超时问题。

四、同步超时

这个是前面的补充,正常单个协程可以通过select + time.After实现超时控制。

func main() {
	ch := make(chan bool)
	go func() {
		time.Sleep(3 * time.Second)
		ch <- true
	}()

	select {
	case <-ch:
		fmt.Println("Read From CH")
	case <-time.After(2 * time.Second):
		fmt.Println("timeout")
	}
}

4.1 Channel同步超时

来看看3.1中多个goroutine的超时问题,如果主线程想最多只等待2s:

func main() {
	ch := make(chan bool, 10)

	data := make([]int, 10)
	for i := 0; i < 10; i++ {
		go func(idx int) {
			defer func() {
				if err := recover(); err != nil {
					ch <- false
				}
			}()
			time.Sleep(time.Duration(idx) * time.Second)//Sleep idx sec
			data[idx] = idx
			ch <- true
		}(i)
	}
	timeout := make(chan bool)
	go func() {
		time.Sleep(2 * time.Second)
		timeout <- true
	}()

	for{
		select {
		case t := <- ch:
			fmt.Println(t)
		case <- timeout:
			fmt.Println("Timeout")
			goto EXIT
		}
	}
EXIT:
	fmt.Println(data)
}

4.2 sync同步超时

3.2的超时也借助了Channel来实现。

func main() {
	var wg sync.WaitGroup

	data := make([]int, 10)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(idx int) {
			defer func() {
				wg.Done()
			}()
			time.Sleep(time.Duration(idx) * time.Second)
			data[idx] = idx
			fmt.Println(idx)
		}(i)
	}
	wc := make(chan bool)
	go func() {
		wg.Wait()
		wc <- true
	}()
	select {
	case <- time.After(3 * time.Second):
		fmt.Println("Timeout")
	case <- wc:
		fmt.Println("Wg Wait")
	}
	fmt.Println(data)
}

五、协程通信

协程之间数据交互上主要有两种方式,一种为全局变量然后通过锁来控制原子性,另一种则是通过channel来进行通信。

5.1 全局变量

启动10个协程来执行1加到10的操作,s变量为协程共享,所以需要加锁才会正确输出55,若去掉锁的三行代码,则会出现非55的情况。

func sum() int {
	s := 0
	var wg sync.WaitGroup
	var mutex sync.Mutex
	wg.Add(10)
	for i := 1; i <= 10; i++ {
		go func(i int) {
			mutex.Lock()
			s += i
			mutex.Unlock()
			wg.Done()
		}(i)
	}
	wg.Wait()
	return s
}
func main() {
	//执行100次结果都是55
	for i := 0; i < 100; i++ {
		fmt.Print(sum(), " ")
	}
}

5.2 通过Channel来通信

将结果写到通道中,从通道中读取结果进行累加。

func sum() int {
	ch := make(chan int)
	for i := 1; i <= 10; i++ {
		go func(i int) {
			ch <- i
		}(i)
	}
	s := 0
	for i := 0; i < 10; i++ {
		s += <-ch
	}
	return s
}
func main() {
	for i := 0; i < 100; i++ {
		fmt.Print(sum(), " ")
	}
}

六、并发控制

6.1 先处理再消费

这里为等待所有任务处理完成后再对结果进行处理,通过sync.WaitGroup来确认同步阻塞,通过有缓冲Channel的阻塞特性来控制最大执行的并发数,在启动Goroutine之前往通道里写值,处理完成之后再读取。处理成功后将结果写到result通道中,这里模拟了一个延时与错误操作,也就是result通道可能写不满,所以在任务执行完毕后做了close(result)操作,避免后续遍历通道时阻塞。

var (
	taskNum     = 10
	maxParalNum = 3
)

func main() {
	wg := sync.WaitGroup{}
	limit := make(chan struct{}, maxParalNum)
	result := make(chan int, taskNum)

	task := func() (int, error) {
		time.Sleep(time.Second * 2)
		if rand.Intn(3) == 1 {
			return 0, fmt.Errorf("task error")
		}
		return 1, nil
	}

	for i := 1; i <= taskNum; i++ {
		wg.Add(1)
		limit <- struct{}{}
		go func(i int) {
			fmt.Println("start task", i, time.Now())
			defer func() {
				<-limit
				wg.Done()
			}()
			if t, err := task(); err == nil {
				result <- t
			}
		}(i)
	}
	wg.Wait()
	close(result)
	for res := range result {
		fmt.Printf("%+v", res)
	}
}

6.2 边处理边消费

如果想限制并发的同时,一边处理一边消费。则可以调整为发送时起新的协程去发送,通过limit来控制发送的数量,根据任务次数来读取,不过这就要求执行任务时需往通道里写入对应数量的结果。

var (
	taskNum     = 10
	maxParalNum = 3
)

type data struct {
	code int
	res  int
}

func main() {
	limit := make(chan struct{}, maxParalNum)
	result := make(chan data, taskNum)

	task := func() (data, error) {
		time.Sleep(time.Second * 2)
		if rand.Intn(3) == 1 {
			return data{}, fmt.Errorf("task error")
		}
		return data{code: 1}, nil
	}
	go func() {
		for i := 1; i <= taskNum; i++ {
			limit <- struct{}{}
			go func(i int) {
				fmt.Println("start task", i, time.Now())
				defer func() {
					<-limit
				}()
				t, _ := task()
				result <- t
			}(i)
		}
	}()

	for i := 0; i < taskNum; i++ {
		res := <-result
		fmt.Printf("%+v\n", res)
	}
}

七、Context

用于协程之间传递上下文信息,包含取消信号、超时信号、传值。context包中定义了一个context.Context结构体,可通过两种方式获取:

context.Background()
context.TODO()

支持四种使用方式,使用时需要传入context.Context对象并返回新的Context对象,其中取消、超时会返回一个取消函数。

7.1 WithCancel

取消信号,返回context.Context和取消函数CancelFunc,调用CancelFunc则会终止调用树上协程的执行。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

7.2 WithDeadline

超时截止时间,设置具体的截止时间点。

WithDeadline(parent Context, d time.Time) (Context, CancelFunc)

7.3 WithTimeout

超时信号,和WithDeadline的相似,时间点从WithDeadline的具体时间点变为从当前时间开始的相对时间。

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

7.4 WithValue

键值,可以在协程之间进行简单的数据传递。

func WithValue(parent Context, key, val interface{}) Context

八、并发库

8.1 errgroup

1、通过errgroup来执行通过方法,最后同步等待执行完成,从示例看少了Add/Done的操作。

eg := errgroup.Group{}
eg.Go(func() error {
	// logic 1
})
eg.Go(func() error {
	// logic 2
})
if err = eg.Wait(); err != nil {
	return
}

2、可以控制并发

var (
	eg    = errgroup.Group{}
	limit = make(chan struct{}, consts.Five)
	mu    sync.Mutex
)
for _, classID := range classList {
	limit <- struct{}{}
	classID := classID
	eg.Go(func() error {
		defer func() {
			<-limit
		}()

		var classInfo string // logic

		mu.Lock()
		resp[classID] = classInfo
		mu.Unlock()
		return nil
	})
}
err = eg.Wait()
return

3、还可以增加取消函数,当有函数执行失败时可以提前取消。

var (
	eg, cancel    = errgroup.WithContext(ctx)
	mu            = sync.Mutex{}
	limit         = make(chan struct{}, consts.Five)
	resp = map[string]string{}
)

for _, classID := range classList {
	limit <- struct{}{}
	classID := classID
	eg.Go(func() error {
		defer func() {
			<-limit
		}()
		select {
		case <-cancel.Done():
			return nil
		default:
			var classInfo string // logic

			mu.Lock()
			resp[classID] = classInfo
			mu.Unlock()
		}
		return nil
	})
}
if err = eg.Wait(); err != nil {
	return
}

8.2 ants

ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。

func main() {
	var wg sync.WaitGroup

	// 任务函数
	task := func() {
		fmt.Println("Running task")
		time.Sleep(1 * time.Second) // 模拟任务执行时间
		wg.Done()                   // 任务完成,计数减1
	}

	// 创建一个大小为 10 的 goroutine 池
	pool, _ := ants.NewPool(10)
	defer pool.Release()

	// 提交任务到池中
	for i := 0; i < 20; i++ {
		wg.Add(1) // 计数加1

		// 提交任务并检查错误
		err := pool.Submit(task)
		if err != nil {
			fmt.Printf("Task submission failed: %v\n", err)
			wg.Done() // 提交失败时手动调用 Done 以避免阻塞
		}
	}

	// 等待所有任务完成
	wg.Wait()

	fmt.Println("All tasks completed.")
}

这里是一个并发读取文件,然后分批处理的协程池示例:

pool, err := ants.NewPool(poolNum)
if err != nil {
	return
}
defer pool.Release()
var (
	wg = &sync.WaitGroup{}

	batchChan     = make(chan []string, 10000)
	scanner       = bufio.NewScanner(objFile)
	batchLineNum  = 100 // 100行处理一次
)
go func() {
	defer close(batchChan)
	var (
		lines  = make([]string, 0, batchLineNum)
	)
	for scanner.Scan() {
		var line = scanner.Text()
		lines = append(lines, line)
		if len(lines) >= batchLineNum {
			batchChan <- lines
			lines = make([]string, 0, batchLineNum)
		}
	}
	if len(lines) > 0 {
		batchChan <- lines
	}
	if err = scanner.Err(); err != nil {
		// log
	}
}()

processBatch := func(ctx *gin.Context, batch []string) {
	wg.Add(1)
	e := pool.Submit(func() {
		defer wg.Done()

		// logic
	})
	if e != nil {
		wg.Done()
	}
}

for batch := range batchChan {
	processBatch(ctx, batch)
}

wg.Wait()

-- EOF --
最后更新于: 2024-08-17 14:44
发表于: 2020-05-12 20:00
标签: Golang