Go指南笔记05_(并发)

请注意,本文编写于 640 天前,最后修改于 191 天前,其中某些信息可能已经过时。

Go指南

老马带你学习Go语言编程

初学笔记,如有错误.欢迎指正,不胜感激.

goroutine

goroutine是由Go运行时环境管理的轻量级线程.

go f(x,y,z)语句开启了一个goroutine运行在当前goroutine中定义的f.

goroutine在相同的地址空间中运行,因此访问共享内存必须进行同步.goroutine 开启的方式十分简单,只需要在前面加一个go 关键字即可.

package main

import (
    "fmt"
    "time"
)

func main () {
    go fmt.Println("Hello ") // 开启一个goroutine执行打印Hello
    time.Sleep(time.Second) // 让主goroutine让出1s的时间片等待新建goroutine打印Hello
    fmt.Println("World") // 主goroutine打印World
}

channel

channel是有类型的管道,可以用channel操作符<-对其发送或者接受值. "<-"表示数据流的方向.

ch <- v 表示将v送入channel ch
v := <-ch 表示从ch中接受并且赋值给v

在使用channel之前必须先创建:ch := make(chan int)

默认情况下,在另一端准备好之前,发送和接受都会堵塞.这使得goroutine可以在没有明确的锁或竞态变量的情况下进行同步.

关闭channel 使用内置close()函数.当关闭channel之后,对改channel的任何发送操作将导致panic异常,对于一个已经关闭channel接收数据,依然可以接收到已经成功发送的数据,如果没有的话,将接收零值的数据.

无缓冲channel

一个基于无缓冲的channel的发送操作将导致发送者goroutine阻塞,直到另一个goroutine在相同的channel上执行接收操作.同样的,如果接收操作先发生,那么接收goroutine也将阻塞,直到有一个goroutine在相同的channel上执行发送操作.基于无缓冲的channel的发送和接收操作将导致两个goroutine做一次同步操作,因此无缓冲的channel也称为同步channel.

package main

import (
    "fmt"
    //"time"
)

func Sum(a []int, ch chan int) {
    sum := 0
    for _,x := range a {
        sum += x
    }
    ch <- sum
}

func main () {
    a := []int{1,2,3,4,5,6,7,8,9}
    ch := make(chan int)
    go Sum(a[:len(a)/2], ch)
    go Sum(a[len(a)/2:], ch)
    x, y := <-ch, <-ch
    fmt.Println(x, y, x+y)

}

上述代码当没有接收到x和y的时候将会阻塞goroutine,直到x和y都接收到Sum函数中发送给ch的sum值主的goroutine才会被继续执行打印.这样就是无缓存的channel,也就是同步的goroutine.

range和close

发送者可以close一个channel开表示没有值会被发送了.接受者可以通过赋值语句的第二个参数来判断channel是否被关闭,当没有值可以接受并且channel已经被关闭,v,ok := <-chok会被置为flase.

循环 for i := range c会不断从channel接受值,直到它被关闭.

只有发送者才能关闭channel.对一个只接受的channel调用会发生编译错误.向一个已经关闭的channel发送数据会引起panic.channel 与文件不同;通常情况下无需关闭它们。只有在需要告诉接收者没有更多的数据的时候才有必要进行关闭,例如中断一个 range

串联的channel

channel可以将多个goroutine串联起来,一个channel的输出作为下一个channel的输入,类似进程间的管道通信.

package main

import (
    "fmt"
    "time"
)

func main () {
    num := make(chan int)
    sqr := make(chan int)
    go func() {
        for x := 0; x< 10; x++  {
            num <- x
            time.Sleep(time.Second)
        }
        close(num)
    }()
    go func() {
        for x := range num  {
            sqr <- x * x
        }
        close(sqr)
    }()
    for x := range sqr {
        fmt.Println(x)
    }
}

上述代码中实现了一个串联的channel,num作为上一个接收上一goroutine的值,然后传递给下一goroutine,计算平方之后再传递给sqr,最后在主的goroutine中获取打印sqr的值.

当我们不断没有close channel的时候goroutine将会持续传递和接收,主goroutine也会持续打印.这会导致goroutine泄露.

而当我们使用for{}一直接收channel中的值的时候,即使我们关闭了channel,在真正意义上也只是关闭了传递值得channel,而接收值得channel则会不断的接收,只不过没有发发送了,所以接收的是零值.

所以我们需要使用for ... range 来遍历接收channel中的值,这样才会保证正确的接收.

单向的channel

使用channel做为参数进行通信.在函数内部,有的channel只接收数据,有的channel只发送数据.这时可以使用单向的channel来实现.

channel <- int 表示只写不读
<- channel int 表示只读不写

任何双向的channel都可以想单向的channel赋值,这会导致隐式转换,但是单向的channel不能向双向的channel赋值.

package main

import (
    "fmt"
    "time"
)

func main () {

    num := make(chan int) 
    sqr := make(chan int) 

    go counter(num)
    go squarter(sqr, num)
    printer(sqr)
}


func counter(out chan <- int) { // 只写
    for x:= 0; x < 10; x++ {
        out <- x
        time.Sleep(time.Second)
    }
    close(out)
}

func squarter(out chan <- int, in <- chan int)  {
    for x := range in {
        out <- x * x
    }
    close(out)
}

func printer(in <- chan int) {
    for x := range in {
        fmt.Println(x)
    }
}

