Go Channel

11 min

Channel(通道)

1. Channels的意义

  1. 消息传递:Channels通过传递消息(数据)在不同的goroutine之间进行通信。
  2. 同步机制:无缓冲的channel具有 :o:同步的特性,确保发送和接收操作是同步的,有缓冲的channel则具有异步的特性。
  3. 安全通信:Channels确保了不同goroutine之间的安全通信,避免了共享数据时出现的竞争条件(race condition)。

2. Channels的特点:

  1. 类型化:每个channel都有一个类型,声明时需要指定。channel只能传递这种类型的数据。
  2. 无缓冲和有缓冲:无缓冲的channel在没有接收方接收数据时,发送方会阻塞;有缓冲的channel允许在缓冲区满之前发送数据而不会阻塞。
    • 同步和通信:通过channel传递数据的操作是同步的,即发送操作和接收操作会阻塞,直到另一端准备好。这使得channels可以用于goroutine之间的同步。

3. Channel的基本使用:

3.1 无缓冲的channels:

 package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go func() {
        ch <- 42
    }()
    value := <-ch // 从Channel ch中接收数据,并将数据赋值给value
    fmt.Println(value) // 输出:42
}
  • 操作符是 <-

    箭头的方向指向数据可以流动的方向。

     ch <- v    // 发送值v到Channel ch中,“写”操作,数据流入通道
     v := <-ch  // 从Channel ch中接收数据,并将数据赋值给v,“读”操作,数据从传到流出
  • make的用法

    • 声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先用make创建

      ch := make(chan int)
      • chan int 是一个整体,表示一个整型的通道(channel)
      • chan 是 Go 语言中的关键字,用于声明一个通道类型
      • 后面的 int 表示这个通道用于传递 int 类型的数据。

3.2 有缓冲的channels

// 创建一个有缓冲的channel,缓冲区大小为2
messages := make(chan string, 2)

// 向channel发送数据
messages <- "buffered"
messages <- "channel"

// 从channel接收数据
fmt.Println(<-messages)
fmt.Println(<-messages)

3.3 关闭channel:

  • 当不再需要发送数据时,可以关闭channel,这样接收方可以通过检查channel是否关闭来终止接收操作。

    close(messages)

3.4 Range循环接收数据:

  • 在接收方,可以使用range循环从channel接收数据,直到channel被关闭。

    go func() {
        messages <- "message 1"
        messages <- "message 2"
        close(messages)
    }()
    
    for msg := range messages {
        fmt.Println(msg)
    }
    • for range:会源源不断从通道messages中读出信息,赋予msg

3.5 Select语句

  • 并发处理多通道,当多个通道都准备好时,select 会随机选择一个通道操作。这有助于避免偏向某个特定通道,保证公平性。

    • select 允许同时等待多个通道的操作,无论是接收(<-channel)还是发送(channel <-)。这使得它非常适合处理多路复用和并发任务。

      package main
      
      import (
      	"fmt"
      	"time"
      )
      
      func main() {
      	channel1 := make(chan string)
      	channel2 := make(chan string)
      
      	go func() {
      		time.Sleep(2 * time.Second)
      		channel1 <- "Message from channel1"
      	}()
      
      	go func() {
      		time.Sleep(1 * time.Second)
      		channel2 <- "Message from channel2"
      	}()
      
      	for i := 0; i < 2; i++ {
      		select {
      		case msg1 := <-channel1:
      			fmt.Println("Received", msg1)
      		case msg2 := <-channel2:
      			fmt.Println("Received", msg2)
      		}
      	}
      }
  • 防止阻塞

    • select 可以用 default 子句来避免阻塞。如果所有的通道都没有准备好,default 子句将被执行。这样可以防止因等待通道操作而导致的阻塞。

      package main
      
      import "fmt"
      
      func main() {
      	channel := make(chan string)
      
      	select {
      	case msg := <-channel:
      		fmt.Println("Received", msg)
      	default:
      		fmt.Println("No messages received")
      	}
      }
  • 超时处理

    • select 可以与 time.After 结合使用来实现超时操作。如果某个通道在指定时间内没有响应,可以执行超时处理。

      package main
      
      import (
      	"fmt"
      	"time"
      )
      
      func main() {
      	channel := make(chan string)
      
      	go func() {
      		time.Sleep(2 * time.Second)
      		channel <- "Message from channel"
      	}()
      
      	select {
      	case msg := <-channel:
      		fmt.Println("Received", msg)
      	case <-time.After(1 * time.Second):
      		fmt.Println("Timeout")
      	}
      }
  • 简洁性和可读性

    • select 语句可以简化处理多个通道操作的代码,使代码更加简洁和可读

      select {
      case msg1 := <-channel1:
          fmt.Println("Received", msg1)
      case msg2 := <-channel2:
          fmt.Println("Received", msg2)
      default:
          fmt.Println("No messages received")
      }

