日志文件收集方案有很多种,即便没有使用过,也一定听说过Logstash,不过本文并不是要介绍如何使用Logstash进行日志收集,而是尝试使用python实现一个简易的文件日志收集程序。
程序需要考虑两个技术点
程序要能监控一个文件,当这个文件有新增数据时,及时发现,并进行收集。
当文件有变化,打开文件后,必须准确的找到上一次读取数据的位置,从这个位置读取新的数据
关于第一点,实现起来并不难,程序启动后,可以第一时间获取文件的最后修改时间,而后轮询获取文件的最后修改时间,如果该时间有变化,就说明文件有新变化,那么就可以打开文件,读取新写入的内容。
接下来要考虑的才是程序的核心部分,如何在打开后,准确的找到文件的新增数据。假设上一次收集到文件的末尾,文件一共有100行,1秒钟以后,又新写入了10行,程序打开文件后,必须准确的定位到第100行,然后从101行开始读取数据。
每一次收集结束后,都必须记录文件被读取到了哪里,可以使用file对象的tell方法,它可以返回当前文件指针相对于文件开始位置的偏移量,把这个偏移量记做checkpoint。下一次打开时,则可以使用seek方法,迅速的将文件指针偏移到checkpoint,上一次读取结束的位置,从这里继续读取文件。
import os
import time
def get_tail_checkpoint(filename):
with open('./data/data.txt', encoding='utf-8')as f:
f.seek(0, 2)
checkpoint = f.tell()
return checkpoint, os.stat(filename).st_mtime
def file_monitor(filename):
checkpoint, last_modify_time = get_tail_checkpoint(filename)
while True:
time.sleep(1)
modify_time = os.stat(filename).st_mtime
if modify_time <= last_modify_time:
continue
last_modify_time = modify_time
with open('./data/data.txt', encoding='utf-8')as f:
f.seek(checkpoint)
line = f.readline()
while line:
if line != '\n': # 单纯一个换行,不输出
print(line)
line = f.readline()
checkpoint = f.tell()
if __name__ == '__main__':
filename = './data/data.txt'
file_monitor(filename)
file_monitor 执行后首先调用get_tail_checkpoint获取文件的偏移量和最后修改时间,seek函数有两个参数,offset和whence,offset是偏移量,而whence则决定从哪里开始计算偏移量,whence默认为0,表示从文件头开始,1表示从当前位置,2表示从文件末尾,因此seek(0, 2) 返回的就是文件末尾处相较于文件开头的偏移量。
函数使用一个while循环,每隔1秒钟就重新获取文件的最后修改时间,如果文件修改时间有变化,则说明写入了新数据,则打开文件,偏移值checkpoint读取数据,并在读取结束后更新checkpoint。
本文的实现,非常简易,只是作为一种技术探索,属实验性质,并没有在极端条件比如高速写入情况进行测试,不能够保证在生产环境下的可行性。
QQ交流群: 211426309