【Go】使用压缩文件优化io (二)

原文链接: https://blog.thinkeridea.com/201907/go/compress_file_io_optimization2.html

上一篇文章《使用压缩文件优化io (一)》中记录了日志备份 io 优化方案,使用文件流数据压缩方案优化 io 性能,效果十分显著。这篇文章记录数据分析前置清洗、格式化数据的 io 优化方案,我们有一台专用的日志前置处理服务器,所有业务日志通过这台机器从 OSS 拉取回来清洗、格式化,最后进入到数据仓储中便于后续的分析。

随着业务扩展这台服务器压力越来越大,高峰时数据延迟越来越厉害,早期也是使用 Python 脚本 + awk 以及一些 shell 命令完成相关工作,在数据集不是很大的时候这种方案很好,效率也很高,随着数据集变大,发现服务器负载很高,经过分析是还是 io 阻塞,依旧采用对数据流进行处理的方案优化io,以下记录优化的过程。

背景介绍

服务器配置:4 核 8G; 磁盘:1T

分析前置服务会根据业务不同分为十分钟、一小时两个阶段拉取分析日志,每隔一个阶段会去 OSS 拉取日志回到服务器进行处理,处理过程因 io 阻塞,导致 CPU 和 load 异常高,且处理效率严重下降,这次优化主要就是降低 io 阻塞,提升 CPU 利用率 (处理业务逻辑而不是等待 io) 和处理效率。

后文中会详细描述优化前后的方案,并用 go 编写测试,使用一台 2 核4G的服务器进行测试,测试数据集大小为:

文件数量:432个

压缩文件:17G

解压后文件:63G

压缩方案:lzo

Goroutine 数量:20

优化前

优化前日志处理流程:

获取待处理文件列表

拉取 OSS 日志到本地磁盘 (压缩文件)

解压缩日志文件

读取日志数据

业务处理……

导入到数据仓储中

导致 io 阻塞的部分主要是: 拉取 OSS 日志、解压缩日志文件及读取日志数据,优化也主要从这三块着手。

这里有一段公共的日志读取方法,该方法接收一个 io.Reader, 并按行读取日志,并简单切分日志字段,并没有实质的处理日志数据,后面的优化方案也将使用这个方法读取日志。

package main import ( "bufio" "bytes" "io" "github.com/thinkeridea/go-extend/exbytes" ) func Read(r io.Reader) { rawBuffer := make([]byte, 512) buf := bufio.NewReader(r) for { line, ok, err := readLine(buf, rawBuffer) if err == io.EOF { return } if err != nil { panic(nil) } if ok { rawBuffer = line } c := bytes.Count(line, []byte{'\x01'}) if c != 65 { panic("无效的行") } } } func readLine(r *bufio.Reader, rawBuffer []byte) ([]byte, bool, error) { var ok bool line, err := r.ReadSlice('\n') if (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF { rawBuffer = append(rawBuffer[:0], line...) for (err == bufio.ErrBufferFull || len(line) < 3 || exbytes.ToString(line[len(line)-3:]) != "\r\r\n") && err != io.EOF { line, err = r.ReadSlice('\n') rawBuffer = append(rawBuffer, line...) } line = rawBuffer ok = true } if len(line) > 0 && err == io.EOF { err = nil } return line, ok, err }

日志按 \r\r\n 分隔行,使用 \x01 切分字段,读取方法使用 bufio.ReadSlice 方法,避免内存分配,且当 bufio 缓冲区满之后使用 rwaBuffer 作为本地可扩展缓冲,每次扩展之后会保留最大的扩展空间,因为业务日志每行大小差不多,这样可以极大的减少内存分配,效率是 bufio.ReadLine 方法的好几倍。

package main import ( "fmt" "os" "os/exec" "path/filepath" "strings" "sync" "time" ".../pkg/aliyun_oss" // 虚假的包 ) func main() { var oss *aliyun_oss.AliyunOSS // 对接阿里云 OSS,需要自行封装 files := list() // 这是一个虚构的方法,用来获取待处理的文件列表 fmt.Printf("待处理文件数量:%d\n", len(files)) start := time.Now() defer func(t time.Time) { fmt.Printf("共耗时:%0.6f\n", time.Now().Sub(t).Seconds()) }(start) // 下载日志文件 n := 20 var wg sync.WaitGroup c := make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { f, ok := <-c if !ok { return } if _, err := os.Stat(f); err == nil { return } else if !os.IsNotExist(err) { panic(err) } dir := filepath.Dir(f) err := os.MkdirAll(dir, 0755) if err != nil { panic(err) } err = oss.GetObjectToFile(f, f) if err != nil { panic(err) } } }() } for _, f := range files { c <- f } close(c) wg.Wait() fmt.Printf("下载文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) // 解压日志文件 start = time.Now() shell := exec.Command("/bin/bash", "-c", "lzop -df logs/*/*/*/*/*/*.lzo") err := shell.Run() if err != nil { panic(err) } fmt.Printf("解压文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) // 读取日志文件 start = time.Now() c = make(chan string) wg.Add(n) for i := 0; i < n; i++ { go func() { defer wg.Done() for { file, ok := <-c if !ok { return } f, err := os.Open(file) if err != nil { panic(err) } Read(f) f.Close() } }() } for _, f := range files { c <- strings.TrimRight(f, ".lzo") } close(c) wg.Wait() fmt.Printf("读取文件耗时:%0.6f\n", time.Now().Sub(start).Seconds()) }

运行程序输出如下:

待处理文件数量:432 下载文件耗时:303.562865 解压文件耗时:611.236232 读取文件耗时:460.371245 共耗时:1375.187261

通过 iostat -m -x 5 10000 分析各个阶段结果如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpzxdw.html