刚到一家公司需要写一个实时分析tshark捕捉到的数据,tshark一直往文本里面写数据,写一个程序要实时获取到添加的数据并进行分析处理最后入库。此时思绪狂飞,想了一些比较挫的方法。
本人想到的方法:
1.每隔一定时间去查看下文件的mtime,如果有改动则读取数据,并记录读取的行数。下次再去读这个文件的数据则上次记录的行数开始继续读。当文件行数太大的时候这个程序的效率就很慢了,也可以记录上次读取的字节数,然后使用linux下的open系统系统中的seek从指定位置处读取。但是要是用C语言来写的话后面的字符串处理和分析就比较麻烦了果断放弃这个方案。
我们经理的方法:
1.利用linux中的pipe,这个方法很是有用。pipe就是管道,一个输入端,一个输出端。只要有数据输入就立马送到输出端进行处理。此办法效率还不错。最终被设计成这样:
1、tshark捕捉数据重定向到pipe中。
2、一个线程从pipe读取数据处理后放入到队列中。
3、另外一个线程从队中取数据进行最后的 处理和入库。
存在的问题:
此法很好,后来当我处理到一个问题的时候需要对实时对apache的日志分析并入库,此时我就准备直接把经理写好的程序demo拿来改改用。但是发现apache无法往pipe中写,我将apache日志文件删除后,重新建立成pipe格式的文件,然后写程序读这个pipe,发现读不了。
可能是apache不支持往pipe中写日志吧。这下歇菜了。
《Python开发技术详解》.( 周伟,宗杰).[高清PDF扫描版+随书视频+代码]
最终的解决方案:
tail -f + pipe 搞定。我们都知道tail -f 一个文件的时候会实时显示文件新增的内容。然后python中的subprocess.Popen中可以指定输出到PIPE中stdout=subprocess.PIPE,这样的话只要一个while每次去读pipe中数据就OK了。
2.方法
经理的方法(为了保密代码做了一定的处理):
#!/usr/bin/env python2.7
import datetime
import time
from CURD import Field, Model, Database
import threading
import Queue
import os
CONFIG = {
}
class dami_log(Model):
date = Field()
netloc = Field()
ip = Field()
path = Field()
cookie = Field()
mac = Field()
cid = Field()
class dami_case(Model):
id = Field()
case_name = Field()
save_time = Field()
is_current = Field()
def thread_read_pipe(q):
if not os.path.exists(CONFIG['pipe']):
os.mkfifo(CONFIG['pipe'], 0777)
f = file(CONFIG['pipe'], 'r')
rest = ''
while True:
读取数据
dat = f.readline()
数据处理.....
放入队列
q.put(dat)
def thread_dump(q,cid):
while True:
try:
从队列中获取数据
line = q.get()
数据处理
dat = line.split('\t')
if any(map(lambda x:dat[3].endswith(x) or (x+"?" in dat[3]),["js","css","ico","jpg","png","gif"])) or dat[3]=="*" or "192.168.10.1" in dat[1]:
print line
else:
payload = {
'date': dat[0] \
if '.' not in dat[0] \
else datetime.datetime.fromtimestamp(
time.mktime(time.strptime(dat[0].rsplit('.', 1)[0],
'%b %d, %Y %X'))),
'netloc': dat[1],
'ip': dat[2],
'path': dat[3],
'cookie': dat[4],
'mac': dat[5],
'cid':cid
}
dami_log(**payload).save()
except Exception as e:
print e
pass
def _main():
CONFIG['db_user'] = '***'
CONFIG['db_passwd'] = '***'
CONFIG['db'] = '***'
CONFIG['pipe'] = '/var/www/script/tsharklog/log.log'
Database.config(user=CONFIG['db_user'],
passwd=CONFIG['db_passwd'],
db=CONFIG['db'])
cid = dami_case.where(is_current=1).getone()
q = Queue.Queue()
trp = threading.Thread(target=thread_read_pipe, args=(q,))
trp.start()
td = threading.Thread(target=thread_dump, args=(q,cid.id))
td.start()
if __name__ == '__main__':