golang学习

开始学学golang这门伟大的语言。

结构

Go的基础组成有以下几个部分:

  • 包声明
  • 引入包
  • 函数
  • 变量
  • 语句 & 表达式
  • 注释
1
2
3
4
5
6
7
8
9
package main
/* 包的名字是main,每个程序都有一个main的包 */
import "fmt"
/* 需要fmt这个包 */

func main() {
/* 这是我的第一个简单的程序 */
fmt.Println("Hello, World!")
}

(太奇葩了,竟然是以大小写作为权限控制的。)当标识符(包括常量、变量、类型、函数名、结构字段等等)以一个大写字母开头,如:Group1,那么使用这种形式的标识符的对象就可以被外部包的代码所使用(客户端程序需要先导入这个包),这被称为导出(像面向对象语言中的 public);标识符如果以小写字母开头,则对包外是不可见的,但是他们在整个包的内部是可见并且可用的(像面向对象语言中的 protected )。

运行的话:

1
go run hello.go

最奇葩的是 { 不能单独放在一行

基础

行分隔符

在 Go 程序中,一行代表一个语句结束。每个语句不需要像C一样以分号结尾,因为这些工作都将由 Go 编译器自动完成。

标识符用来命名变量、类型等程序实体。一个标识符实际上就是一个或是多个字母、数字、下划线组成的序列,但是第一个字符必须是字母或下划线而不能是数字。

Go 语言的字符串可以通过 + 实现:

1
2
3
4
5
package main
import "fmt"
func main() {
fmt.Println("Google" + "Runoob")
}

数据类型

在 Go 编程语言中,数据类型用于声明函数和变量。

数据类型的出现是为了把数据分成所需内存大小不同的数据,编程的时候需要用大数据的时候才需要申请大内存,就可以充分利用内存。

Go 语言按类别有以下几种数据类型:

  • 布尔型:布尔型的值只可以是常量 true 或者 false。一个简单的例子:var b bool = true。
  • 数字类型:整型 int 和浮点型 float32、float64,Go 语言支持整型和浮点型数字,并且支持复数,其中位的运算采用补码。
  • 字符串类型:字符串就是一串固定长度的字符连接起来的字符序列。Go 的字符串是由单个字节连接起来的。Go 语言的字符串的字节使用 UTF-8 编码标识 Unicode 文本。
  • 派生类型:
    • (a) 指针类型(Pointer)
    • (b) 数组类型
    • (c) 结构化类型(struct)
    • (d) Channel 类型
    • (e) 函数类型
    • (f) 切片类型
    • (g) 接口类型(interface)
    • (h) Map 类型

数字类型

Go 也有基于架构的类型,例如:int、uint 和 uintptr。

  • uint8:无符号 8 位整型 (0 到 255)
  • uint16:无符号 16 位整型 (0 到 65535)
  • uint32:无符号 32 位整型 (0 到 4294967295)
  • uint64:无符号 64 位整型 (0 到 18446744073709551615)
  • int8:有符号 8 位整型 (-128 到 127)
  • int16:有符号 16 位整型 (-32768 到 32767)
  • int32:有符号 32 位整型 (-2147483648 到 2147483647)
  • int64:有符号 64 位整型 (-9223372036854775808 到 9223372036854775807)

浮点型

  • float32:IEEE-754 32位浮点型数
  • float64:IEEE-754 64位浮点型数
  • complex64:32 位实数和虚数
  • complex128:64 位实数和虚数

其他数字类型

  • byte:类似 uint8
  • rune:类似 int32
  • uint:32 或 64 位
  • int:与 uint 一样大小
  • uintptr:无符号整型,用于存放一个指针

变量

Go 语言变量名由字母、数字、下划线组成,其中首个字符不能为数字。

声明变量的一般形式是使用 var 关键字:

1
var identifier type

可以一次声明多个变量:
1
var identifier1, identifier2 type

实例
1
2
3
4
5
6
7
8
9
package main
import "fmt"
func main() {
var a string = "Runoob"
fmt.Println(a)

var b, c int = 1, 2
fmt.Println(b, c)
}

变量声明

第一种,指定变量类型,如果没有初始化,则变量默认为零值。

1
2
var v_name v_type
v_name = value

未初始化的时候:

  • 数值类型(包括complex64/128)为 0
  • 布尔类型为 false
  • 字符串为 “”(空字符串)
  • 以下几种类型为 nil:
    • var a *int
    • var a []int
    • var a map[string] int
    • var a chan int
    • var a func(string) int
    • var a error // error 是接口

第二种,根据值自行判定变量类型。

1
var v_name = value

实例
1
2
3
4
5
6
package main
import "fmt"
func main() {
var d = true
fmt.Println(d)
}

第三种,省略 var, 注意 := 左侧如果没有声明新的变量,就产生编译错误,格式:

1
v_name := value

例如:
1
2
3
var intVal int 
intVal :=1 // 这时候会产生编译错误
intVal,intVal1 := 1,2 // 此时不会产生编译错误,因为有声明新的变量,因为 := 是一个声明语句

可以将 var f string = "Runoob" 简写为 f := "Runoob"

实例

1
2
3
4
5
6
7
package main
import "fmt"
func main() {
f := "Runoob" // var f string = "Runoob"

fmt.Println(f)
}

多变量声明
//类型相同多个变量, 非全局变量

1
2
3
4
var vname1, vname2, vname3 type
vname1, vname2, vname3 = v1, v2, v3
var vname1, vname2, vname3 = v1, v2, v3 // 和 python 很像,不需要显示声明类型,自动推断
vname1, vname2, vname3 := v1, v2, v3 // 出现在 := 左侧的变量不应该是已经被声明过的,否则会导致编译错误

这种因式分解关键字的写法一般用于声明全局变量

1
2
3
4
var (
vname1 v_type1
vname2 v_type2
)

可以在变量的初始化时省略变量的类型而由系统自动推断,声明语句写上 var 关键字其实是显得有些多余了,因此我们可以将它们简写为 a := 50 或 b := false。

a 和 b 的类型(int 和 bool)将由编译器自动推断。

这是使用变量的首选形式,但是它只能被用在函数体内,而不可以用于全局变量的声明与赋值。使用操作符 := 可以高效地创建一个新的变量,称之为初始化声明。

如果在相同的代码块中,我们不可以再次对于相同名称的变量使用初始化声明,例如:a := 20 就是不被允许的,编译器会提示错误 no new variables on left side of :=,但是 a = 20 是可以的,因为这是给相同的变量赋予一个新的值。

如果你在定义变量 a 之前使用它,则会得到编译错误 undefined: a。

如果你声明了一个局部变量却没有在相同的代码块中使用它,同样会得到编译错误

常量

常量是一个简单值的标识符,在程序运行时,不会被修改的量。

常量中的数据类型只可以是布尔型、数字型(整数型、浮点型和复数)和字符串型。

常量的定义格式:

1
const identifier [type] = value

你可以省略类型说明符 [type],因为编译器可以根据变量的值来推断其类型。

显式类型定义: const b string = “abc”
隐式类型定义: const b = “abc”
多个相同类型的声明可以简写为:

1
const c_name1, c_name2 = value1, value2

常量还可以用作枚举:

1
2
3
4
5
const (
Unknown = 0
Female = 1
Male = 2
)

数字 0、1 和 2 分别代表未知性别、女性和男性。

常量可以用len(), cap(), unsafe.Sizeof()函数计算表达式的值。常量表达式中,函数必须是内置函数,否则编译不过

iota

iota,特殊常量,可以认为是一个可以被编译器修改的常量。

iota 在 const 关键字出现时将被重置为 0(const 内部的第一行之前),const 中每新增一行常量声明将使 iota 计数一次(iota 可理解为 const 语句块中的行索引)。

iota 可以被用作枚举值:

1
2
3
4
5
const (
a = iota
b = iota
c = iota
)

第一个 iota 等于 0,每当 iota 在新的一行被使用时,它的值都会自动加 1;所以 a=0, b=1, c=2 可以简写为如下形式:
1
2
3
4
5
const (
a = iota
b
c
)

iota 用法

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import "fmt"

func main() {
const (
a = iota //0
b //1
c //2
d = "ha" //独立值,iota += 1
e //"ha" iota += 1
f = 100 //iota +=1
g //100 iota +=1
h = iota //7,恢复计数
i //8
)
fmt.Println(a,b,c,d,e,f,g,h,i)
}

以上实例运行结果为:
1
0 1 2 ha ha 100 100 7 8

再看个有趣的的 iota 实例:

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import "fmt"
const (
i=1<<iota
j=3<<iota
k
l
)

func main() {
fmt.Println("i=",i)
fmt.Println("j=",j)
fmt.Println("k=",k)
fmt.Println("l=",l)
}

以上实例运行结果为:
1
2
3
4
i= 1
j= 6
k= 12
l= 24

iota 表示从 0 开始自动加 1,所以 i=1<<0, j=3<<1(<< 表示左移的意思),即:i=1, j=6,这没问题,关键在 k 和 l,从输出结果看 k=3<<2,l=3<<3。

简单表述:

1
2
3
4
i=1:左移 0 位,不变仍为 1;
j=3:左移 1 位,变为二进制 110, 即 6;
k=3:左移 2 位,变为二进制 1100, 即 12;
l=3:左移 3 位,变为二进制 11000,即 24。

部分运算符

假定 A 为60,B 为13:

运算符 描述 实例
& 按位与运算符”&”是双目运算符。 其功能是参与运算的两数各对应的二进位相与。 (A & B) 结果为 12, 二进制为 0000 1100
竖线或 按位或运算符是双目运算符。 其功能是参与运算的两数各对应的二进位相或。 (A 或 B) 结果为 61, 二进制为 0011 1101
^ 按位异或运算符”^”是双目运算符。 其功能是参与运算的两数各对应的二进位相异或,当两对应的二进位相异时,结果为1。 (A ^ B) 结果为 49, 二进制为 0011 0001
<< 左移运算符”<<”是双目运算符。左移n位就是乘以2的n次方。 其功能把”<<”左边的运算数的各二进位全部左移若干位,由”<<”右边的数指定移动的位数,高位丢弃,低位补0。 A << 2 结果为 240 ,二进制为 1111 0000
>> 右移运算符>>是双目运算符。右移n位就是除以2的n次方。 其功能是把>>左边的运算数的各二进位全部右移若干位,>>右边的数指定移动的位数。 A >> 2 结果为 15 ,二进制为 0000 1111
运算符 描述 实例
& 返回变量存储地址 &a; 将给出变量的实际地址。
* 指针变量。 *a; 是一个指针变量

条件语句

if 语句的语法如下:

1
2
3
if 布尔表达式 {
/* 在布尔表达式为 true 时执行 */
}

If 在布尔表达式为 true 时,其后紧跟的语句块执行,如果为 false 则不执行。

Go 编程语言中 if…else 语句的语法如下:

1
2
3
4
5
if 布尔表达式 {
/* 在布尔表达式为 true 时执行 */
} else {
/* 在布尔表达式为 false 时执行 */
}

If 在布尔表达式为 true 时,其后紧跟的语句块执行,如果为 false 则执行 else 语句块。

Go 编程语言中 if…else 语句的语法如下:

1
2
3
4
5
6
if 布尔表达式 1 {
/* 在布尔表达式 1 为 true 时执行 */
if 布尔表达式 2 {
/* 在布尔表达式 2 为 true 时执行 */
}
}

switch

switch 语句用于基于不同条件执行不同动作,每一个 case 分支都是唯一的,从上至下逐一测试,直到匹配为止。

switch 语句执行的过程从上至下,直到找到匹配项,匹配项后面也不需要再加 break。

switch 默认情况下 case 最后自带 break 语句,匹配成功后就不会执行其他 case,如果我们需要执行后面的 case,可以使用 fallthrough 。

语法
Go 编程语言中 switch 语句的语法如下:

1
2
3
4
5
6
7
8
switch var1 {
case val1:
...
case val2:
...
default:
...
}

变量 var1 可以是任何类型,而 val1 和 val2 则可以是同类型的任意值。类型不被局限于常量或整数,但必须是相同的类型;或者最终结果为相同类型的表达式。

您可以同时测试多个可能符合条件的值,使用逗号分割它们,例如:case val1, val2, val3。

switch 语句还可以被用于 type-switch 来判断某个 interface 变量中实际存储的变量类型。

Type Switch 语法格式如下:

1
2
3
4
5
6
7
8
9
switch x.(type){
case type:
statement(s);
case type:
statement(s);
/* 你可以定义任意个数的case */
default: /* 可选 */
statement(s);
}

fallthrough

使用 fallthrough 会强制执行后面的 case 语句,fallthrough 不会判断下一条 case 的表达式结果是否为 true。

select

select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收。

select 随机执行一个可运行的 case。如果没有 case 可运行,它将阻塞,直到有 case 可运行。一个默认的子句应该总是可运行的。

语法
Go 编程语言中 select 语句的语法如下:

1
2
3
4
5
6
7
8
9
select {
case communication clause :
statement(s);
case communication clause :
statement(s);
/* 你可以定义任意数量的 case */
default : /* 可选 */
statement(s);
}

以下描述了 select 语句的语法:

  • 每个 case 都必须是一个通信
  • 所有 channel 表达式都会被求值
  • 所有被发送的表达式都会被求值
  • 如果任意某个通信可以进行,它就执行,其他被忽略。
  • 如果有多个 case 都可以运行,Select 会随机公平地选出一个执行。其他不会执行。

否则:

  • 如果有 default 子句,则执行该语句。
  • 如果没有 default 子句,select 将阻塞,直到某个通信可以运行;Go 不会重新对 channel 或值进行求值。

循环

Go语言的For循环有3中形式,只有其中的一种使用分号。

和 C 语言的 for 一样:

1
for init; condition; post { }

和 C 的 while 一样:
1
for condition { }

和 C 的 for(;;) 一样:
1
for { }

  • init: 一般为赋值表达式,给控制变量赋初值;
  • condition: 关系表达式或逻辑表达式,循环控制条件;
  • post: 一般为赋值表达式,给控制变量增量或减量。

for语句执行过程如下:

  • 先对表达式1赋初值;
  • 判别赋值表达式 init 是否满足给定条件,若其值为真,满足循环条件,则执行循环体内语句,然后执行 post,进入第二次循环,再判别 condition;
  • 否则判断 condition 的值为假,不满足条件,就终止for循环,执行循环体外语句。

for 循环的 range 格式可以对 slice、map、数组、字符串等进行迭代循环。格式如下:

1
2
3
for key, value := range oldMap {
newMap[key] = value
}

break 语句

Go 语言循环语句 Go语言循环语句

Go 语言中 break 语句用于以下两方面:

  • 用于循环语句中跳出循环,并开始执行循环之后的语句。
  • break 在 switch(开关语句)中在执行一条case后跳出语句的作用。

Go 语言的 continue 语句 有点像 break 语句。但是 continue 不是跳出循环,而是跳过当前循环执行下一次循环语句。

for 循环中,执行 continue 语句会触发for增量语句的执行。

函数

函数是基本的代码块,用于执行一个任务。

Go 语言最少有个 main() 函数。

你可以通过函数来划分不同功能,逻辑上每个函数执行的是指定的任务。函数声明告诉了编译器函数的名称,返回类型,和参数。

Go 语言标准库提供了多种可动用的内置的函数。例如,len() 函数可以接受不同类型参数并返回该类型的长度。如果我们传入的是字符串则返回字符串的长度,如果传入的是数组,则返回数组中包含的元素个数。

Go 语言函数定义格式如下:

1
2
3
func function_name( [parameter list] ) [return_types] {
函数体
}

  • func:函数由 func 开始声明
  • function_name:函数名称,函数名和参数列表一起构成了函数签名。
  • parameter list:参数列表,参数就像一个占位符,当函数被调用时,你可以将值传递给参数,这个值被称为实际参数。参数列表指定的是参数类型、顺序、及参数个数。参数是可选的,也就是说函数也可以不包含参数。
  • return_types:返回类型,函数返回一列值。return_types 是该列值的数据类型。有些功能不需要返回值,这种情况下 return_types 不是必须的。
  • 函数体:函数定义的代码集合。

函数返回多个值

Go 函数可以返回多个值,例如:

实例

1
2
3
4
5
6
7
8
9
10
11
package main
import "fmt"

func swap(x, y string) (string, string) {
return y, x
}

func main() {
a, b := swap("Google", "Runoob")
fmt.Println(a, b)
}

值传递

传递是指在调用函数时将实际参数复制一份传递到函数中,这样在函数中如果对参数进行修改,将不会影响到实际参数。默认情况下,Go 语言使用的是值传递,即在调用过程中不会影响到实际参数。

引用传递

引用传递是指在调用函数时将实际参数的地址传递到函数中,那么在函数中对参数所进行的修改,将影响到实际参数。

引用传递指针参数传递到函数内,以下是交换函数 swap() 使用了引用传递:

1
2
3
4
5
6
7
/* 定义交换值函数*/
func swap(x *int, y *int) {
var temp int
temp = *x /* 保持 x 地址上的值 */
*x = *y /* 将 y 值赋给 x */
*y = temp /* 将 temp 值赋给 y */
}

函数作为实参

Go 语言可以很灵活的创建函数,并作为另外一个函数的实参。以下实例中我们在定义的函数中初始化一个变量,该函数仅仅是为了使用内置函数 math.sqrt(),实例为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"math"
)