3.6 实际示例

下面是一个实际的例子,展示了如何使用goroutinechannel来实现并发任务:

package main

import (
    "fmt"
    "time"
)

// worker函数接收一个只读的jobs通道和一个只写的results通道
func worker(id int, jobs <-chan int, results chan<- int) {
    // 从jobs通道中读取任务
    for j := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, j) // 打印开始任务的信息
        time.Sleep(time.Second) // 模拟耗时任务
        fmt.Printf("Worker %d finished job %d\n", id, j) // 打印完成任务的信息
        results <- j * 2 // 将任务结果发送到results通道
    }
}

func main() {
    const numJobs = 5 // 定义任务数量
    jobs := make(chan int, numJobs) // 创建一个可以存储numJobs个int类型元素的通道
    results := make(chan int, numJobs) // 创建一个可以存储numJobs个int类型元素的通道

    // 启动3个worker goroutines
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results) // 启动worker协程
    }

    // 发送5个任务到jobs channel
    for j := 1; j <= numJobs; j++ {
        jobs <- j // 将任务发送到jobs通道
    }
    close(jobs) // 关闭jobs通道,表示所有任务已经发送完毕

    // 收集结果
    for a := 1; a <= numJobs; a++ {
        res := <-results // 从results通道中读取结果
        fmt.Println("Result:", res) // 打印结果
    }
}
  • 在 Go 语言中,for range 是一种循环结构,用于遍历数组、切片、字符串、map 或者通道(channel)。
    • 当用于遍历通道时,for range 会持续从通道中接收数据,直到该通道被显式关闭。
    • 如果通道未被关闭,那么 for range 循环会一直阻塞,等待接收新的数据。
      • 在你的代码 for j := range jobs 中,jobs 是一个通道,for range jobs 就会不断从 jobs 这个通道中读取数据,每次读取的数据会被赋值给 j,直到 jobs 这个通道被关闭。
      • 如果 jobs 通道未被关闭,这个循环就会一直等待新的数据到来。

4. channels的作业调度

4.1 先来先服务(FCFS)

这种调度方式是 chan 的默认行为,任务按照放入通道的顺序被读取和处理。

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ID int
}

func worker(taskQueue chan Task, wg *sync.WaitGroup) {
	for task := range taskQueue {
		fmt.Printf("Processing task ID: %d\n", task.ID)
		time.Sleep(time.Second) // 模拟任务处理时间
		wg.Done()
	}
}

func main() {
	const workerCount = 3
	taskQueue := make(chan Task, workerCount)
	var wg sync.WaitGroup

	// 启动 worker
	for i := 0; i < workerCount; i++ {
		go worker(taskQueue, &wg)
	}

	// 添加任务
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		taskQueue <- Task{ID: i}
	}

	close(taskQueue)
	wg.Wait()
}

4.2 短作业优先(SJF)

这种调度方式需要对任务进行排序,短作业优先被处理。可以使用优先级队列或手动排序来实现。

package main

import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)

type Task struct {
	Duration int
	ID       int
}

