为读取信息,也需要依照信息格式编写函数。先读取4个字节并作为一个uint32以表示长度length,然后依据这个数字读取相应位数的数据,这部分中的第一个字节表示ID,剩下的表示payload。
// Read函数用于解析信息 func Read(r io.Reader) (*Message, error) { lengthBuf := make([]byte, 4) _, err := io.ReadFull(r, lengthBuf) if err != nil { return nil, err } length := binary.BigEndian.Uint32(lengthBuf) // keep-alive if length == 0 { return nil, nil } messageBuf := make([]byte, length) _, err = io.ReadFull(r, messageBuf) if err != nil { return nil, err } m := Message{ ID: messageID(messageBuf[0]), Payload: messageBuf[1:], } return &m, nil } 位域(~/bitfield/bitfield.go)peers主机使用位域来高效地编码自身能够提供的资源分块。位域类似基于字节的数组,被标为1的位即代表拥有这个资源分块。因为使用单个的位即能完成标注,位域有极高的压缩能力,这意味着在一个布尔(bool)空间内完成了8次布尔类型的操作。
当然这样的思路需要一定的代价:可以寻址的最小内存单位是字节,处理单个的位就需要额外的函数设计。
// Bitfield用以表示一台peer主机拥有的资源分块 type Bitfield []byte // HasPiece用以表明一个位域(bitfield)是否有特定的索引集 func (bf Bitfield) HasPiece(index int) bool { byteIndex := index / 8 offset := index % 8 if byteIndex < 0 || byteIndex >= len(bf) { return false } return bf[byteIndex] >> uint(7-offset)&1 != 0 } // SetPiece用以在位域设置单个位 func (bf Bitfield) SetPiece(index int) { byteIndex := index / 8 offset := index % 8 // 撇除不合规的索引 if byteIndex < 0 || byteIndex >= len(bf) { return } bf[byteIndex] |= 1 << uint(7-offset) } 组装至此完成了所有下载种子文件的工具:
从trackers服务器获得了peers主机列表;
与peers主机达成TCP连接;
与peers主机进行握手;
与peers主机收发信息。
现在面临的问题是如何解决下载必然造成的高并发(concurrency),并且需要统一管理每个连接的peer主机的状态(state)。
高并发(~/p2p/p2p.go)在Effective Go中对并发的描述中有这样一句话:
Do not communicate by sharing memory; instead, share memory by communicating.
官网给出了解释。
这里将Go中重要的Channel类型作为简洁且线程安全的队列。Channel可以被认为是管道,通过并发核心单元就可以发送或者接收数据进行通讯(communication)。
建立两个Channel来同步并发工作:一个用于在peers主机间分派工作(要下载的资源分块),另一个用于已下载的分块。
workQueue := make(chan *pieceWork, len(t.PieceHashes)) results := make(chan *pieceResult) for index, hash := range t.PieceHashes { length := t.calculatePieceSize(index) workQueue <- &pieceWork{index, hash, length} } // 执行下载 for _, peer := range t.Peers { go t.startDownloadWorker(peer, workQueue, results) } // 收集分块 buf := make([]byte, t.Length) donePieces := 0 for donePieces < len(t.PieceHashes) { res := <- results begin, end := t.calculateBoundsForPiece(res.index) copy(buf[begin:end], res.buf) donePieces ++ percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100 numWorkers := runtime.NumGoroutine() - 1 log.Printf("(%0.2f%%) downloaded piece #%d from %d peers\n", percent, res.index, numWorkers) } close(workQueue)为取得的每个peer主机都生成一个goroutine(轻量级线程)。每个线程连接peer主机并握手,然后从workQueue中抽取任务,尝试进行下载,并把下载得到的分块传至名为results的channel。
可以用流程图表示这个过程: