对于时序数据的处理,会涉及到窗口(window)的概念:比如每隔几秒(interval)返回一次前几秒(window)内数据的处理结果(result),例如每1分钟返回这一分钟内nginx日志中请求url的热区数据。为了学习这些,我自己先写个简单的用例,一步步加深。
- 每5秒返回一次前10秒内产生数据的平均值
1 | import datetime |
2 | import random |
3 | import time |
4 | |
5 | |
def window(source, handler, interval: int, width: int):
    """Feed a sliding time window of records from *source* to *handler*.

    Records are pulled from *source* one at a time and buffered. Whenever at
    least *interval* seconds separate a record's timestamp from the previous
    emission (initially: the moment this function was called), the buffer is
    trimmed to the records strictly newer than ``timestamp - width`` seconds
    and the trimmed buffer is passed to *handler*.

    Args:
        source: Iterable of dicts with a ``'time'`` key holding a
            ``datetime.datetime`` comparable with ``datetime.datetime.now()``.
            Falsy items are skipped.
        handler: Callable invoked with the list of records in the window.
        interval: Minimum number of seconds between two handler calls.
        width: Window length in seconds; expected to be >= interval.

    Returns:
        None, once *source* is exhausted (never, for an endless generator).
    """
    store = []
    last_emit = datetime.datetime.now()
    # A for-loop ends cleanly on exhaustion; a bare next() inside
    # `while True` would leak StopIteration to the caller for a finite source.
    for data in source:
        if not data:
            continue
        store.append(data)
        current = data['time']
        # Emit only when enough time has passed since the last emission.
        if (current - last_emit).total_seconds() >= interval:
            last_emit = current
            cutoff = current - datetime.timedelta(seconds=width)
            # Keep only the records that still fall inside the window.
            store = [x for x in store if x['time'] > cutoff]
            handler(store)
19 | |
20 | |
def source():
    """Yield one random sample per second, forever.

    Each item is a dict with ``'time'`` (``datetime.datetime.now()`` at the
    moment the sample is created) and ``'value'`` (a random integer in the
    inclusive range 0..100).
    """
    while True:
        data = {'time': datetime.datetime.now(), 'value': random.randint(0, 100)}
        yield data
        # Throttle after handing the sample out, so the first one is immediate.
        time.sleep(1)
26 | |
27 | |
def handler(items):
    """Print the mean of the ``'value'`` field over *items*, rounded to 2 places.

    Args:
        items: Sequence of dicts, each with a numeric ``'value'`` key.

    An empty window (possible when the window width is zero or negative) is
    skipped silently instead of raising ZeroDivisionError.
    """
    values = [item['value'] for item in items]
    if not values:
        return
    print('随机数平均值:', round(sum(values) / len(values), 2))
31 | |
def main(interval, width):
    """Run the windowed demo: stream samples from source() into handler.

    Args:
        interval: Seconds between two handler calls.
        width: Window length in seconds.
    """
    window(source(), handler, interval, width)
35 | |
if __name__ == "__main__":
    # Report every 5 seconds over the samples of the last 10 seconds.
    main(interval=5, width=10)
以上的代码只有一个处理进程在工作;如果需要多个处理进程同时工作,就需要引入分发的概念。
从单一的 source -> worker 到 source -> 分发器 -> 多个worker
- 分发器实践
1 | import datetime |
2 | import random |
3 | import time |
4 | import queue |
5 | import multiprocessing |
6 | from functools import wraps |
7 | |
# Holding area for worker processes that have been created but not started:
# filled by the wrapper built in dispatcher() and drained by start().
# This is a plain queue.Queue living in the current process.
analyers = queue.Queue()
10 | |
# Data producer
def source():
    """Endless generator of timestamped random samples, one per second.

    Every item is a dict: ``'time'`` is ``datetime.datetime.now()`` taken when
    the sample is built, ``'value'`` is a random integer from 0 to 100.
    """
    while True:
        stamp = datetime.datetime.now()
        reading = random.randint(0, 100)
        yield {'time': stamp, 'value': reading}
        # Pause only after the sample has been consumed.
        time.sleep(1)
