当前位置: 首页 > news >正文

Go:goroutine 和通道

goroutine

f() 	// 等待 f() 返回
go f()  // 新建一个调用 f() 的 goroutine,不用等待

在 Go 语言里,goroutine 是并发执行的活动单元。与顺序执行程序不同,在有多个 goroutine 的并发程序中,不同函数可同时执行。程序启动时,首个调用main函数的 goroutine 为主 goroutine,新的 goroutine 通过go语句创建,go语句在函数或方法调用前加上go关键字,且go语句本身执行立即完成,不等待函数执行结束。

package mainimport ("fmt""time"
)func main() {go sinner(100 * time.Microsecond)const n = 45fibN := fib(n)fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}func sinner(delay time.Duration) {for {for _, r := range `-\|/` {fmt.Printf("\r%c", r)time.Sleep(delay)}}
}func fib(x int) int {if x < 2 {return x}return fib(x - 1) + fib(x - 2)
}

示例:主 goroutine 计算第 45 个斐波那契数(使用低效递归算法,耗时较长),同时启动另一个 goroutine 运行spinner函数,spinner函数不断打印旋转字符提示程序在运行。当fib(45)计算完成,main函数输出结果后返回,此时所有 goroutine 强制终结,程序退出。除程序正常返回或退出外,无法从外部直接停止一个 goroutine,但可通过通信让其自行停止。程序中两个并发活动(指示器显示和斐波那契数计算)相互独立运行

示例:并发时钟服务器

顺序时钟服务器

func main() {listener, err := net.Listen("tcp", "localhost:8000")if err != nil {log.Fatal(err)}for {conn, err := listener.Accept()if err != nil {log.Print(err)continue}handleConn(conn)}
}func handleConn(conn net.Conn) {defer conn.Close()for {_, err := io.WriteString(conn, time.Now().Format("15:04:05\n"))if err != nil {return}time.Sleep(1 * time.Second)}
}
  • 实现原理:使用net.Listen创建net.Listenerlocalhost:8000监听 TCP 连接,Accept方法阻塞等待连接,接收到连接后由handleConn函数处理。handleConn函数在循环中,每秒通过time.Now获取当前时间,利用net.Conn满足io.Writer接口的特性,将时间格式化后写入连接发送给客户端。若写入失败(如客户端断开连接),则关闭连接,继续等待下一个连接请求。
func main() {conn, err := net.Dial("tcp", "localhost:8000")if err != nil {log.Fatal(err)}defer conn.Close()mustCopy(os.Stdout, conn)
}func mustCopy(dst io.Writer, src io.Reader) {if _, err := io.Copy(dst, src); err != nil {log.Fatal(err)}
}
  • 客户端连接:客户端可使用telnetncnetcat)工具或自定义的基于net.Dial实现的 Go 版netcat程序连接服务器。顺序服务器一次只能处理一个客户端请求,第二个客户端需等第一个结束才能正常工作。

并发时钟服务器

func main() {listener, err := net.Listen("tcp", "localhost:8000")if err != nil {log.Fatal(err)}for {conn, err := listener.Accept()if err != nil {log.Print(err)continue}go handleConn(conn)}
}

在顺序时钟服务器基础上,只需在调用handleConn函数处添加go关键字,使其在新的 goroutine 内执行,就能让服务器并发处理多个客户端连接,多个客户端可同时接收到时间信息。

示例:并发回声服务器

简单回声服务器实现

func handleConn(c net.Conn) {io.Copy(c, c) // 注意: 忽略错误c.Close()
}

普通回声服务器可通过handleConn函数,利用io.Copy将读取到的内容写回客户端,之后关闭连接,但此版本忽略错误处理。

模拟真实回声的回声服务器

func echo(c net.Conn, shout string, delay time.Duration) {fmt.Fprintln(c, "\t", strings.ToUpper(shout))time.Sleep(delay)fmt.Fprintln(c, "\t", shout)time.Sleep(delay)fmt.Fprintln(c, "\t", strings.ToLower(shout))
}func handleConn(c net.Conn) {input := bufio.NewScanner(c)for input.Scan() {echo(c, input.Text(), 1*time.Second)}// 注意: 忽略input.Err()中可能的错误c.Close()
}
  • echo函数:定义echo函数,接收net.Conn 、字符串和延迟时间参数,先将字符串转为大写输出,延迟后输出原字符串,再延迟后转为小写输出,模拟真实回声效果。
  • handleConn函数:在handleConn函数中,使用bufio.NewScanner读取客户端输入,循环调用echo函数处理输入内容,同样忽略了输入错误处理,处理完后关闭连接。
func main() {conn, err := net.Dial("tcp", "localhost:8000")if err!= nil {log.Fatal(err)}defer conn.Close()go mustCopy(os.Stdout, conn)mustCopy(conn, os.Stdin)
}
  • netcat程序:客户端程序使用net.Dial连接服务器,通过两个mustCopy调用,一个将标准输入发送到服务器,另一个将服务器回复输出到标准输出,主 goroutine 处理输入,另一个 goroutine 处理输出,输入结束时程序停止。

实现并发回声效果

func handleConn(c net.Conn) {input := bufio.NewScanner(c)for input.Scan() {go echo(c, input.Text(), 1*time.Second)}// 注意: 忽略input.Err()中可能的错误c.Close()
}

为使回声真正并发,在调用echo函数时添加go关键字,让每个回声处理在单独的 goroutine 中执行,实现多个回声在时间上相互重合的并发效果。在添加go关键字实现并发时,要考虑net.Conn并发调用的安全性。

通道

通道是 Go 程序中 goroutine 之间的通信机制,可看作是特定类型元素的导管,如chan int表示存放int类型元素的通道。它是 goroutine 间发送特定值的通信桥梁,是并发编程中实现同步和数据传递的重要工具。

ch := make(chan int)
  • 创建方式:使用内置的make函数创建通道,如ch := make(chan int) 。通道和map类似,是引用类型,复制或传递时传递的是引用,指向同一份数据结构,其零值为nil 。同种类型通道可用==比较,若为同一通道引用则比较值为true ,也可与nil比较 。
ch <- x  // 发送语句
x = <-ch // 接收语句
<-ch 	 // 丢弃
  • 发送与接收:通道有发送(send )和接收(receive )两个主要操作,使用<-操作符。发送语句如ch <- x ;接收语句如x = <-ch ,也可丢弃接收结果,如<-ch
close(ch)
  • 关闭操作:可使用close函数关闭通道,关闭后发送操作会导致宕机,接收操作会获取已发送的值,通道为空时接收立即完成并获取元素类型零值 。

类型

ch = make(chan int)    // 无缓冲通道
ch = make(chan int, 0) // 无缓冲通道
ch = make(chan inr, 3) // 容量为3的缓冲通道

使用简单make调用创建的是无缓冲通道,make还可接受第二个可选参数表示通道容量。容量为 0 时是无缓冲通道,如ch = make(chan int)ch = make(chan int, 0) ;指定容量(如ch = make(chan int, 3) )则为缓冲通道 ,下面会再进行介绍缓冲通道 。

无缓冲通道

无缓冲通道上的发送操作会阻塞,直到有另一个 goroutine 在对应通道上执行接收操作,此时值传送完成,两个 goroutine 可继续执行;反之,接收操作先执行也会阻塞,直到有 goroutine 发送值。这种通信机制使发送和接收的 goroutine 同步化,所以无缓冲通道也叫同步通道 。