func main(){
/* 声明函数变量 */
getSquareRoot := func(x float64) float64 {
return math.Sqrt(x)
}

/* 使用函数 */
fmt.Println(getSquareRoot(9))

}

函数闭包

go支持匿名函数,可作为闭包。匿名函数是一个”内联”语句或表达式。匿名函数的优越性在于可以直接使用函数内的变量,不必申明。

以下实例中,我们创建了函数 getSequence() ,返回另外一个函数。该函数的目的是在闭包中递增 i 变量,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import "fmt"

func getSequence() func() int {
i:=0
return func() int {
i+=1
return i
}
}

func main(){
/* nextNumber 为一个函数,函数 i 为 0 */
nextNumber := getSequence()

/* 调用 nextNumber 函数,i 变量自增 1 并返回 */
fmt.Println(nextNumber())
fmt.Println(nextNumber())
fmt.Println(nextNumber())

/* 创建新的函数 nextNumber1,并查看结果 */
nextNumber1 := getSequence()
fmt.Println(nextNumber1())
fmt.Println(nextNumber1())
}

方法

Go 语言中同时有函数和方法。一个方法就是一个包含了接受者的函数,接受者可以是命名类型或者结构体类型的一个值或者是一个指针。所有给定类型的方法属于该类型的方法集。语法格式如下:

1
2
3
func (variable_name variable_data_type) function_name() [return_type]{
/* 函数体*/
}

下面定义一个结构体类型和该类型的一个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
)

/* 定义结构体 */
type Circle struct {
radius float64
}

func main() {
var c1 Circle
c1.radius = 10.00
fmt.Println("圆的面积 = ", c1.getArea())
}

//该 method 属于 Circle 类型对象中的方法
func (c Circle) getArea() float64 {
//c.radius 即为 Circle 类型对象中的属性
return 3.14 * c.radius * c.radius
}

变量作用域

作用域为已声明标识符所表示的常量、类型、变量、函数或包在源代码中的作用范围。

Go 语言中变量可以在三个地方声明:

  • 函数内定义的变量称为局部变量
  • 函数外定义的变量称为全局变量
  • 函数定义中的变量称为形式参数

接下来让我们具体了解局部变量、全局变量和形式参数。

局部变量

在函数体内声明的变量称之为局部变量,它们的作用域只在函数体内,参数和返回值变量也是局部变量。

全局变量

在函数体外声明的变量称之为全局变量,全局变量可以在整个包甚至外部包(被导出后)使用。

Go 语言程序中全局变量与局部变量名称可以相同,但是函数内的局部变量会被优先考虑。

形式参数

形式参数会作为函数的局部变量来使用。

数组

Go 语言提供了数组类型的数据结构。

数组是具有相同唯一类型的一组已编号且长度固定的数据项序列,这种类型可以是任意的原始类型例如整形、字符串或者自定义类型。

相对于去声明 number0, number1, …, number99 的变量,使用数组形式 numbers[0], numbers[1] …, numbers[99] 更加方便且易于扩展。

数组元素可以通过索引(位置)来读取(或者修改),索引从 0 开始,第一个元素索引为 0,第二个索引为 1,以此类推。

多维数组

Go 语言支持多维数组,以下为常用的多维数组声明方式:

1
var variable_name [SIZE1][SIZE2]...[SIZEN] variable_type

以下实例声明了三维的整型数组:
1
var threedim [5][10][4]int

二维数组是最简单的多维数组,二维数组本质上是由一维数组组成的。二维数组定义方式如下:
1
var arrayName [ x ][ y ] variable_type

指针

Go 语言的取地址符是&,放到一个变量前使用就会返回相应变量的内存地址。

1
2
3
4
5
6
7
8
9
package main

import "fmt"

func main() {
var a int = 10

fmt.Printf("变量的地址: %x\n", &a )
}

指针使用流程:

  • 定义指针变量。
  • 为指针变量赋值。
  • 访问指针变量中指向地址的值。

在指针类型前面加上 * 号(前缀)来获取指针所指向的内容。

空指针

当一个指针被定义后没有分配到任何变量时,它的值为 nil。nil 指针也称为空指针。nil在概念上和其它语言的null、None、nil、NULL一样,都指代零值或空值。

一个指针变量通常缩写为 ptr。

结构体

Go 语言中数组可以存储同一类型的数据,但在结构体中我们可以为不同项定义不同的数据类型。

结构体是由一系列具有相同类型或不同类型的数据构成的数据集合。

定义结构体

结构体定义需要使用 type 和 struct 语句。struct 语句定义一个新的数据类型,结构体有中有一个或多个成员。type 语句设定了结构体的名称。结构体的格式如下:

1
2
3
4
5
6
type struct_variable_type struct {
member definition;
member definition;
...
member definition;
}

一旦定义了结构体类型,它就能用于变量的声明,语法格式如下:

1
variable_name := structure_variable_type {value1, value2...valuen}


1
variable_name := structure_variable_type { key1: value1, key2: value2..., keyn: valuen}

你可以像其他数据类型一样将结构体类型作为参数传递给函数。

1
2
3
4
5
6
func printBook( book Books ) {
fmt.Printf( "Book title : %s\n", book.title);
fmt.Printf( "Book author : %s\n", book.author);
fmt.Printf( "Book subject : %s\n", book.subject);
fmt.Printf( "Book book_id : %d\n", book.book_id);
}

切片

Go 语言切片是对数组的抽象。

Go 数组的长度不可改变,因此提供了一种灵活,功能强悍的内置类型切片(“动态数组”),与数组相比切片的长度是不固定的,可以追加元素,在追加时可能使切片的容量增大。

定义切片

你可以声明一个未指定大小的数组来定义切片:

1
var identifier []type

切片不需要说明长度。

或使用make()函数来创建切片:

1
var slice1 []type = make([]type, len)

也可以简写为
1
slice1 := make([]type, len)

也可以指定容量,其中capacity为可选参数。
1
make([]T, length, capacity)

这里 len 是数组的长度并且也是切片的初始长度。

切片初始化

1
s :=[] int {1,2,3 } 

直接初始化切片,[]表示是切片类型,{1,2,3}初始化值依次是1,2,3.其cap=len=3

1
s := arr[:] 

初始化切片s,是数组arr的引用
1
s := arr[startIndex:endIndex] 

将arr中从下标startIndex到endIndex-1 下的元素创建为一个新的切片
1
s := arr[startIndex:] 

缺省endIndex时将表示一直到arr的最后一个元素
1
s := arr[:endIndex] 

缺省startIndex时将表示从arr的第一个元素开始
1
s1 := s[startIndex:endIndex] 

通过切片s初始化切片s1
1
s :=make([]int,len,cap) 

通过内置函数make()初始化切片s,[]int 标识为其元素类型为int的切片

len() 和 cap() 函数

切片是可索引的,并且可以由 len() 方法获取长度。

切片提供了计算容量的方法 cap() 可以测量切片最长可以达到多少。

空(nil)切片

一个切片在未初始化之前默认为 nil,长度为 0.

范围(range)

range 关键字用于 for 循环中迭代数组(array)、切片(slice)、通道(channel)或集合(map)的元素。在数组和切片中它返回元素的索引和索引对应的值,在集合中返回 key-value 对的 key 值。

1
2
3
4
kvs := map[string]string{"a": "apple", "b": "banana"}
for k, v := range kvs {
fmt.Printf("%s -> %s\n", k, v)
}

Map(集合)

Map 是一种无序的键值对的集合。Map 最重要的一点是通过 key 来快速检索数据,key 类似于索引,指向数据的值。

Map 是一种集合,所以我们可以像迭代数组和切片那样迭代它。不过,Map 是无序的,我们无法决定它的返回顺序,这是因为 Map 是使用 hash 表来实现的。

定义 Map

可以使用内建函数 make 也可以使用 map 关键字来定义 Map:

1
2
3
4
5
/* 声明变量,默认 map 是 nil */
var map_variable map[key_data_type]value_data_type

/* 使用 make 函数 */
map_variable := make(map[key_data_type]value_data_type)

1
2
3
4
5
6
7
8
var countryCapitalMap map[string]string /*创建集合 */
countryCapitalMap = make(map[string]string)

/* map插入key - value对,各个国家对应的首都 */
countryCapitalMap [ "France" ] = "巴黎"
countryCapitalMap [ "Italy" ] = "罗马"
countryCapitalMap [ "Japan" ] = "东京"
countryCapitalMap [ "India " ] = "新德里"

递归

Go 语言支持递归。但我们在使用递归时,开发者需要设置退出条件,否则递归将陷入无限循环中。

类型转换

类型转换用于将一种数据类型的变量转换为另外一种类型的变量。Go 语言类型转换基本格式如下:

1
type_name(expression)

type_name 为类型,expression 为表达式。

接口

Go 语言提供了另外一种数据类型即接口,它把所有的具有共性的方法定义在一起,任何其他类型只要实现了这些方法就是实现了这个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type interface_name interface {
method_name1 [return_type]
method_name2 [return_type]
method_name3 [return_type]
...
method_namen [return_type]
}

/* 定义结构体 */
type struct_name struct {
/* variables */
}