type PriorityQueue []*Task

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Duration < pq[j].Duration // 短作业优先
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
	*pq = append(*pq, x.(*Task))
}

func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[0 : n-1]
	return item
}

func worker(taskQueue chan *Task, wg *sync.WaitGroup) {
	for task := range taskQueue {
		fmt.Printf("Processing task ID: %d with duration %d\n", task.ID, task.Duration)
		time.Sleep(time.Duration(task.Duration) * time.Second) // 模拟任务处理时间
		wg.Done()
	}
}

func main() {
	const workerCount = 3
	taskQueue := make(chan *Task, workerCount)
	var wg sync.WaitGroup

	// 启动 worker
	for i := 0; i < workerCount; i++ {
		go worker(taskQueue, &wg)
	}

	// 创建优先级队列
	pq := make(PriorityQueue, 0)
	heap.Init(&pq)

	// 添加任务到优先级队列
	tasks := []*Task{
		{Duration: 3, ID: 1},
		{Duration: 1, ID: 2},
		{Duration: 2, ID: 3},
	}

	for _, task := range tasks {
		heap.Push(&pq, task)
	}

	// 将任务分发给 worker
	for pq.Len() > 0 {
		wg.Add(1)
		task := heap.Pop(&pq).(*Task)
		taskQueue <- task
	}

	close(taskQueue)
	wg.Wait()
}

4.3 最高响应比优先(HRRN)

这种调度方式比较复杂,需要计算每个任务的响应比。可以使用自定义逻辑来计算响应比,并按此顺序处理任务。

package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	ArrivalTime time.Time
	Duration    int
	ID          int
}

type TaskQueue struct {
	tasks []*Task
}

func (tq *TaskQueue) Add(task *Task) {
	tq.tasks = append(tq.tasks, task)
}

func (tq *TaskQueue) GetNext() *Task {
	if len(tq.tasks) == 0 {
		return nil
	}
	now := time.Now()
	var highestRatioTask *Task
	highestRatio := -1.0
	for _, task := range tq.tasks {
		waitTime := now.Sub(task.ArrivalTime).Seconds()
		responseRatio := (waitTime + float64(task.Duration)) / float64(task.Duration)
		if responseRatio > highestRatio {
			highestRatio = responseRatio
			highestRatioTask = task
		}
	}
	// Remove the selected task from the queue
	for i, task := range tq.tasks {
		if task == highestRatioTask {
			tq.tasks = append(tq.tasks[:i], tq.tasks[i+1:]...)
			break
		}
	}
	return highestRatioTask
}

func worker(taskQueue chan *Task, wg *sync.WaitGroup) {
	for task := range taskQueue {
		fmt.Printf("Processing task ID: %d with duration %d\n", task.ID, task.Duration)
		time.Sleep(time.Duration(task.Duration) * time.Second) // 模拟任务处理时间
		wg.Done()
	}
}

func main() {
	const workerCount = 3
	taskQueue := make(chan *Task, workerCount)
	var wg sync.WaitGroup

	// 启动 worker
	for i := 0; i < workerCount; i++ {
		go worker(taskQueue, &wg)
	}

	// 创建任务队列
	tq := &TaskQueue{}

	// 添加任务到任务队列
	tasks := []*Task{
		{ArrivalTime: time.Now(), Duration: 3, ID: 1},
		{ArrivalTime: time.Now(), Duration: 1, ID: 2},
		{ArrivalTime: time.Now(), Duration: 2, ID: 3},
	}

	for _, task := range tasks {
		tq.Add(task)
	}

	// 模拟任务的到达时间
	time.Sleep(2 * time.Second)
	tq.Add(&Task{ArrivalTime: time.Now(), Duration: 4, ID: 4})

	// 将任务分发给 worker
	for {
		task := tq.GetNext()
		if task == nil {
			break
		}
		wg.Add(1)
		taskQueue <- task
	}

	close(taskQueue)
	wg.Wait()
}