上述代码中定义了三个函数,第一个函数接受一个只写的channel,第二个函数接受一个只读和一个只写的channel,最后一个函数接受一个只读的channel.

在main函数中,创建两个goroutine分别执行counter()和squarter(),然后在主goroutine中执行printer()打印.

在main函数中创建了两个双向channel, 由于双向的channel可以向单向的channel赋值,所以在counter()函数中我们传递了num channel 用于写入数据,squarter()中将num作为一个只读的channel类型,从中读取在counter()函数中传递的值,然后求平方之后写入sqr channel

最后在主goroutine中执行printer()从sqr中读取数据并打印.

缓冲channel

在创建channel的时候make函数提供了第二个参数,channel的缓冲长度.带缓冲的channel内部持有一个元素队列.

向缓冲channel的发送操作就是想内部缓冲队列的尾部插入元素,接收操作则是从队列的头部删除元素.

向带有缓冲的channel发送数据的时候,只有在缓冲区满的时候才会阻塞,当缓冲区为空的时候接收操作会被阻塞.

cap()函数可以获取channel内部缓存的容量.len()可以换取channel内部缓冲队列中有效元素的个数.

package main

import (
    "fmt"
    "time"
)

func main () {
    bufferChan := make(chan int, 3)
    productor(bufferChan)
}

func productor (ch chan int) {
    go func() {
        for {
            ch <- 1
            time.Sleep(time.Second)
        }
    }()

    go func() {
        for {
            ch <- 2
            time.Sleep(time.Second*2)
        }
    }()

    go func() {
        for {
            ch <- 3
            time.Sleep(time.Second*3)
        }
    }()

    for {
        fmt.Println(<-ch)
    }
}

创建一个productor() 函数,创建三个goroutine,分别向channel中传入1,2,3. 然后在主goroutine中打印ch中的值.

如果缓冲区满了则会阻塞发送数据的channel.如果缓冲区为空的话则会阻塞接收数据的channel.

并发的循环

并发的循环是指在循环体内,通过go加匿名函数生成多个goroutine,如果在goroutine内用到外部函数的变量,不要直接使用,需要将外部变量作为匿名函数传递,保证每个goroutine运行不同的变量.

func loopgo() {
    for i := 0; i < 10; i++  {
        go func() {
            fmt.Printf("第%d个goroutine\n", i)
        }()

    }
}

上述代码创建了一个并发的goroutine,当函数进行到for{}时,会先创建相应个数的goroutine,再进行大印i的值.所以i的值并不能确定,并且可能打印很多相同的值.

所以我们需要将外部变量作为匿名函数的参数传递

func loopgo() {
    for i := 0; i < 10; i++  {
        go func(x int) {
            fmt.Printf("第%d个goroutine\n", x)
        }(i)

    }
}

上述代码中我们为匿名函数创建了一个参数x,它接受了i的值作为参数.这样goroutine创建的时候实际上每一个goroutine都保存了i的值,所以在打印是第几个goroutine的时候就不会有重复的i值出现了.

基于select的多路复用

Go语言直接在语言级别支持select关键字,用于处理异步IO问题.基于此特性可以为channel实现超时机制.

select 语句使得一个 goroutine 在多个通讯操作上等待。

select 会阻塞,直到条件分支中的某个可以继续执行,这时就会执行那个条件分支。当多个都准备好的时候,会随机选择一个。

当select中的其他条件分支都没有准备好的时候,default分支会被执行.为了非阻塞的发送或者接收,可以使用default分支.

import (
    "time"
    "fmt"
)

func main()  {
    fmt.Println("程序开始执行")
    timeout := make(chan bool)
    ch := make(chan int)
    go func() {
        time.Sleep(time.Second*3)
        timeout <- true
    }()

    select {
        case <- ch:
        fmt.Println("接收ch的值")
        case <-timeout:
        fmt.Println("超时程序退出")
        return
    }

}

上述代码演示了一个超时机制. 我们在程序开始的时候打印一句话表示程序开始执行.然后创建了一个bool类型的channel和一个int类型的ch. 然后创建一个goroutine加匿名函数.函数中程序睡眠三秒,然后给timeout channel传递一个true. 最后创建一个select ... case. 第一个case是接收ch中的值,由于我们没有给ch传递值,所以不能接收到值,而goroutine延时三秒之后timeout就赋值为true了,当执行到第二个case的时候条件成立,打印了程序退出,然后return退出掉了.

并发的退出

当一个已经关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们将会立即返回一个零值,可以将这个机制扩展为广播机制:即不要想channel发送值,而使用一个关闭的channel来广播.

package main

import (
    "fmt"
    "os"
    "time"
)

func main() {
    fmt.Println("程序开始")
    exit := make(chan struct{})
    for i := 0; i < 5; i++ {
        go func(x int) {
            for {
                select {
                case <-exit:
                    fmt.Println("第",x,"个goroutine退出")
                    return
                default:
                    fmt.Println("第",x,"个goroutine正在运行")
                }
            }
            time.Sleep(time.Second)

        }(i)

    }
    os.Stdin.Read(make([]byte, 1))
    close(exit)
    time.Sleep(time.Second)
    fmt.Println("程序退出")
}

当使用键盘键入一个字符的时候,执行close() 关闭channel.这时case <- exit; 中从exit接受的数据为0,依次执行程序退出语句.

Comments

添加新评论