/* 实现接口方法 */
func (struct_name_variable struct_name) method_name1() [return_type] {
/* 方法实现 */
}
...
func (struct_name_variable struct_name) method_namen() [return_type] {
/* 方法实现*/
}

错误处理

Go 语言通过内置的错误接口提供了非常简单的错误处理机制。

error类型是一个接口类型,这是它的定义:

1
2
3
type error interface {
Error() string
}

我们可以在编码中通过实现 error 接口类型来生成错误信息。

函数通常在最后的返回值中返回错误信息。使用errors.New 可返回一个错误信息:

1
2
3
4
5
6
func Sqrt(f float64) (float64, error) {
if f < 0 {
return 0, errors.New("math: square root of negative number")
}
// 实现
}

在下面的例子中,我们在调用Sqrt的时候传递的一个负数,然后就得到了non-nil的error对象,将此对象与nil比较,结果为true,所以fmt.Println(fmt包在处理error时会调用Error方法)被调用,以输出错误,请看下面调用的示例代码:
1
2
3
4
5
result, err:= Sqrt(-1)

if err != nil {
fmt.Println(err)
}

并发

Go 语言支持并发,我们只需要通过 go 关键字来开启 goroutine 即可。

goroutine 是轻量级线程,goroutine 的调度是由 Golang 运行时进行管理的。

goroutine 语法格式:

1
go 函数名( 参数列表 )

例如:
1
go f(x, y, z)

开启一个新的 goroutine:
1
f(x, y, z)

Go 允许使用 go 语句开启一个新的运行期线程, 即 goroutine,以一个不同的、新创建的 goroutine 来执行一个函数。 同一个程序中的所有 goroutine 共享同一个地址空间。

通道

通道(channel)是用来传递数据的一个数据结构。

通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符 <- 用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。

1
2
3
ch <- v    // 把 v 发送到通道 ch
v := <-ch // 从 ch 接收数据
// 并把值赋给 v

声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:

1
ch := make(chan int)

注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须又接收端相应的接收数据。

以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:

通道缓冲区

通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:

1
ch := make(chan int, 100)

带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。

不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。

注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。

遍历通道与关闭通道

Go 通过 range 关键字来实现遍历读取到的数据,类似于与数组或切片。格式如下:

1
v, ok := <-ch

如果通道接收不到数据后 ok 就为 false,这时通道就可以使用 close() 函数来关闭。

Go语言并发之道1-3章

原子性

原子性是指一个操作在运行的环境中是不可被分割的或不可被中断的。操作的原子性是根据当前定义的范围而改变的,上下文不同则一个操作可能不是原子性的。

使一个操作变为原子操作取决于你想让它在哪个上下文中,如果上下文是没有并发的,则该代码是原子性的。

内存访问同步

程序中需要独占访问共享资源的部分叫做“临界区”,看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
var memoryAccess sync.Mutex
var value int

go func() {
memoryAccess.Lock()
value++
memoryAccess.Unlock()
}()

memoryAccess.Lock()
if value == 0 {
fmt.Printf("the value is %v.\n", value)
} else {
fmt.Printf("the value is %v.\n", value)
}
memoryAccess.Unlock()
}

这里我们添加了一个sync.Mutex类型,声明一下在哪个部分里应该独占value这个变量。如果想要访问value这个变量,就要首先调用Lock,当访问结束后,调用Unlock。当然,也可能造成维护和性能的问题。

defer关键字

defer代码块会在函数调用链表中增加一个函数调用。这个函数调用不是普通的函数调用,而是会在函数正常返回,也就是return之后添加一个函数调用。因此,defer通常用来释放函数内部变量。

当defer被声明时,其参数就会被实时解析

我们通过以下代码来解释这条规则:

1
2
3
4
5
6
func a() {
i := 0
defer fmt.Println(i)
i++
return
}

虽然我们在defer后面定义的是一个带变量的函数: fmt.Println(i). 但这个变量在defer被声明的时候,就已经确定其确定的值了。 换言之,上面的代码等同于下面的代码:
1
2
3
4
5
6
func a() {
i := 0
defer fmt.Println(0) //因为i=0,所以此时就明确告诉golang在程序退出时,执行输出0的操作
i++
return
}

为了更为明确的说明这个问题,我们继续定义一个defer:
1
2
3
4
5
6
7
func a() {
i := 0
defer fmt.Println(i) //输出0,因为i此时就是0
i++
defer fmt.Println(i) //输出1,因为i此时就是1
return
}

通过运行结果,可以看到defer输出的值,就是定义时的值。而不是defer真正执行时的变量值(很重要,搞不清楚的话就会产生于预期不一致的结果)

但为什么是先输出1,在输出0呢? 看下面的规则二。

defer执行顺序为先进后出

当同时定义了多个defer代码块时,golang安装先定义后执行的顺序依次调用defer。不要为什么,golang就是这么定义的。我们用下面的代码加深记忆和理解:

1
2
3
4
5
func b() {
for i := 0; i < 4; i++ {
defer fmt.Print(i)
}
}

在循环中,依次定义了四个defer代码块。结合规则一,我们可以明确得知每个defer代码块应该输出什么值。 安装先进后出的原则,我们可以看到依次输出了3210.

defer可以读取有名返回值

先看下面的代码:

1
2
3
4
func c() (i int) {
defer func() { i++ }()
return 1
}

输出结果是12. 在开头的时候,我们说过defer是在return调用之后才执行的。 这里需要明确的是defer代码块的作用域仍然在函数之内,结合上面的函数也就是说,defer的作用域仍然在c函数之内。因此defer仍然可以读取c函数内的变量(如果无法读取函数内变量,那又如何进行变量清除呢….)。

当执行return 1 之后,i的值就是1. 此时此刻,defer代码块开始执行,对i进行自增操作。 因此输出2.

掌握了defer以上三条使用规则,那么当我们遇到defer代码块时,就可以明确得知defer的预期结果。

死锁、活锁、饥饿

死锁是所有并发进程等待的程序,在这种情况下,如果没有外界干预,这个程序将无法恢复。

Coffman条件

出现死锁的条件有以下几个必要条件:

  • 相互排斥:并发进程同时拥有资源的独占权
  • 等待条件:并发进程必须同时拥有一个资源,并等待额外的资源
  • 没有抢占:并发进程拥有的资源只能被该进程释放
  • 循环等待:一个并发进程只能等待一系列其他并发进程,这些并发进程也在等待

活锁

正在主动执行并发操作的程序,但是无法向前推进程序的状态。看起来程序在工作。

饥饿

在任何情况下,并发进程欧步伐获得执行工作所需的所有资源。饥饿通常意味着有一个或多个贪婪的并发进程,它们不公平地阻止一个或多个并发进程,以尽可能地有效完成工作,或者阻止全部并发进程。

通信顺序进程

并行与并发

并行属于一个运行中的程序,并发属于代码。

并发哲学

CSP即Communicating Sequential Process,通信顺序进程。

Go的运行时自动将goroutine映射到系统的线程上,并管理调度,因此可以在像goroutine阻塞等待IO之类的事情上进行内省,从而智能的把OS的线程分配到没有阻塞的goroutine上。

如果有一块产生计算结果并想共享结果给其他代码块的代码,则需要传递数据的所有权。并发程序安全就是保证同时只有一个并发上下文拥有数据的所有权。通过channel类型解决,可以创建一个带缓存的channel实现低成本的在内存中的队列来解耦生产者和消费者。

使用channel时可以更简单的控制软件中出现的复杂性。

并发组件

goroutine

每个Go程序中都有至少一个goroutine: main goroutine。goroutine是一个并发的函数,在一个函数前添加go关键字来触发。匿名函数也行:

1
2
3
go func() {
fmt.Println("hello")
} ()

函数赋值也行:
1
2
3
4
5
sayhello := func() {
fmt.Println("hello")
}

go sayhello()

go中的goroutine是一个更高级别的抽象,称为协程,一中非抢占式的简单并发子程序,不能被中断,允许暂停或重入。Go的运行时会观察goroutine的运行时行为,并在它们阻塞时自动挂起它们,然后在它们不被阻塞时自动恢复它们。

go的主机托管机制是一个名为M:N调度器的实现。将M个绿色线程映射到N个OS线程,然后将goroutine安排在绿色线程上。

go遵循一个fork-join并发模型,将执行的子分支与其父节点同时运行,这些并发的执行分支将会在未来合并在一起。为了创建一个join点,必须对程序进行同步,这里可以通过sync.Watigroup实现。

在下边这个程序中,输出的是“world”,因此可以说明goroutine在它们所创建的相同地址空间内执行。

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
var wg sync.WaitGroup
salutation := "hello"
wg.Add(1)
go func(){
defer wg.Done()
salutation = "world"
}()
wg.Wait()
fmt.Println(salutation)
}


可以以如下方式将参数传到函数中,以输出正确结果。
1
2
3
4
5
6
7
8
9
for _, salt := range []string{"hello", "greetings", "good day"} {
wg.Add(1)
go func(salt string) {
defer wg.Done()
fmt.Println(salt)
} (salt)
}
wg.Wait()

