Go Channel
11 min
Channel(通道)
1. Channels的意义
- 消息传递:Channels通过传递消息(数据)在不同的goroutine之间进行通信。
- 同步机制:无缓冲的channel具有 :o:同步的特性,确保发送和接收操作是同步的,有缓冲的channel则具有异步的特性。
- 安全通信:Channels确保了不同goroutine之间的安全通信,避免了共享数据时出现的竞争条件(race condition)。
2. Channels的特点:
- 类型化:每个channel都有一个类型,声明时需要指定。channel只能传递这种类型的数据。
- 无缓冲和有缓冲:无缓冲的channel在没有接收方接收数据时,发送方会阻塞;有缓冲的channel允许在缓冲区满之前发送数据而不会阻塞。
- 同步和通信:通过channel传递数据的操作是同步的,即发送操作和接收操作会阻塞,直到另一端准备好。这使得channels可以用于
goroutine之间的同步。
- 同步和通信:通过channel传递数据的操作是同步的,即发送操作和接收操作会阻塞,直到另一端准备好。这使得channels可以用于
3. Channel的基本使用:
3.1 无缓冲的channels:
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go func() {
ch <- 42
}()
value := <-ch // 从Channel ch中接收数据,并将数据赋值给value
fmt.Println(value) // 输出:42
}操作符是
<-箭头的方向指向数据可以流动的方向。
ch <- v // 发送值v到Channel ch中,“写”操作,数据流入通道 v := <-ch // 从Channel ch中接收数据,并将数据赋值给v,“读”操作,数据从传到流出make的用法
声明一个通道很简单,我们使用
chan关键字即可,通道在使用前必须先用make创建ch := make(chan int)chan int是一个整体,表示一个整型的通道(channel)chan是 Go 语言中的关键字,用于声明一个通道类型- 后面的 int 表示这个通道用于传递 int 类型的数据。
3.2 有缓冲的channels:
// 创建一个有缓冲的channel,缓冲区大小为2
messages := make(chan string, 2)
// 向channel发送数据
messages <- "buffered"
messages <- "channel"
// 从channel接收数据
fmt.Println(<-messages)
fmt.Println(<-messages)3.3 关闭channel:
当不再需要发送数据时,可以关闭channel,这样接收方可以通过检查channel是否关闭来终止接收操作。
close(messages)
3.4 Range循环接收数据:
在接收方,可以使用
range循环从channel接收数据,直到channel被关闭。go func() { messages <- "message 1" messages <- "message 2" close(messages) }() for msg := range messages { fmt.Println(msg) }for range:会源源不断从通道messages中读出信息,赋予msg
3.5 Select语句:
并发处理多通道,当多个通道都准备好时,
select会随机选择一个通道操作。这有助于避免偏向某个特定通道,保证公平性。select允许同时等待多个通道的操作,无论是接收(<-channel)还是发送(channel <-)。这使得它非常适合处理多路复用和并发任务。package main import ( "fmt" "time" ) func main() { channel1 := make(chan string) channel2 := make(chan string) go func() { time.Sleep(2 * time.Second) channel1 <- "Message from channel1" }() go func() { time.Sleep(1 * time.Second) channel2 <- "Message from channel2" }() for i := 0; i < 2; i++ { select { case msg1 := <-channel1: fmt.Println("Received", msg1) case msg2 := <-channel2: fmt.Println("Received", msg2) } } }
防止阻塞
select可以用default子句来避免阻塞。如果所有的通道都没有准备好,default子句将被执行。这样可以防止因等待通道操作而导致的阻塞。package main import "fmt" func main() { channel := make(chan string) select { case msg := <-channel: fmt.Println("Received", msg) default: fmt.Println("No messages received") } }
超时处理
select可以与time.After结合使用来实现超时操作。如果某个通道在指定时间内没有响应,可以执行超时处理。package main import ( "fmt" "time" ) func main() { channel := make(chan string) go func() { time.Sleep(2 * time.Second) channel <- "Message from channel" }() select { case msg := <-channel: fmt.Println("Received", msg) case <-time.After(1 * time.Second): fmt.Println("Timeout") } }
简洁性和可读性
select语句可以简化处理多个通道操作的代码,使代码更加简洁和可读select { case msg1 := <-channel1: fmt.Println("Received", msg1) case msg2 := <-channel2: fmt.Println("Received", msg2) default: fmt.Println("No messages received") }
3.6 实际示例
下面是一个实际的例子,展示了如何使用goroutine和channel来实现并发任务:
package main
import (
"fmt"
"time"
)
// worker函数接收一个只读的jobs通道和一个只写的results通道
func worker(id int, jobs <-chan int, results chan<- int) {
// 从jobs通道中读取任务
for j := range jobs {
fmt.Printf("Worker %d started job %d\n", id, j) // 打印开始任务的信息
time.Sleep(time.Second) // 模拟耗时任务
fmt.Printf("Worker %d finished job %d\n", id, j) // 打印完成任务的信息
results <- j * 2 // 将任务结果发送到results通道
}
}
func main() {
const numJobs = 5 // 定义任务数量
jobs := make(chan int, numJobs) // 创建一个可以存储numJobs个int类型元素的通道
results := make(chan int, numJobs) // 创建一个可以存储numJobs个int类型元素的通道
// 启动3个worker goroutines
for w := 1; w <= 3; w++ {
go worker(w, jobs, results) // 启动worker协程
}
// 发送5个任务到jobs channel
for j := 1; j <= numJobs; j++ {
jobs <- j // 将任务发送到jobs通道
}
close(jobs) // 关闭jobs通道,表示所有任务已经发送完毕
// 收集结果
for a := 1; a <= numJobs; a++ {
res := <-results // 从results通道中读取结果
fmt.Println("Result:", res) // 打印结果
}
}- 在 Go 语言中,for range 是一种循环结构,用于遍历数组、切片、字符串、map 或者通道(channel)。
- 当用于遍历通道时,for range 会持续从通道中接收数据,直到该通道被显式关闭。
- 如果通道未被关闭,那么 for range 循环会一直阻塞,等待接收新的数据。
- 在你的代码
for j := range jobs中,jobs 是一个通道,for range jobs 就会不断从 jobs 这个通道中读取数据,每次读取的数据会被赋值给j,直到 jobs 这个通道被关闭。 - 如果 jobs 通道未被关闭,这个循环就会一直等待新的数据到来。
- 在你的代码
4. channels的作业调度
4.1 先来先服务(FCFS)
这种调度方式是
chan的默认行为,任务按照放入通道的顺序被读取和处理。
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
}
func worker(taskQueue chan Task, wg *sync.WaitGroup) {
for task := range taskQueue {
fmt.Printf("Processing task ID: %d\n", task.ID)
time.Sleep(time.Second) // 模拟任务处理时间
wg.Done()
}
}
func main() {
const workerCount = 3
taskQueue := make(chan Task, workerCount)
var wg sync.WaitGroup
// 启动 worker
for i := 0; i < workerCount; i++ {
go worker(taskQueue, &wg)
}
// 添加任务
for i := 1; i <= 10; i++ {
wg.Add(1)
taskQueue <- Task{ID: i}
}
close(taskQueue)
wg.Wait()
}
4.2 短作业优先(SJF)
这种调度方式需要对任务进行排序,短作业优先被处理。可以使用优先级队列或手动排序来实现。
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
type Task struct {
Duration int
ID int
}
type PriorityQueue []*Task
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].Duration < pq[j].Duration // 短作业优先
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *PriorityQueue) Push(x interface{}) {
*pq = append(*pq, x.(*Task))
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
func worker(taskQueue chan *Task, wg *sync.WaitGroup) {
for task := range taskQueue {
fmt.Printf("Processing task ID: %d with duration %d\n", task.ID, task.Duration)
time.Sleep(time.Duration(task.Duration) * time.Second) // 模拟任务处理时间
wg.Done()
}
}
func main() {
const workerCount = 3
taskQueue := make(chan *Task, workerCount)
var wg sync.WaitGroup
// 启动 worker
for i := 0; i < workerCount; i++ {
go worker(taskQueue, &wg)
}
// 创建优先级队列
pq := make(PriorityQueue, 0)
heap.Init(&pq)
// 添加任务到优先级队列
tasks := []*Task{
{Duration: 3, ID: 1},
{Duration: 1, ID: 2},
{Duration: 2, ID: 3},
}
for _, task := range tasks {
heap.Push(&pq, task)
}
// 将任务分发给 worker
for pq.Len() > 0 {
wg.Add(1)
task := heap.Pop(&pq).(*Task)
taskQueue <- task
}
close(taskQueue)
wg.Wait()
}
4.3 最高响应比优先(HRRN)
这种调度方式比较复杂,需要计算每个任务的响应比。可以使用自定义逻辑来计算响应比,并按此顺序处理任务。
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ArrivalTime time.Time
Duration int
ID int
}
type TaskQueue struct {
tasks []*Task
}
func (tq *TaskQueue) Add(task *Task) {
tq.tasks = append(tq.tasks, task)
}
func (tq *TaskQueue) GetNext() *Task {
if len(tq.tasks) == 0 {
return nil
}
now := time.Now()
var highestRatioTask *Task
highestRatio := -1.0
for _, task := range tq.tasks {
waitTime := now.Sub(task.ArrivalTime).Seconds()
responseRatio := (waitTime + float64(task.Duration)) / float64(task.Duration)
if responseRatio > highestRatio {
highestRatio = responseRatio
highestRatioTask = task
}
}
// Remove the selected task from the queue
for i, task := range tq.tasks {
if task == highestRatioTask {
tq.tasks = append(tq.tasks[:i], tq.tasks[i+1:]...)
break
}
}
return highestRatioTask
}
func worker(taskQueue chan *Task, wg *sync.WaitGroup) {
for task := range taskQueue {
fmt.Printf("Processing task ID: %d with duration %d\n", task.ID, task.Duration)
time.Sleep(time.Duration(task.Duration) * time.Second) // 模拟任务处理时间
wg.Done()
}
}
func main() {
const workerCount = 3
taskQueue := make(chan *Task, workerCount)
var wg sync.WaitGroup
// 启动 worker
for i := 0; i < workerCount; i++ {
go worker(taskQueue, &wg)
}
// 创建任务队列
tq := &TaskQueue{}
// 添加任务到任务队列
tasks := []*Task{
{ArrivalTime: time.Now(), Duration: 3, ID: 1},
{ArrivalTime: time.Now(), Duration: 1, ID: 2},
{ArrivalTime: time.Now(), Duration: 2, ID: 3},
}
for _, task := range tasks {
tq.Add(task)
}
// 模拟任务的到达时间
time.Sleep(2 * time.Second)
tq.Add(&Task{ArrivalTime: time.Now(), Duration: 4, ID: 4})
// 将任务分发给 worker
for {
task := tq.GetNext()
if task == nil {
break
}
wg.Add(1)
taskQueue <- task
}
close(taskQueue)
wg.Wait()
}