最近在实现两个需求,由于两者之间并没有依赖关系,所以想利用队列进行解耦;但在 Go 的标准库中并没有现成可用并且并发安全的数据结构;但 Go 提供了一个更加优雅的解决方案,那就是 channel。
channel 应用Go 与 Java 的一个很大的区别就是并发模型不同,Go 采用的是 CSP(Communicating sequential processes) 模型;用 Go 官方的说法:
Do not communicate by sharing memory; instead, share memory by communicating.
翻译过来就是:不用使用共享内存来通信,而是用通信来共享内存。
而这里所提到的通信,在 Go 里就是指代的 channel。
只讲概念并不能快速的理解与应用,所以接下来会结合几个实际案例更方便理解。
futrue taskGo 官方没有提供类似于 Java 的 FutureTask 支持:
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(2); Task task = new Task(); FutureTask<String> futureTask = new FutureTask<>(task); executorService.submit(futureTask); String s = futureTask.get(); System.out.println(s); executorService.shutdown(); } } class Task implements Callable<String> { @Override public String call() throws Exception { // 模拟http System.out.println("http request"); Thread.sleep(1000); return "request success"; } }但我们可以使用 channel 配合 goroutine 实现类似的功能:
func main() { ch := Request("https://github.com") select { case r := <-ch: fmt.Println(r) } } func Request(url string) <-chan string { ch := make(chan string) go func() { // 模拟http请求 time.Sleep(time.Second) ch <- fmt.Sprintf("url=%s, res=%s", url, "ok") }() return ch }goroutine 发起请求后直接将这个 channel 返回,调用方会在请求响应之前一直阻塞,直到 goroutine 拿到了响应结果。
goroutine 互相通信 /** * 偶数线程 */ public static class OuNum implements Runnable { private TwoThreadWaitNotifySimple number; public OuNum(TwoThreadWaitNotifySimple number) { this.number = number; } @Override public void run() { for (int i = 0; i < 11; i++) { synchronized (TwoThreadWaitNotifySimple.class) { if (number.flag) { if (i % 2 == 0) { System.out.println(Thread.currentThread().getName() + "+-+偶数" + i); number.flag = false; TwoThreadWaitNotifySimple.class.notify(); } } else { try { TwoThreadWaitNotifySimple.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } /** * 奇数线程 */ public static class JiNum implements Runnable { private TwoThreadWaitNotifySimple number; public JiNum(TwoThreadWaitNotifySimple number) { this.number = number; } @Override public void run() { for (int i = 0; i < 11; i++) { synchronized (TwoThreadWaitNotifySimple.class) { if (!number.flag) { if (i % 2 == 1) { System.out.println(Thread.currentThread().getName() + "+-+奇数" + i); number.flag = true; TwoThreadWaitNotifySimple.class.notify(); } } else { try { TwoThreadWaitNotifySimple.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }这里截取了”两个线程交替打印奇偶数“的部分代码。
Java 提供了 object.wait()/object.notify() 这样的等待通知机制,可以实现两个线程间通信。
go 通过 channel 也能实现相同效果:
func main() { ch := make(chan struct{}) go func() { for i := 1; i < 11; i++ { ch <- struct{}{} //奇数 if i%2 == 1 { fmt.Println("奇数:", i) } } }() go func() { for i := 1; i < 11; i++ { <-ch if i%2 == 0 { fmt.Println("偶数:", i) } } }() time.Sleep(10 * time.Second) }本质上他们都是利用了线程(goroutine)阻塞然后唤醒的特性,只是 Java 是通过 wait/notify 机制;
而 go 提供的 channel 也有类似的特性:
向 channel 发送数据时(ch<-struct{}{})会被阻塞,直到 channel 被消费(<-ch)。
以上针对于无缓冲 channel。
channel 本身是由 go 原生保证并发安全的,不用额外的同步措施,可以放心使用。
广播通知不仅是两个 goroutine 之间通信,同样也能广播通知,类似于如下 Java 代码:
public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new Thread(() -> { try { synchronized (NotifyAll.class){ NotifyAll.class.wait(); } System.out.println(Thread.currentThread().getName() + "done...."); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } Thread.sleep(3000); synchronized (NotifyAll.class){ NotifyAll.class.notifyAll(); } }主线程将所有等待的子线程全部唤醒,这个本质上也是通过 wait/notify 机制实现的,区别只是通知了所有等待的线程。