sync包

sync包包含了对低级别内存访问同步最有用的并发原语。

WaitGroup

可以调用Add表明n个goroutine已经开始了,使用defer关键字确保在goroutine退出之前执行Done操作。执行Wait操作将会阻塞main goroutine直到所有goroutine表明它们已经退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("1st goroutine sleeping")
time.Sleep(1)
} ()

wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("2nd goroutine sleeping")
time.Sleep(2)
}()

wg.Wait()
fmt.Println("All goroutine complete")

WaitGroup调用通过传入的整数执行Add操作增加计数器的增量,并调用Done递减,Wait阻塞,直到计数器为0.

互斥锁和读写锁

channel通过通信共享内存,而Mutex通过开发人员的约定同步访问共享内存。

Mutex有两个函数,Lock和Unlock,在defer中调用Unlock保证即使出现了panic,也可以及时调用Unlock,避免死锁。

进入和退出一个临界区是有开销的,所以要减少临界区的范围,可能存在多个并发进程之间共享内存,但这些进程不是都需要读写此内存,可以利用不同类型的互斥对象,sync.RWMutex。可以请求一个锁用于读或者写。

cond

cond是一个goroutine的集合点,等待或发布一个event,在这里一个event是两个或两个以上的goroutine之间的任意信号。

1
2
3
4
5
6
c := sync.NewCond(&sync.Mutex{})
c.L.Lock()
for conditionTrue() == false {
c.Wait()
}
c.L.Unlock()

上述代码实例化一个cond,NewCond创建一个类型,cond类型能够以一种并发安全的方式与其他goroutine协调。

Broadcast提供了同时与多个goroutine通信的方法,在Clicked Cond上调用Broadcast,则所有三个函数都将运行。它内部维护一个FIFO列表,等待接收信号,向所有等待的goroutine发送信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func main() {
type Button struct {
Clicked *sync.Cond
}

button := Button{ Clicked: sync.NewCond(&sync.Mutex{}) }
subscribe := func(c *sync.Cond, fn func()) {
var goroutineRunning sync.WaitGroup
goroutineRunning.Add(1)
go func(){
goroutineRunning.Done()
c.L.Lock()
defer c.L.Unlock()
c.Wait()
fn()
}()
goroutineRunning.Wait()
}

var clickRegistered sync.WaitGroup
clickRegistered.Add(3)
subscribe(button.Clicked, func() {
fmt.Println("Maximizing window")
clickRegistered.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("Displaying annoying dialog box!")
clickRegistered.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("Mouse clicked")
clickRegistered.Done()
})

button.Clicked.Broadcast()
clickRegistered.Wait()
}

once

sync.Once在内部调用一些原语,确保即使在不同的goroutine上也只会调用一次Do方法处理传进来的函数。

1
2
3
4
5
6
7
var count int
increment := func() {count++}
decrement := func() {count--}

var once sync.Once
once.Do(increment)
once.Do(decrement)

上述程序输出的是1,因为once只计算Do调用的次数,不管Do函数里边的参数是什么。

Pool模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法,用于约束创建昂贵的场景,以便只创建固定数量的实例,但不确定数量的操作仍然可以请求访问这些场景。

Pool的主接口是Get方法,首先检查池中是否有可用的实例,如果没有则调用new方法创建一个,完成时调用者调用Put方法将实例归还。

Pool也用来尽可能快地将预先分配的对象缓存加载启动,通过提前加载获取引用到另一个对象所需的时间,来节省消费者的时间。

  • 实例化sync.Pool时,使用new方法创建一个成员变量,在调用时是线程安全的。
  • 收到来自Get的实例时,不要对接收的对象的状态做出任何假设。
  • 当你用完了从Pool中取出的对象时一定要调用Put否则Pool无法复用这个实例。
  • Pool内的分布大致均匀。

channel

channel充当着信息传送的管道,值可以沿着channel传递。

1
2
var dataStream chan interface{}
dataStream = make(chan interface{})

上面声明了一个新channel,因为声明的类型是空接口,所以类型是interface{},并且使用内置的make函数实例化channel。

声明一个单向channel只需包含“<-”,声明一个只能读取的channel,将“<-”放在左边:

1
2
var dataStream <-chan interface{}
dataStream = make(<-chan interface{})

声明一个只能发送的channel,则将“<-”放在右边。

通过将“<-”放到channel的右边实现发送操作,通过将“<-”放到channel的左边实现接收操作。另一种方法是数据流向箭头所指方向的变量。

1
2
3
4
5
stringStream := make(chan string)
go func(){
stringStream <- "hello"
}()
fmt.Println(<-stringStream)

上述代码实现了将字符串文本传递到stringStream channel并读取channel的字符串并打印到stdout。

可以从channel中获取,然后通过range遍历,并且在channel关闭时自动中断循环:

1
2
3
4
5
6
7
8
9
10
11
intStream := make(chan int)
go func() {
defer close(intStream)
for i:= 1; i <= 5; i ++ {
intStream <- i
}
}()
for integer := range intStream {
fmt.Printf("%v ",integer)
}

关闭channel也是一种同时给多个goroutine发信号的方法,如果有n个goroutine在一个channel上等待,而不是在channel上写n次来打开每个goroutine,可以简单地关闭channel。

更可以创建buffered channel,在实例化时提供容量。即使没有在channel上执行读取操作,goroutine仍然可以写入n次。

如果说channel是满的,那么写入channel阻塞。无缓冲的channel容量为0,因此在任何写入之前就已经满了,缓冲channel是一个内存中的FIFO队列,用于并发进程通信。

我们需要在正确的环境中配置channel,channel的所有者对channel拥有写访问视图,使用者只有读访问视图。拥有channel的goroutine应该:

  1. 实例化channel;
  2. 执行写操作,或将所有权传递给另一个goroutine;
  3. 关闭channel
  4. 通过只读channel将上述三件事暴露出来。

select

select是将channel绑定在一起的粘合剂,在一个系统中两个或多个组件的交集中,可以在本地、单个函数或类型以及全局范围内找到select语句绑定在一起的channel。

1
2
3
4
5
6
7
8
9
10
var c1, c2 <-chan interface{}
var c3 chan<- interface{}
select {
case <- c1:
....
case <- c2:
....
case <- c3:
....
}

如果多个channel是可用的,则执行伪随机选择,每一个都可能被执行到。如果没有任何channel可用,则我们需要使用time包中的超时机制,time.After。

GOMAXPROCS控制

这是runtime中的一个函数,这个函数控制的OS线程的数量将承载所谓的“工作队列”。runtime.GOMAXPROCS总是被设置成为主机上逻辑CPU的数量。

Go语言并发之道第4章

提示:interface{}可用于向函数传递任意类型的变量,但对于函数内部,该变量仍然为interface{}类型(空接口类型),

Go的并发模式

约束

约束是一种确保了信息只能从一个并发过程中获取到的简单且强大的方法,特定约束是指通过公约实现约束,词法约束涉及使用词法作用域仅公开用于多个并发进程的正确数据和并发原语。

for-select循环

1
2
3
4
5
6
for {
// 要不就无限循环,要不就使用range循环
select {
//使用channel作业
}
}

向channel发送迭代变量

1
2
3
4
5
6
7
for _, s := range []string{"a", "b", "c"} {
select {
case <- done:
return
case stringStream <- s:
}
}

循环等待停止

创建循环,无限直至停止。

1
2
3
4
5
6
7
8
for {
select {
case <- done:
return
default:
}
// 非抢占式任务
}

防止goroutine泄露

main goroutine可能会在其生命周期内将其他的goroutine设置为自旋,导致内存利用率下降。减轻这种情况的方法是在父goroutine和子goroutine之间建立一个信号,让父goroutine向其子goroutine发出信号通知。父goroutine将该channel发送给子goroutine,然后在想要取消子goroutine时关闭该channel。

确保:如果goroutine负责创建goroutine,那么它也负责确保可以停止goroutine。

or-channel

使用or-channel模式将多个channel组合起来。通过递归和goroutine创建一个符合done channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}

orDone := make(chan interface{})
go func() {
defer close()
switch len(channels):
case 2:
select{
case <-channels[0]:
case <-channels[1]:
case <-channels[1]:
case <-channels[2]:
}
}()
}
}

错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Result struct {
Error error
Response *http.Response
}

func main() {

checkStatus := func(
done <-chan interface{},
urls ...string,
) <-chan Result {
results := make(chan Result)
go func(){
defer close(results)
for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{Error: err, Response: resp}
select {
case <-done:
return
case results <-result:
}
}
}()
return results
}

done := make(chan interface{})
defer close(done)
urls := []string{"https://www.baidu.com", "https://badhost"}
for result := range checkStatus(done, urls...){
if result.Error != nil {
fmt.Printf("Error: %v.\n", result.Error)
continue
}
fmt.Printf("Response: %v.\n", result.Response.Status)
}
}

pipeline

一个stage是将数据输入,对其进行转换并将数据发回。

1
2
3
4
5
6
7
multiply := func(values []int, len(values)) []int {}
add := func(values []int, additive int) []int {}

ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
fmt.Println(v)
}

在range子句中结合加法和乘法,这样构建了一个具有pipeline stage的属性,组合形成pipeline。

pipeline stage的属性是:

  • 一个stage消耗并返回相同的类型;
  • 一个stage必须用语言来表达,以便可以被传递;

channel适合在Go中构建pipeline,可以接受和产生值,且可以安全的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func main() {
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}

multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}

add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {

addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + additive:
}
}
}()
return addedStream
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
fmt.Println(v)
}
}

挺有意思的,显示了流水线的操作。

generator接受一个可变的整数切片,构造一个缓存长度等于输入片段的整数channel,启动goroutine并返回构造的channel,将一组离散值转化成一个channel上的数据流。

扇入扇出

扇出是描述启动多个goroutine以处理来自pipeline的输入的过程;扇入是描述将多个结果组合到一个channel的过程中。

1
2
3
4
5
6
7
primeStream := primeFinder(done, randIntStream)

numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i := 0; i < numFinders; i ++ {
finders[i] = primeFinder(done, randIntStream)
}

这里启动了stage的多个副本,有n个goroutine从随机数发生器中拉出并试图确定数字是否为素数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package pips

import (
"sync"
)

type PrimePip struct {
}

func NewPrimePip() *PrimePip {
primePip := &PrimePip{}
return primePip
}

func (primePip *PrimePip) RepeatFn(
done <-chan interface{},
fn func() interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}

func (primePip *PrimePip) Take(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}

func (primePip *PrimePip) ToInt(
done <-chan interface{},
valueStream <-chan interface{},
) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for v := range valueStream {
select {
case <-done:
return
case intStream <- v.(int):
}
}
}()
return intStream
}

func (primePip *PrimePip) PrimeFinder(
done <-chan interface{},
intStream <-chan int,
) <-chan interface{} {
primeStream := make(chan interface{})
go func() {
defer close(primeStream)
for integer := range intStream {
integer -= 1
prime := true
for divisor := integer - 1; divisor > 1; divisor-- {
if integer%divisor == 0 {
prime = false
break
}
}

if prime {
select {
case <-done:
return
case primeStream <- integer:
}
}
}
}()
return primeStream
}

func (primePip *PrimePip) FanIn(
done <-chan interface{},
channels ...<-chan interface{},
) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})

multiplexed := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}

wg.Add(len(channels))
for _, c := range channels {
go multiplexed(c)
}

go func() {
wg.Wait()
close(multiplexedStream)
}()

return multiplexedStream
}

or-done-channel

用于处理来自系统各个分散部分的channel,需要用channel中的select语句来包装我们的读操作,并从已完成的channel中进行选择。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false{
return
}
select {
case valStream <- v:
case <-done:
}
}
}
} ()
return valStream
}

tee-channel

分割一个来自channel的值,以便将他们发送到代码的两个独立区域。

队列排队

在队列尚未准备好的时候开始接受请求,只要stage完成了工作,就会把结果存放在一个稍后其他stage可以获取到的临时位置。

  • 在一个stage批处理请求节省时间
  • 如果stage中的延迟产生反馈回路进入系统。

context包

主要包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
var Canceled = errors.New("context canceled")
var DeadlineExceeded error = deadlineExceededError{}

type CancelFunc
type Context

func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

上下文包有两个目的:

  • 提供可以取消调用图中分支的API
  • 提供用于通过呼叫传输请求范围数据的数据包

context类型将是函数的第一个参数,此外,接收context的函数并不能取消它,这保护了调用堆栈上的函数被子函数取消上下文的情况。

上述context包中的函数都接收一个Context参数,并返回一个Context。WithCancel返回新Context,它在调用返回的cancel函数时关闭其done channel。WithDeadline返回一个新的Context,当机器的时钟超过给定的最后期限时,它关闭完成的channel。WithTimeout返回一个新的Context,它在给定的超时时间后关闭完成的channel。

如果函数以某种方式在调用图中取消它后面的函数,它将调用其中一个函数并传递给它的上下文,然后将返回的上下文传递给它的子元素,如果函数不需要修改取消行为,则只传递给定的上下文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1)

go func() {
defer wg.Done()
if err := printGreeting(ctx); err != nil {
fmt.Printf("cannot print greeting: %v\n", err)
cancel()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
if err := printFarewell(ctx); err != nil {
fmt.Printf("cannot print greeting: %v\n", err)
cancel()
}
}()
wg.Wait()
}

func printGreeting(ctx context.Context) error {
greeting, err := genGreeting(ctx)
if err != nil {
return err
}
fmt.Printf("%s world!\n", greeting)
return nil
}

func printFarewell(ctx context.Context) error {
farewell, err := genFarewell(ctx)
if err != nil {
return err
}
fmt.Printf("%s world!\n", farewell)
return nil
}

func genGreeting(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "hello", nil
}
return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "godbye", nil
}
return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
if deadline, ok := ctx.Deadline(); ok {
if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
return "", context.DeadlineExceeded
}
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(1 * time.Minute):
}
return "EN/US", nil
}

上述程序允许locale函数快速失败,不必实际等待超时发生。

context包的另一个功能是用于存储和检索请求范围数据的Context数据包。

1
2
3
4
5
6
7
8
9
10
11
12
func ProcessRequest(userID, authToken string) {
ctx := context.WithValue(context.Background(), "userID", userID)
ctx = context.WithValue(ctx, "authToken", authToken)

HandleResponse(ctx)

}

func HandleResponse(ctx context.Context) {
fmt.Printf("handling response for %v (%v)\n", ctx.Value("userID"), ctx.Value("authToken"))
}

  • 我们使用的键值必须满足Go的可比性概念,即==和!=在使用时需要返回正确的结果。
  • 返回值必须安全,才能从多个goroutine访问

由于context的键和值都被定义为interface{},所以当试图检索值时,我们会失去Go的类型安全性,key可以是不同的类型,或者与我们提供的key略有不同。建议在软件包里定义一个自定义键类型:

1
2
3
4
5
6
7
8
type foo int
type bar int

m := make(map[interface{}] int)
m[foo(1)] = 1
m[bar(1)] = 1

fmt.Printf("%v", m)

输出为:

1
map[1:1, 2:2]

虽然基础值是相同的,但是科通通过不同的类型信息在map中区分它们。

Go语言并发之道第5章

异常传递

我们需要对传入的异常信息进行传递和处理,如:

1
2
3
4
5
6
7
8
9
func PostReport(id string) error {
result, err := lowlevel.DoWork()
if err != nil{
if _, ok := err.(lowlevel.Error); ok {
err = WrapErr(err, "cannot post report with id %q", id)
}
return err
}
}

在这里检查接收到的异常信息,确保结构良好,使用一个假设的函数将传入的异常和模块相关信息封装起来,并赋予一个新类型。

创建一个异常类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
type MyError struct {
Inner error
Message string
StackTrace string
Misc map[string]interface{}
}

func wrapError(err error, messagef string, msgArgs ...interface{}) MyError {
return MyError{
Inner: err,
Message: fmt.Sprintf(messagef, msgArgs...),
StackTrace: "stack!!!",
Misc: make(map[string]interface{}),
}
}

func (err MyError) Error() string {
return err.Message
}

type LowLevelErr struct {
error
}
func isGloballyExec(path string) (bool, error) {
info, err := os.Stat(path)
if err != nil {
return false, LowLevelErr{(wrapError(err, err.Error()))}
}
return info.Mode().Perm()&0100 == 0100, nil
}

type IntermediateErr struct {
error
}

func runJob(id string) error {
const jobBinPath = "/bad/job/binary"
isExecutable, err := isGloballyExec(jobBinPath)

if err != nil {
return IntermediateErr{wrapError(err, "cannot run job %q: requisite binaries not available", id)}
} else if isExecutable == false {
return wrapError(nil, "job binary is not executable", id)
}

return exec.Command(jobBinPath, "--id="+id).Run()
}
func handleError(key int, err error, message string) {
log.SetPrefix(fmt.Sprintf("[logID: %v]: ", key))
log.Printf("%#v", err)
fmt.Printf("[%v] %v", key, message)
}

func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
err := runJob("1")

if err != nil {
msg := "There was an unexpected issue; please report this as a bug."
if _, ok := err.(IntermediateErr); ok {
msg = err.Error()
}
handleError(1, err, msg)
}
}

超时和取消

有几个原因使我们需要支持超时:

  1. 系统饱和:希望超出的请求返回超时,而不是花很长时间等待响应。请求在超时时不太可能重复,或没有资源来存储请求,或者对系统响应或请求发送数据有时效性的要求时,需要超时操作。
  2. 陈旧的数据:数据通常有窗口期,如果并发进程处理数据需要的时间比这个窗口期长,则会想返回超时并取消并发进程。可以使用context.WithDeadline或者context.WithTimeout创建的context.Context传递给并发进程。
  3. 试图防止死锁:为了防止死锁,建议在所有并发操作中增加超时操作。