func main() {conn, err := net.Dial("tcp", "localhost:8000")if err!= nil {log.Fatal(err)}done := make(chan struct{})go func() {io.Copy(os.Stdout, conn) // 注意: 忽略错误log.Println("done")done <- struct{}{} // 指示主 goroutine}()mustCopy(conn, os.Stdin)conn.Close()<-done // 等待后台 goroutine 完成
}

示例:客户端程序基础上,通过创建无缓冲通道done来同步主 goroutine 和后台 goroutine。主 goroutine 等待从done通道接收值,后台 goroutine 在完成任务(如io.Copy操作)后,向done通道发送一个值,主 goroutine 接收到值后才继续执行后续操作(如关闭连接 )。当用户关闭标准输入流,mustCopy返回,后台 goroutine 记录消息并向done通道发送值,主 goroutine 接收到值后关闭连接,保证程序按预期顺序执行 。

消息与事件的概念

  • 通过通道发送消息时,不仅消息的值重要,通信本身及发生时间也很关键,这种用于同步、不携带额外信息的消息称为事件。可使用struct{}元素类型的通道来强调同步功能,boolint类型通道也可以 。

管道

管道是利用通道连接 goroutine 的一种方式,使一个 goroutine 的输出成为另一个的输入。通过通道在多个包含无限循环的 goroutine 间进行全生命周期通信 。

func main() {naturals := make(chan int)squares := make(chan int)// countergo func() {for x := 0; x < 100; x++ {naturals <- x}close(naturals)}()// squarergo func() {for x := range naturals {squares <- x * x}close(squares)}()// printer (在主 goroutine 中)for x := range squares {fmt.Println(x)}
}

image.png

  • 示例:程序由countersquarerprinter三个 goroutine 和两个通道组成。counter产生自然数序列并通过naturals通道发送给squarersquarer计算平方后通过squares通道发送给printerprinter输出结果 。

  • 通道关闭处理:当发送方知道无更多数据发送时,可关闭通道告知接收方停止等待 。如counter在发送一定数量元素(如 100 个 )后关闭naturals通道,squarer接收到通道关闭信号后处理并关闭squares通道 。接收操作有变体,可返回接收值和表示是否成功的布尔值 ,利用此特性可判断通道是否关闭并处理 。也可使用range循环语法,在通道接收完所有值后自动结束循环 ,简化通道关闭和数据处理逻辑 。

  • 关闭通道的必要性:不是必须的,仅在需通知接收方数据发送完毕时进行 。通道回收由垃圾回收器根据可访问性决定,与文件关闭操作不同 。关闭已关闭通道会导致宕机,关闭通道还可作为广播机制 。

缓冲通道

ch = make(chan string, 3)
  • 创建:缓冲通道通过make函数创建,可设置容量参数,如ch = make(chan string, 3)创建一个能容纳三个字符串的缓冲通道 。
  • 发送与接收操作:发送操作向通道队列尾部插入元素,接收操作从头部移除元素 。通道满时发送操作阻塞,通道空时接收操作阻塞,部分填满时发送和接收操作不阻塞 。可通过cap函数获取通道容量,len函数获取当前元素个数 。
func mirroredQuery() string {responses := make(chan string, 3)go func() { responses <- request("asia.gopl.io") }()go func() { responses <- request("europe.gopl.io") }()go func() { responses <- request("americas.gopl.io") }()return <-responses // return the quickest response
}

示例:它并发向三个镜像地址发送请求,将响应通过缓冲通道发送,只接收最早返回的响应 。缓冲通道在并发场景中解耦发送和接收 goroutine 的作用 。同时使用无缓冲通道可能导致 goroutine 泄漏(长时间阻塞无法结束 ),要合理选择无缓冲或缓冲通道以及设置缓冲通道容量。

缓冲通道与性能

以蛋糕店厨师工作场景类比,无缓冲通道类似厨师需等待下一个接收,同步性强;缓冲通道可容纳一定量任务,容量为 1 时可消除速率差异,容量更大可应对更大速率波动 。还提到可通过创建额外 goroutine 辅助处理任务,以优化程序性能 。

并行循环

  • 初步并行:直接在循环中添加go关键字启动 goroutine 进行并行处理,但此版本存在问题,外层 goroutine 可能在内部 goroutine 完成任务前就返回,导致任务未真正完成 。
  • 使用通道同步:创建无缓冲通道,内层 goroutine 完成任务时向通道发送信号,外层 goroutine 通过接收通道信号计数,等待所有任务完成 。此过程需注意循环变量在闭包中的使用问题,避免 goroutine 读取到错误的变量值 。
  • 处理错误返回:为使外层 goroutine 能获取内层 goroutine 执行函数的错误,创建用于接收错误的通道,内层 goroutine 将错误发送到通道,外层 goroutine 接收并处理错误 。但简单处理方式可能导致 goroutine 泄漏(如遇到错误时未正确结束 goroutine ),可通过使用有足够容量的缓冲通道或其他方案解决 。
  • 返回处理结果:进一步改进,创建缓冲通道,内层 goroutine 将生成的内容及错误信息发送到通道,外层 goroutine 接收并处理 。

使用sync.WaitGroup同步

func makeThumbnails6(filenames <-chan string) int64 {sizes := make(chan int64)var wg sync.WaitGroup // 工作goroutine的个数for f := range filenames {wg.Add(1)go func(f string) {defer wg.Done()thumb, err := thumbnail.ImageFile(f)if err != nil {log.Println(err)return}info, _ := os.Stat(thumb) // 可以忽略错误sizes <- info.Size()}(f)}// closergo func() {wg.Wait()close(sizes)}()var total int64for size := range sizes {total += size}return total
}

为更好地控制和等待所有 goroutine 结束,引入sync.WaitGroup 。在启动每个工作 goroutine 前使用Add方法增加计数,工作 goroutine 结束时调用Done方法减少计数,主 goroutine 通过Wait方法阻塞等待计数为 0,即所有工作 goroutine 完成 。同时,结合通道传递处理结果(如文件大小 ),实现更健壮的并行处理 。

使用 select 多路复用

select {
case <-ch1://...
case x := <-ch2://...use x...
case ch3 <- y://...
default://...
}

select语句用于在多个通道操作中进行选择,实现多路复用。它类似switch语句,有一系列情况和可选的默认分支,每个情况指定一次通道上的发送或接收操作及关联代码块 。select会一直等待,直到有一个通道操作可执行,然后执行对应语句,其他未满足条件的操作不会执行;若没有对应情况且无默认分支,select将永远等待 。

示例

func main() {fmt.Println("Commencing countdown.")tick := time.Tick(1 * time.Second)for countdown := 10; countdown > 0; countdown-- {fmt.Println(countdown)<-tick}launch()
}
  • 火箭发射倒计时:以火箭发射倒计时为例,最初的倒计时程序通过time.Tick函数按固定时间间隔发送事件进行倒计时 。为实现可取消发射,启动一个 goroutine 从标准输入读取字符,若成功则向abort通道发送值 。使用select语句等待time.Tick通道的计时事件或abort通道的取消事件 。还可结合time.After函数设置超时,若在指定时间(如 10s )内未收到取消事件则开始发射 。
ch := make(chan int, 1)
for i := 0; i < 10; i++ {select {case x := <-ch:fmt.Println(x) // "0" "2" "4" "6" "8"case ch <- i:}
}
  • 通道状态操作选择:对于缓冲区大小为 1 的通道ch ,通过select语句根据通道状态(空或满 )及循环变量i的奇偶性,决定是从通道接收还是向通道发送数据 。当多个情况同时满足时,select随机选择一个执行 。
func main() {//...创建中止通道...fmt.Println("Commencing countdown.  Press return to abort.")tick := time.Tick(1 * time.Second)for countdown := 10; countdown > 0; countdown-- {fmt.Println(countdown)select {case <-tick:// 什么操作也不执行case <-abort:fmt.Println("Launch aborted!")return}}launch()
}
  • 带输出的倒计时:改进火箭发射倒计时程序,在每次迭代中使用select语句使程序等待 1s 以检测中止事件,同时输出倒计时数值 。

注意事项

ticker := time.NewTicker(1 * time.Second)
<-ticker.C // 从ticker的通道接收
ticker.Stop() // 造成ticker的goroutine终止select {
case <-abort:fmt.Printf("Launch aborted!\n")return
default:// 不执行任何操作
}
  • time.Tick函数使用可能导致的 goroutine 泄漏问题,因为即使不再接收其通道事件,对应的 goroutine 仍在运行 。建议使用time.NewTicker创建定时器,使用完后通过Stop方法终止相关 goroutine 。
  • select语句可实现非阻塞通信,通过添加默认分支,在没有通道操作就绪时立即执行默认代码块 ,重复此操作可实现对通道轮询 。
  • nil通道上的操作永远不会被选中 ,可利用这一特性开启或禁用特定情况 。

取消

在一些场景下,需要让 goroutine 停止当前任务,如 Web 服务器处理客户端请求时客户端断开连接 。但直接终止一个 goroutine 会使共享变量状态不确定,且难以准确知道有多少 goroutine 在工作,简单通过通道发送固定数量事件来取消多个 goroutine 存在计数不准确、操作卡住等问题 。

基于通道关闭的广播式取消机制

var done = make(chan struct{})
func cancelled() bool {select {case <-done:return truedefault:return false}
}

利用通道关闭特性实现广播式取消 。创建取消通道done ,不向其发送值,而是通过关闭它来表明取消操作 。定义cancelled函数,使用select语句检测done通道是否关闭 ,若关闭则返回true ,否则返回false 。同时,启动一个 goroutine 读取标准输入,一旦检测到输入(如用户按回车键 ),就关闭done通道广播取消事件 。

// 当检测到输入时取消遍历
go func() {os.Stdin.Read(make([]byte, 1)) // 读一个字节close(done)
}()for {select {case <-done:// 耗尽fileSizes以允许已有的goroutine结束for range fileSizes {// 不执行任何操作}returncase size, ok := <-fileSizes://...}
}
  • 主 goroutine:在主 goroutine 的select语句中添加从done通道接收的情况 。当接收到取消信号时,先耗尽fileSizes通道中的值(防止卡住 ),然后返回 。
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {defer n.Done()if cancelled() {return}for _, entry := range dirents(dir) {//...}
}
  • walkDir函数walkDir函数开始时轮询取消状态,若检测到取消(cancelledtrue ),则立即返回 ,不再执行后续操作 。在walkDir循环中也可进行取消状态轮询 ,避免取消后创建新的 goroutine 。
func dirents(dir string) []os.FileInfo {select {case sema <- struct{}{}: // 获取令牌case <-done:return nil // 取消}defer func() { <-sema }() // 释放令牌//...read directory...
}
  • dirents函数:在dirents函数中,使用select语句同时处理获取信号量令牌和检测取消通道done ,若检测到取消则直接返回nil ,实现更快速的取消响应 。通过这些措施,当取消事件发生时,后台 goroutine 能迅速停止,main函数返回,程序退出。

alkDir函数**:walkDir函数开始时轮询取消状态,若检测到取消(cancelledtrue ),则立即返回 ,不再执行后续操作 。在walkDir`循环中也可进行取消状态轮询 ,避免取消后创建新的 goroutine 。

func dirents(dir string) []os.FileInfo {select {case sema <- struct{}{}: // 获取令牌case <-done:return nil // 取消}defer func() { <-sema }() // 释放令牌//...read directory...
}
  • dirents函数:在dirents函数中,使用select语句同时处理获取信号量令牌和检测取消通道done ,若检测到取消则直接返回nil ,实现更快速的取消响应 。通过这些措施,当取消事件发生时,后台 goroutine 能迅速停止,main函数返回,程序退出。

参考资料:《Go程序设计语言》

相关文章:

  • 按键精灵安卓/ios脚本辅助工具开发教程:如何把界面配置保存到服务器
  • Linux——共享内存
  • 2025年第十六届蓝桥杯省赛JavaB组真题回顾
  • 威锋VL822-Q7T10GHUB芯片适用于扩展坞显示器
  • Java异常报错:java.nio.channels.UnresolvedAddressException
  • 在 IntelliJ IDEA 中开发 Java Web 项目时,遇到包内明明存在某个类但类名仍然爆红(显示红色错误提示)
  • 广汽滴滴Robotaxi首次亮相,中国自动驾驶加速领跑新赛道
  • 明天见!奇兵到家将携“偏远服务第一网”登陆成都建博会
  • Linux常见指令介绍上(入门级)
  • ubuntu 2204 安装 vcs 2023
  • LangChain4j (3):集成 DeepSeek 大模型的Java程序
  • 质量问题频发,如何提升源头把控
  • classpath “com.android.tools.build:gradle:8.1.0“ 和 Gradle插件版本8.5 有什么关系
  • KMP算法核心笔记:前后缀本质与nextval实现
  • GitHub配置密钥
  • 环境变量概念以及获取环境变量(linux下解析)
  • C#Winform程序将子窗体嵌入容器方法
  • 15、nRF52xx蓝牙学习(串口输入与回环)
  • code review时线程池的使用
  • Oracle数据库数据编程SQL<00. 课外关注:rownum、rowid、level、row_number 对比详解与实战>
  • 青岛:今年计划新增城镇住房约5.77万套,推动房地产市场回稳
  • 宫崎骏电影《幽灵公主》4K修复版定档五一
  • 中国泳协:新奥运周期竞争激烈,“三从一新”全力提升实力
  • 龚正会见巴西里约热内卢州州长克劳迪奥·卡斯特罗
  • 伊朗外长访华将会见哪些人?讨论哪些议题?外交部回应
  • 大卫·第艾维瑞谈历史学与社会理论②丨马克斯·韦伯与历史学研究