本节对日志模块(logging module)进行简单的介绍。
logging 模块logging 模块是用于记录诊断信息的 Python 标准库模块。日志模块非常庞大,具有许多复杂的功能。我们将会展示一个简单的例子来说明其用处。
再探异常在本节练习中,我们创建这样一个 parse() 函数:
# fileparse.py def parse(f, types=None, names=None, delimiter=None): records = [] for line in f: line = line.strip() if not line: continue try: records.append(split(line,types,names,delimiter)) except ValueError as e: print("Couldn't parse :", line) print("Reason :", e) return records请看到 try-except 语句,在 except 块中,我们应该做什么?
应该打印警告消息(warning message)?
try: records.append(split(line,types,names,delimiter)) except ValueError as e: print("Couldn't parse :", line) print("Reason :", e)还是默默忽略警告消息?
try: records.append(split(line,types,names,delimiter)) except ValueError as e: pass任何一种方式都无法令人满意,通常情况下,两种方式我们都需要(用户可选)。
使用 logginglogging 模块可以解决这个问题:
# fileparse.py import logging log = logging.getLogger(__name__) def parse(f,types=None,names=None,delimiter=None): ... try: records.append(split(line,types,names,delimiter)) except ValueError as e: log.warning("Couldn't parse : %s", line) log.debug("Reason : %s", e)修改代码以使程序能够遇到问题的时候发出警告消息,或者特殊的 Logger 对象。 Logger 对象使用 logging.getLogger(__name__) 创建。
日志基础创建一个记录器对象(logger object)。
log = logging.getLogger(name) # name is a string发出日志消息:
log.critical(message [, args]) log.error(message [, args]) log.warning(message [, args]) log.info(message [, args]) log.debug(message [, args])不同方法代表不同级别的严重性。
所有的方法都创建格式化的日志消息。args 和 % 运算符 一起使用以创建消息。
logmsg = message % args # Written to the log 日志配置配置:
# main.py ... if __name__ == '__main__': import logging logging.basicConfig( filename = 'app.log', # Log output file level = logging.INFO, # Output level )通常,在程序启动时,日志配置是一次性的(译注:程序启动后无法重新配置)。该配置与日志调用是分开的。
说明日志是可以任意配置的。你可以对日志配置的任何一方面进行调整:如输出文件,级别,消息格式等等,不必担心对使用日志模块的代码造成影响。
练习 练习 8.2:将日志添加到模块中在 fileparse.py 中,有一些与异常有关的错误处理,这些异常是由错误输入引起的。如下所示:
# fileparse.py import csv def parse_csv(lines, select=None, types=None, has_headers=True, delimiter=',', silence_errors=False): ''' Parse a CSV file into a list of records with type conversion. ''' if select and not has_headers: raise RuntimeError('select requires column headers') rows = csv.reader(lines, delimiter=delimiter) # Read the file headers (if any) headers = next(rows) if has_headers else [] # If specific columns have been selected, make indices for filtering and set output columns if select: indices = [ headers.index(colname) for colname in select ] headers = select records = [] for rowno, row in enumerate(rows, 1): if not row: # Skip rows with no data continue # If specific column indices are selected, pick them out if select: row = [ row[index] for index in indices] # Apply type conversion to the row if types: try: row = [func(val) for func, val in zip(types, row)] except ValueError as e: if not silence_errors: print(f"Row {rowno}: Couldn't convert {row}") print(f"Row {rowno}: Reason {e}") continue # Make a dictionary or a tuple if headers: record = dict(zip(headers, row)) else: record = tuple(row) records.append(record) return records请注意发出诊断消息的 print 语句。使用日志操作来替换这些 print 语句相对来说更简单。请像下面这样修改代码:
# fileparse.py import csv import logging log = logging.getLogger(__name__) def parse_csv(lines, select=None, types=None, has_headers=True, delimiter=',', silence_errors=False): ''' Parse a CSV file into a list of records with type conversion. ''' if select and not has_headers: raise RuntimeError('select requires column headers') rows = csv.reader(lines, delimiter=delimiter) # Read the file headers (if any) headers = next(rows) if has_headers else [] # If specific columns have been selected, make indices for filtering and set output columns if select: indices = [ headers.index(colname) for colname in select ] headers = select records = [] for rowno, row in enumerate(rows, 1): if not row: # Skip rows with no data continue # If specific column indices are selected, pick them out if select: row = [ row[index] for index in indices] # Apply type conversion to the row if types: try: row = [func(val) for func, val in zip(types, row)] except ValueError as e: if not silence_errors: log.warning("Row %d: Couldn't convert %s", rowno, row) log.debug("Row %d: Reason %s", rowno, e) continue # Make a dictionary or a tuple if headers: record = dict(zip(headers, row)) else: record = tuple(row) records.append(record) return records