心跳

有两种不同的心跳:

  • 一段时间间隔内发出的心跳
  • 在工作单元开始时发出的心跳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
func main() {
doWork := func(
done <-chan interface{},
pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {

// 建立一个发送心跳的channel,返回给doWork

heartbeat := make(chan interface{})
results := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(results)

pulse := time.Tick(pulseInterval)
workGen := time.Tick(2 * pulseInterval)
// 设定心跳间隔
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default:
// 可能没有人接收心跳,所以加一个default
}
}
sendResult := func(r time.Time) {
for {
select {
case <-done:
return
case <-pulse:
sendPulse()
case results <- r:
return
}
}
}

for {
select {
case <-done:
return
case <-pulse:
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}

done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) })

const timeout = 2 * time.Second
// 设置了超时时间
heartbeat, results := doWork(done, timeout/2)
// timeout/2 使我们的心跳有额外的响应时间

for {
select {
// 处理心跳,如果没有消息时,至少timeout/2后会从心跳channel发出一条消息
case _, ok := <-heartbeat:
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results:
if ok == false {
return
}
fmt.Printf("result %v\n", r.Second())
case <-time.After(timeout):
return
}
}
}

以下是每个工作单元开始之前发出的心跳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func main() {
doWork := func(
done <-chan interface{},
) (<-chan interface{}, <-chan int) {
heartbeatStream := make(chan interface{}, 1)
// 创建一个缓冲区大小为1的heartbeat channel,确保了即使没有及时接收发送消息也能发出一个心跳

workStream := make(chan int)
go func() {
defer close(heartbeatStream)
defer close(workStream)

for i := 0; i < 10; i++ {
select {
case heartbeatStream <- struct{}{}:
default:
}
select {
case <-done:
return
case workStream <- rand.Intn(10):
}
}
// 这里为心跳设置了单独的select块,将发送result和发送心跳分开,如果接收者没有准备好接受结果,作为替代它将收到一个心跳,而代表当前结果的值将会丢失。
// 为了防止没人接收心跳,增加了default,因为我们的heart channel创建时有一个缓冲区,所以如果有人正在监听暗示没有及时收到第一个心跳,接收者也可以收到心跳。
}()

return heartbeatStream, workStream
}

done := make(chan interface{})
defer close(done)

heartbeat, results := doWork(done)

for {
select {
case _, ok := <-heartbeat:
if ok == false {
return
} else {
fmt.Println("pulse")
}
case r, ok := <-results:
if ok == false {
return
} else {
fmt.Printf("result %v\n", r)
}
}
}
}

一些外部因素会导致goroutine花费更长的时间来进行第一次迭代,无论goroutine在调度上是否是第一位执行的。使用goroutine来解决这个问题。

复制请求

可以将请求分发到多个处理程序,其中一个将比其他处理程序返回更快,可以立即返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func main() {
doWork := func(
done <-chan interface{},
id int,
wg *sync.WaitGroup,
result chan<- int,
) {
started := time.Now()
defer wg.Done()

simulatedLoadTime := time.Duration(1+rand.Intn(5)) * time.Second
select {
case <-done:
case <-time.After(simulatedLoadTime):
}
select {
case <-done:
case result <- id:
}

took := time.Since(started)
if took < simulatedLoadTime {
took = simulatedLoadTime
}
fmt.Printf("%v took %v.\n", id, took)
}

done := make(chan interface{})
result := make(chan int)

var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
go doWork(done, i, &wg, result)
}

firstReturned := <-result
close(done)

wg.Wait()

fmt.Printf("Received an answer from #%v.\n", firstReturned)
}

在这里我们启动了10个处理程序来处理请求,并获得了第一个返回值,如果得到了第一个返回值,则取消其它的处理程序,以保证不会做多余的工作。

速率限制

速率限制允许你将系统的性能和稳定性平衡在可控范围内。Go中大多数的限速是基于令牌算法的。

如果要访问资源,必须拥有资源的访问令牌,没有令牌的请求会被拒绝。假设令牌存储在一个等待被检索使用的桶中,桶的深度是d,表示一个桶可以容纳d个访问令牌。

每当需要访问资源时,都会在桶中删除一个令牌,请求必须排队等待直到有令牌可以用,或者被拒绝操作。将r定义为向桶中添加令牌的速率。只要用户拥有可用的令牌,集中的请求可能会使用户突破系统的可用范围。有些用户会间歇性访问系统,但是又想要尽可能快的获得结果,就会出现突发性的事件,只需要确保系统能同时处理所有用户的突发请求,或者在统计上不会有太多用户同时突发访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func Open() *APIConnection {
return &APIConnection{}
}

type APIConnection struct{}

func (a *APIConnection) ReadFile(ctx context.Context) error {
return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
return nil
}

func main() {
defer log.Printf("Done.")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)

apiConnection := Open()
var wg sync.WaitGroup
wg.Add(20)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ReadFile(context.Background())
if err != nil {
log.Printf("cannot ReadFile: %v", err)
}
log.Printf("ReadFile")
}()
}

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ResolveAddress(context.Background())
if err != nil {
log.Printf("cannot ResolveAddress: %v", err)
}
log.Printf("ResolveAddress")
}()
}

wg.Wait()
}

所有的API请求同时进行,没有进行限速,所以客户端可以自由访问系统,下面引入限速器,把限速器放在APIConnection中。这里用到了golang.org/x/time/rate包中的令牌桶限速器实现,具体安装如下:

golang.org/x包放到了https://github.com/golang/time.git中,下载时需要先在本地建立golang.org/x的目录后,再下载。

1
2
mkdir -p golang.org/x
git clone https://github.com/golang/time.git

我们使用了这个包的两个部分,分别是Limit类型和NewLimiter函数。Limit表示某个事件的最大频率,每秒事件数;NewLimiter返回一个新的Limit,允许事件速率为r,并允许最大为b的token。

rate包也包含一个辅助方法Every,将时间间隔转换为Limit。针对每次操作的间隔时间进行测量:

1
2
3
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration / time.Duration(eventCount))
}

创建rate.Limiter后,使用它来阻塞我们的请求,直到获得访问令牌,使用Wait实现。
1
2
3
4
5
6
func (lim *Limiter) Wait(ctx context.Context) 
// Wait是WaitN(ctx, 1)的缩写
// WaitN会执行直到有n个事件发生,
// 如果n超过Limiter的突发大小,ctx被取消,或者逾期等待时间超过context的deadline,会返回一个错误

func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

修改后的APIConnection:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

func Open() *APIConnection {
return &APIConnection{
rateLimiter: rate.NewLimiter(rate.Limit(1), 1),
}
}

type APIConnection struct {
rateLimiter *rate.Limiter
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
return nil
}

func main() {
defer log.Printf("Done.")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)

apiConnection := Open()
var wg sync.WaitGroup
wg.Add(20)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ReadFile(context.Background())
if err != nil {
log.Printf("cannot ReadFile: %v", err)
}
log.Printf("ReadFile")
}()
}

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ResolveAddress(context.Background())
if err != nil {
log.Printf("cannot ResolveAddress: %v", err)
}
log.Printf("ResolveAddress")
}()
}

wg.Wait()
}

这样实现了所有API连接的速率限制为每秒一次。

聚合限速器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type RateLimiter interface {
Wait(context.Context) error
Limit() rate.Limit
}

func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
byLimit := func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
}
sort.Slice(limiters, byLimit)
return &multiLimiter{limiters: limiters}
}

type multiLimiter struct {
limiters []RateLimiter
}

func (l *multiLimiter) Wait(ctx context.Context) error {
for _, l := range l.limiters {
if err := l.Wait(ctx); err != nil {
return err
}
}
return nil
}

func (l *multiLimiter) Limit() rate.Limit {
return l.limiters[0].Limit()
}

定义了一个RateLimiter接口,使MultiLimiter可以递归定义其他的MultiLimiter实例,并且实现了一个优化,根据每个RateLimiter的Limit()排序,可以直接返回限制最多的限制器,这将是切片(slice)的第一个元素。

Wait犯法会遍历所有的子限速器,并调用Wait。

可以考虑增加对API请求的限制,对磁盘的限制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func Open() *APIConnection {
return &APIConnection{
apiLimit: MultiLimiter(
rate.NewLimiter(Per(2, time.Second), 2)
rate.NewLimiter(Per(10, time.Minute), 10),
),
diskLimit: MultiLimiter(
rate.NewLimiter(rate.Limit(1), 1)
),
networkLimit: MultiLimiter(
rate.NewLimiter(Per(3, time.Second), 3),
),
}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit,a.diskLimit).Wait(ctx); err != nil {
return err
}
return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := MultiLimiter(a.apiLimit,a.diskLimit).Wait(ctx); err != nil {
return err
}
return nil
}



