import reimport datetimelogline = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] \"GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" \"Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''pattern = '''(?P[\d\.]{7,}) - - \[(?P [^\[\]]+)\] "(?P [^"]+)" (?P \d+) (?P \d+) "([^"]+)" "(?P [^"]+)'''regex = re.compile(pattern)def extract(line): matcher = regex.match(line) if matcher: return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}ops = { 'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'request': lambda request: dict(zip(('method', 'url', 'protocol'), request.split()))}# for k , v in extract(logline).items():# d[k] = ops[k].get(k, lambda x:x)(v)# {k:ops[k].get(k, lambda x:x)(v) for k , v in matcher.groupdict().items() }'''滑动窗口: 数据载入: 对于项目来说,数据就是日志的一行行记录,载入数据就是文件IO的读取,将 获取数据的方法封装成函数'''def load(path:str): ''' 装载日志文件 逐行读取文件 ''' with open(path) as f: for line in f: field = extract(line) if field: yield field #yield 是真好使 else: continue #记录完每一条就continue #就变成了生成器函数,调用返回的是生成器对象 然后就需要next迭代''' 时间窗口分析: 概念: 很多数据,都是跟时间有关,比如日志,都是按照时间顺序来产生的。 分析数据,要按照时间求值 interval 表示每一次求值的时间间隔 width 时间窗口宽度,指每一次求值的时间窗口宽度 当 width > interval 会出现重叠现象 当 width = interval 没有重叠现象 当 width < interval 数据会丢失。不采纳 时序数据: 日志、监控等产生的数据都是与时间相关的数据, 按照时间先后产生并记录下来的数据,一般按照时间对数据进行分析 数据分析基本程序解构: 无限的生成随机数函数,产生时间相关的数据,返回时间+随机数 每次取3个数据,求平均值 例子: import random import datetime import time def source(): while source(): yield {'value':random.randint(1,100), 'datetime':datetime.datetime.now()} time.sleep(1) #获取数据 s = source() items = [next(s) for _ in range(3)] #处理函数 def handler(iterable): vals = [x['value'] for x in iterable] return sum(vals) / len(vals) print(items) print("{:.2f}".format(handle(items))) 模拟一段时间内产生的数据,等一段固定的时间取数据来计算平均值 '''import randomimport timeimport datetimedef source(): while source(): #随机值和时间 kv键对应 yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}def window(src, handler, width:int, interval:int): ''' 窗口函数 :param src: 数据源,生成器,来取数据的 :param handler: 数据处理函数 :param width: 时间窗口宽度,秒 :param interval: 处理时间间隔,秒 :return: '''# 迭代最上面的load(载入文件)函数 是个生成器 迭代出来的数据为字典,然后拿字段处理 start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800', "%Y/%m/%d %H:%M:%S %z") current = datetime.datetime.strptime('1970/01/01 01:01:01 +0800', "%Y/%m/%d %H:%M:%S %z") delta = datetime.timedelta(seconds=width - interval) # 重叠数据 buffer = [] #缓冲区 待处理的数据 for x in src:#迭代一批日志数据 if x:#存入临时缓存等待计算 buffer.append(x)#追加数据 current = x['datetime'] #指日志中数据的时间指针的位置 一直在修改时间 if (current - start).total_seconds() >= interval: ret = handler(buffer) print(ret) start = current #将时间间隔(interval)赋值给start buffer = [x for x in buffer if x['datetime'] > current - delta]#buffer处理def donothing_handler(iterable:list): print(iterable) return iterable#一批批数据进行处理def handler(iterable:list): #字典处理 return sum(iterable) // len(iterable)window(load('c:/test.log'), donothing_handler, 10, 5)#载入日志文件 width10 interval5