17 | |
# Module-level sample stream: one generator object, created at import time.
data = source()
19 | |
# Decorator factory that queues a handler as a windowed worker process
def dispatcher(data, interval, width):
    """Build a decorator that turns a handler into a worker registration.

    Calling the decorated function does not run the handler. Instead it
    creates a ``multiprocessing.Process`` whose target is
    ``window(data, fn, interval, width)`` and puts it on ``analyers``. The
    process is only created here; ``start()`` is what launches it.

    Args:
        data: Record source passed through to ``window``.
        interval: Seconds between two handler calls inside ``window``.
        width: Window length in seconds inside ``window``.

    Returns:
        A decorator taking the handler function ``fn``.
    """
    # NOTE(review): the Process arguments include whatever `data` is (a
    # generator in this file) and the undecorated `fn`; generators cannot be
    # pickled, so this presumably relies on the 'fork' start method — confirm
    # on the target platform.
    def register(fn):
        @wraps(fn)  # keep the handler's name and docstring on the wrapper
        def wrap(*args, **kwargs):
            p = multiprocessing.Process(target=window, args=(data, fn, interval, width))
            analyers.put(p)
        return wrap
    return register
29 | |
# Drain the queue and launch every queued child process
def start():
    """Start each process currently waiting on ``analyers``.

    The number of items is sampled once up front; exactly that many entries
    are taken off the queue and started, in FIFO order.
    """
    remaining = analyers.qsize()
    while remaining > 0:
        worker = analyers.get_nowait()
        worker.start()
        remaining -= 1
34 | |
35 | |
# Window function
def window(source, handler, interval: int, width: int):
    """Feed a sliding time window of records from *source* to *handler*.

    Records are pulled from *source* one at a time and buffered. Whenever at
    least *interval* seconds separate a record's timestamp from the previous
    emission (initially: the moment this function was called), the buffer is
    trimmed to the records strictly newer than ``timestamp - width`` seconds
    and the trimmed buffer is passed to *handler*.

    Args:
        source: Iterable of dicts with a ``'time'`` key holding a
            ``datetime.datetime`` comparable with ``datetime.datetime.now()``.
            Falsy items are skipped.
        handler: Callable invoked with the list of records in the window.
        interval: Minimum number of seconds between two handler calls.
        width: Window length in seconds; expected to be >= interval.

    Returns:
        None, once *source* is exhausted (never, for an endless generator).
    """
    store = []
    last_emit = datetime.datetime.now()
    # A for-loop ends cleanly on exhaustion; a bare next() inside
    # `while True` would leak StopIteration to the caller for a finite source.
    for data in source:
        if not data:
            continue
        store.append(data)
        current = data['time']
        # Emit only when enough time has passed since the last emission.
        if (current - last_emit).total_seconds() >= interval:
            last_emit = current
            cutoff = current - datetime.timedelta(seconds=width)
            # Keep only the records that still fall inside the window.
            store = [x for x in store if x['time'] > cutoff]
            handler(store)
50 | |
51 | |
# The decorator line was lost from the listing; the 5 s interval / 10 s width
# are reconstructed from the single-process example — TODO confirm.
@dispatcher(data, 5, 10)
def handler(*args, **kwargs):
    """Print the mean of the ``'value'`` field of the window in ``args[0]``.

    Because of the decorator, calling ``handler()`` from the main module only
    queues a worker process; this body runs inside ``window`` with the list of
    records as its first positional argument.

    An empty window is skipped instead of raising ZeroDivisionError.
    """
    values = [item['value'] for item in args[0]]
    if not values:
        return
    print('随机数平均值:', round(sum(values) / len(values), 2))
56 | |
57 | |
# The decorator line was lost from the listing; the 5 s interval / 10 s width
# are reconstructed from the single-process example — TODO confirm.
@dispatcher(data, 5, 10)
def handler1(*args, **kwargs):
    """Print the sum of the ``'value'`` field of the window in ``args[0]``.

    Because of the decorator, calling ``handler1()`` from the main module only
    queues a worker process; this body runs inside ``window`` with the list of
    records as its first positional argument.
    """
    values = [item['value'] for item in args[0]]
    print('随机数总和:', sum(values))
62 | |
if __name__ == "__main__":
    # Invoke each handler entry point once, in order, then launch whatever
    # has been queued.
    for entry_point in (handler, handler1):
        entry_point()
    start()