上面为API调用和磁盘读取设置了限速器。

治愈异常的goroutine

建立一个机制来监控goroutine是否处于健康的状态,当它们变得异常时就可以尽快重启。需要使用心跳模式来检查正在监控的goroutine是否活跃,心跳的类型取决于想要监控的内容,如果goroutine有可能会产生活锁,需要确保心跳包含某些信息,表明goroutine正在工作而不是只是活着。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
type startGoroutineFn func(
done <-chan interface{},
pulseInterval time.Duration,
) (heartbeat <-chan interface{})
//定义一个可以监控和重启goroutine的信号。

func main() {
newSteward := func(
timeout time.Duration,
startGoroutine startGoroutineFn,
) startGoroutineFn {
return func(
done <-chan interface,
pulseInterval time.Duration,
// 监控goroutine需要timeout变量,一个函数startGoroutineFn表示管理员本身也是可监控的

) (<-chan interface{}) {
heartbeat :=make(chan interface{})
go func() {
defer close(heartbeat)
var wardDone chan interface{}
var wardHeartbeat <- chan interface{}

startWard := func() {
wardDone = make(chan interface{})
wardHeartbeat = startGoroutine(or(wardDone, done),timeout/2)
}
// 定义了一个闭包,实现了统一的方法来启动正在监视的goroutine
// 创建一个新的channel,如果需要发出停止信号则使用它传入goroutine
// 启动将要监控的goroutine,如果管理员被停止或者想要停止goroutine,希望这些信息能传给管理区的goroutine
// 所以使用了逻辑或来包装。

startWard()
pulse := time.Tick(pulseInterval)

monitorLoop:
for {
timeoutSignal := time.After(timeout)
for {
select {
case <-pulse:
select{
case heartbeat <- struct{}{}:
default:
}
case <-wardHeartbeat:
continue monitorLoop
case <-timeoutSignal:
log.Println("steward: ward unhealthy; restarting")
close(wardDone)
startWard()
continue monitorLoop
case <-done:
return
}
}
}
}()
return heartbeat
}
}

log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)

doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
log.Println("ward: hello, I'm irresponsible!")
go func(){
<-done
log.Println("ward: I am halting")
}()
retunr nil
}
doWorkWithSteward := newSteward(4*time.Second, doWork)
// 超时时间是4s

done := make(chan interface{})
time.AfterFunc(9*time.Second, func(){
log.Println("main: halting steward and ward.")
close(done)
})
// 9s后停止管理员和goroutine

for range doWorkWithSteward(done, 4*time.Second) {}
log.Println("done")

}

管理区可以使用桥接channel模式向消费者提供公用的channel,避免中断,使用这些技术,管理区可以简单的通过组合各种模式变得任意复杂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)

done := make(chan interface{})
defer close(done)

doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5)
// 创建管理区函数,允许结束可变整数切片,返回用来返回的流

doWorkWithSteward := newSteward(1*time.Millisecond, doWork)
// 创建管理员,监听doWork

doWorkWithSteward(done, 1*time.Hour)
// 启动管理区并开始监控

for intVal := range take(done, intStream, 6) {
fmt.Println("Received %v.\n", intVal)
}

Go语言并发之道第6章

goroutine和Go语言进行时

工作窃取

为了确保所有CPU有相同的使用率,可以在所有可用的处理器上平均分配负载。在实际使用过程中,基于朴素策略在处理器上分配任务可能会导致其中一个处理器利用率不足。不仅如此,还可能导致缓存的位置偏差,因为需要调用这些数据的任务跑在其他处理器上。

可以采取:工作任务加入队列中进行调度,处理器在有空闲的时候将任务出队,或者阻塞连接。这样引入了一个集中化的队列,所有的处理器都必须使用这个数据结构,每次想要入队或出队一个任务时继续要将这个队列加载到每个处理器的缓存中。

也可以拆分工作队列,给每个处理器一个独立线程和双端队列。

首先需要强调,Go遵循fork-join模型进行并发,在goroutine开始的时候fork,join点事两个或更多的goroutine通过channel或sync包中的类型进行同步。工作窃取算法对于给定线程:

  1. 在fork点,将任务添加到与线程相关的双端队列尾部;
  2. 如果线程空闲则随机选取一个线程,从它关联的双端队列头部窃取工作;
  3. 如果在未准备好的join点则将工作从线程的双端队列尾部出栈;
  4. 如果线程的双端队列是空的,则暂停加入或从随机线程关联的双端队列中窃取工作。

以下是计算fibonacci数列的程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
var fib func(n int) <-chan int
fib = func(n int) <-chan int {
result := make(chan int)
go func() {
defer close(result)
if n <= 2 {
result <- 1
return
}
result <- <-fib(n-1) + <-fib(n-2)
}()
return result
}

fmt.Printf("fib(4) = %d.\n", <-fib(4))
}

首先只有一个goroutine,main goroutine,假设在处理器1上;接下来调用fib(4),这个goroutine被安排在T1的工作队列尾部,并且父goroutine将继续运行;此时根据时机不同,可能会发生T1或T2盗取调用fib(4)的goroutine,如果fib(4)在T1上,则在T1的工作队列上将添加fib(3)和fib(2)。

此时T2仍然是空闲的,所以从T1的队列头部取出fib(3)。此时fib(2)是fib(4)推入队列的最后一个任务,因此T1最有可能需要计算的第一个任务仍然在T1上!与此同时,由于在fib(3)和fib(2)返回的channel上等待着,T1不足以继续处理fib(4),它会自己从队列中出栈一个fib(2)。

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3)
fib(4)(等待join)
fib(2)

调用fib(3)的goroutine:

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3) fib(2)
fib(4)(等待join) fib(1)
fib(2)

T1到达了Fibonacci收敛处,返回1:

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3) fib(2)
fib(4)(等待join) fib(1)
fib(1)

T2到达了join点,并从其队列的尾部出栈一个任务:

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3)(等待join) fib(2)
fib(4)(等待join) fib(1)
return 1

T1又一次处于空闲所以从T2的队列中窃取工作:

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3)(等待join)
fib(4)(等待join) fib(1)
fib(2)

T2到达终点返回1:

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3)(等待join)
fib(4)(等待join) return 1
fib(2)

T1到达终点返回1:

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) fib(3)(等待join)
fib(4)(等待join) return 1
return 1

T2对fib(3)的调用现在有两个已完成的join点,fib(2)和fib(1)已经通过channel返回了结果,并且fib(3)产生的两个goroutine已经运行结束。

T1调用栈 T1工作队列 T2调用栈 T2工作队列
(main goroutine)(等待join) return 2
fib(4)(等待join)

fib(4)调用的goroutine有两个join点,fib(3)和fib(2),在T2最后一个任务结束时完成了fib(2)的join。执行加法,通过fib(4)的channel返回。

位于队列尾部的任务:

  1. 最有可能完成父进程join的任务
  2. 最有可能存在于处理器缓存中的任务

当一个线程到达join时,必须暂停等待回调以窃取任务。

Go中的调度器

G:goroutine

M:OS线程,在源代码中也称为机器

P:上下文,在源代码中也被称为处理器

在Go的运行时中,首先启动M,然后是P,最后是调度运行G。

正如之前说的,设置GOMAXPROCS可以控制运行时使用多少上下文。默认设置是主机上每个逻辑CPU分配一个上下文。并且总会有足够的系统线程可以用来处理每个上下文。这使运行时可以进行一些重要的优化。

如果一个goroutine被阻塞,管理goroutine的系统线程也会被阻塞,并且无法继续执行或切换到其他的goroutine。从性能上,Go会进行更多的处理以尽可能让机器上的处理器保持活跃,Go会从系统线程分离上下文,将上下文切换到另一个无阻塞的系统线程上。当goroutine阻塞最终结束时,主机系统线程会尝试使用一个其他系统线程来回退上下文,以便它可以继续执行先前被阻塞的goroutine。或者把它的goroutine放在全局上下文中然后线程进入休眠状态,并将其放入运行时的线程池以供将来使用。

竞争检测

在Go中为大多数命令增加了race参数。

竞争检测器可以自动检测代码中的竞态条件。

1
2
3
4
5
6
7
8
9
10
func main() {
var data int
go func() {
data++
}()
if data == 0 {
fmt.Printf("the value is %d.\n", data)
}
}

执行go run -race test19.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
the value is 0.
==================
WARNING: DATA RACE
Write at 0x00c0000200c8 by goroutine 6:
main.main.func1()
/home/yuhao/tool/go/test/test19.go:8 +0x4e

Previous read at 0x00c0000200c8 by main goroutine:
main.main()
/home/yuhao/tool/go/test/test19.go:10 +0x88

Goroutine 6 (running) created at:
main.main()
/home/yuhao/tool/go/test/test19.go:7 +0x7a
==================
Found 1 data race(s)
exit status 66

分别表示goroutine试图进行非同步内存写入,或者试图读取相同的内存。