超越 “Hello World”(5)

二进制输出数据日志文件: log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:

USER test
  PASS none

二进制输入数据日志文件: log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:

+OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1
  +OK password, please.
  -ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ

貌似可以工作,让我们通过直接下载一个较大的二进制文件和通过代理下载测试一下程序性能。

直接下载(文件大小约72MB):

time wget
  ...
  Saving to: `otp_src_R15B01.tar.gz'
  ...
  real        1m2.819s

现在启动代理,然后通过代理下载文件:

go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080

下载:

time wget :8080/download/otp_src_R15B01.tar.gz
  ...
  Saving to: `otp_src_R15B01.tar.gz.1'
  ...
  real        0m56.209s

比较结果:

diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1

两者匹配,说明程序正确工作。

来看性能。我在我的Mac Air笔记本上重复这个实验许多次。令人惊讶的是,通过代理下载文件比直接下载还快一点。在上面的例子:1m2819s(直接)vs 0m.56209s (通过代理)。我能想到的唯一的解释是wget是单线程的,它在一个线程内多路复用输入输出数据流。相比,代理处理数据流在各自的线程中,这可能导致一点提速。但是这种差别很小,几乎可以忽略不计,在其他计算机或网络这种差异可能完全消失。主要的结论是:

通过代理下载没有降低程序运行速度,尽管有创建大量日志文件的额外开销。

总体来看,我希望你从简单性和清晰度的角度来看这个程序。上面已经指出,但我再次强调:在这个程序中,我是渐进地使用了多线程。问题的实质促使我在处理一个连接时识别出并行活动,然后Go语言并行机制的易用性和安全性使并行得以实现。最后,使用并行我并没有考虑效率与复杂性(比较难调试)间的权衡。

我同意,有时一个问题只需改变位和字节,这时代码的线性效率是你唯一需要关心的。但你遇到越来越多的问题,并行能力、多线程处理成为关键因素,对于这种应用,Go语言非常耀眼。

程序整体代码 gotcpspy.go :

package main

import (
 "encoding/hex"
 "flag"
 "fmt"
 "net"
 "os"
 "runtime"
 "strings"
 "time"
)

var (
 host        *string = flag.String("host", "", "target host or address")
 port        *string = flag.String("port", "0", "target port")
 listen_port *string = flag.String("listen_port", "0", "listen port")
)

func die(format string, v ...interface{}) {
 os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...))
 os.Exit(1)
}

func connection_logger(data chan []byte, conn_n int, local_info, remote_info string) {
 log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log", format_time(time.Now()), conn_n, local_info, remote_info)
 logger_loop(data, log_name)
}

func binary_logger(data chan []byte, conn_n int, peer string) {
 log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log", format_time(time.Now()), conn_n, peer)
 logger_loop(data, log_name)
}

func logger_loop(data chan []byte, log_name string) {
 f, err := os.Create(log_name)
 if err != nil {
  die("Unable to create file %s, %v\n", log_name, err)
 }
 defer f.Close()
 for {
  b := <-data
  if len(b) == 0 {
   break
  }
  f.Write(b)
  f.Sync()
 }
}

func format_time(t time.Time) string {
 return t.Format("2006.01.02-15.04.05")
}

func printable_addr(a net.Addr) string {
 return strings.Replace(a.String(), ":", "-", -1)
}

type Channel struct {
 from, to              net.Conn
 logger, binary_logger chan []byte
 ack                  chan bool
}

func pass_through(c *Channel) {
 from_peer := printable_addr(c.from.LocalAddr())
 to_peer := printable_addr(c.to.LocalAddr())

b := make([]byte, 10240)
 offset := 0
 packet_n := 0
 for {
  n, err := c.from.Read(b)
  if err != nil {
   c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n", from_peer))
   break
  }

if n > 0 {
   c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X) %d bytes from %s\n", packet_n, offset, n, from_peer))
   c.logger <- []byte(hex.Dump(b[:n]))
   c.binary_logger <- b[:n]
   c.to.Write(b[:n])
   c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n", packet_n, to_peer))
   offset += n
   packet_n += 1
  }
 }
 c.from.Close()
 c.to.Close()
 c.ack <- true
}

func process_connection(local net.Conn, conn_n int, target string) {
 remote, err := net.Dial("tcp", target)
 if err != nil {
  fmt.Printf("Unable to connect to %s, %v\n", target, err)
 }

local_info := printable_addr(remote.LocalAddr())
 remote_info := printable_addr(remote.RemoteAddr())

started := time.Now()

logger := make(chan []byte)
 from_logger := make(chan []byte)
 to_logger := make(chan []byte)
 ack := make(chan bool)

go connection_logger(logger, conn_n, local_info, remote_info)
 go binary_logger(from_logger, conn_n, local_info)
 go binary_logger(to_logger, conn_n, remote_info)

logger <- []byte(fmt.Sprintf("Connected to %s at %s\n", target, format_time(started)))

go pass_through(&Channel{remote, local, logger, to_logger, ack})
 go pass_through(&Channel{local, remote, logger, from_logger, ack})
 <-ack // Make sure that the both copiers gracefully finish.
 <-ack //

finished := time.Now()
 duration := finished.Sub(started)
 logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n", format_time(started), duration.String()))

logger <- []byte{}      // Stop logger
 from_logger <- []byte{} // Stop "from" binary logger
 to_logger <- []byte{}  // Stop "to" binary logger
}

func main() {
 runtime.GOMAXPROCS(runtime.NumCPU())
 flag.Parse()
 if flag.NFlag() != 3 {
  fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_port=local_port\n")
  flag.PrintDefaults()
  os.Exit(1)
 }
 target := net.JoinHostPort(*host, *port)
 fmt.Printf("Start listening on port %s and forwarding data to %s\n", *listen_port, target)
 ln, err := net.Listen("tcp", ":"+*listen_port)
 if err != nil {
  fmt.Printf("Unable to start listener, %v\n", err)
  os.Exit(1)
 }
 conn_n := 1
 for {
  if conn, err := ln.Accept(); err == nil {
   go process_connection(conn, conn_n, target)
   conn_n += 1
  } else {
   fmt.Printf("Accept failed, %v\n", err)
  }
 }
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/901c8d2d20942b7d88e3cd2a380cd535.html