ubuntu

ubuntu golang并发编程实践

小樊
50
2025-11-22 23:09:50
栏目: 编程语言

Ubuntu 下 Go 并发编程实践指南

一 环境准备与工具链

二 核心原语速览

三 实战示例 并发任务处理与 Worker Pool

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

type Task struct {
	ID int
}

type Result struct {
	TaskID int
	Err    error
}

// 模拟耗时任务
func (t Task) Process(ctx context.Context) (Result, error) {
	select {
	case <-time.After(100 * time.Millisecond): // 模拟IO
		return Result{TaskID: t.ID}, nil
	case <-ctx.Done():
		return Result{TaskID: t.ID}, ctx.Err()
	}
}

// 固定大小 Worker Pool
func WorkerPool(ctx context.Context, tasks []Task, concurrency int, timeout time.Duration) []Result {
	var wg sync.WaitGroup
	results := make(chan Result, len(tasks))
	sem := make(chan struct{}, concurrency) // 并发信号量

	for _, task := range tasks {
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		wg.Add(1)
		sem <- struct{}{} // 获取令牌
		go func(t Task) {
			defer wg.Done()
			defer func() { <-sem }() // 释放令牌

			ctx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			res, err := t.Process(ctx)
			select {
			case results <- res:
			case <-ctx.Done():
			}
		}(task)
	}

	// 等待全部完成或取消
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-ctx.Done():
	}

	close(results)
	var out []Result
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	// 显式设置 P 数量(容器/虚拟机中尤为重要)
	runtime.GOMAXPROCS(runtime.NumCPU())

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	tasks := make([]Task, 20)
	for i := 0; i < len(tasks); i++ {
		tasks[i] = Task{ID: i + 1}
	}

	start := time.Now()
	results := WorkerPool(ctx, tasks, 5, 500*time.Millisecond)
	elapsed := time.Since(start)

	var ok, fail int
	for _, r := range results {
		if r.Err != nil {
			fail++
		} else {
			ok++
		}
	}
	fmt.Printf("完成: %d, 失败: %d, 耗时: %v\n", ok, fail, elapsed)
}

四 常见陷阱与排查清单

五 性能与工程化建议

0
看了该问题的人还看了