Jusene's Blog

python 窗口分发

字数统计: 605阅读时长: 3 min
2018/05/20 Share

对于时序数据的处理,会涉及到窗口(window)的概念:比如每隔几秒(interval)返回前几秒(window)内数据的处理结果(result),例如每1分钟返回这一分钟内nginx日志中请求url的热区数据。为了学习这些,我先自己写个简单的用例,一步步加深。

  • 返回每5秒前10秒钟产生数据的平均值
1
import datetime
2
import random
3
import time
4
5
6
def window(source, handler, interval: int, width: int):
    """Feed a sliding time window of records from *source* to *handler*.

    Every record is expected to be a mapping with a ``'time'`` key holding a
    ``datetime.datetime``. Each time at least *interval* seconds have passed
    (measured on record timestamps) since the previous emission, the records
    newer than ``current - width`` seconds are kept and passed to *handler*.

    Args:
        source: Iterable/iterator of records; falsy records are skipped.
        handler: Callable invoked with the list of records in the window.
        interval: Seconds between two handler invocations.
        width: Window size in seconds (intended to be >= interval).

    Returns:
        None, once *source* is exhausted (never, for an endless source).
    """
    store = []
    start_time = datetime.datetime.now()
    # Iterating with ``for`` ends cleanly on a finite source instead of
    # leaking StopIteration out of the function.
    for data in source:
        if not data:
            # Nothing to record; without a record there is no timestamp to
            # compare, so skip instead of touching an unbound variable.
            continue
        store.append(data)
        current_time = data['time']
        # Not enough time elapsed since the last emission yet.
        if (current_time - start_time).total_seconds() < interval:
            continue
        start_time = current_time
        # Lower (exclusive) bound of the window.
        cutoff = current_time - datetime.timedelta(seconds=width)
        # Keep only the records that still fall inside the window.
        store = [x for x in store if x['time'] > cutoff]
        handler(store)
19
20
21
def source():
    """Yield an endless stream of timestamped random readings.

    Each item is ``{'time': <datetime.now()>, 'value': <int in 0..100>}``.
    The generator sleeps one second after every item it hands out.
    """
    while True:
        yield {'time': datetime.datetime.now(), 'value': random.randint(0, 100)}
        # Throttle production to roughly one record per second.
        time.sleep(1)
26
27
28
def handler(items):
    """Print the mean of the ``'value'`` fields in *items*, rounded to 2 places.

    An empty window prints nothing (avoids dividing by zero).
    """
    if not items:
        return
    values = [item['value'] for item in items]
    print('随机数平均值:', round(sum(values) / len(values), 2))
31
32
def main(interval, width):
    """Run the single-process demo.

    Creates the record generator and feeds it through ``window`` so that
    ``handler`` is called every *interval* seconds with the records of the
    last *width* seconds.
    """
    window(source(), handler, interval, width)
35
36
if __name__ == "__main__":
    # Report every 5 seconds on the records of the last 10 seconds.
    main(5, 10)

以上的代码我们只有一个处理进程在工作,如果我们需要进行多处理进程工作,我们就需要涉及分发的概念。

从单一的 source -> worker 到 source -> 分发器 -> 多个worker

  • 分发器实践
1
import datetime
2
import random
3
import time
4
import queue
5
import multiprocessing
6
from functools import wraps
7
8
# Queue of not-yet-started worker processes. queue.Queue is an in-process
# (thread) queue; it is only filled and drained by the parent process here.
analyers = queue.Queue()
10
11
# Data producer
def source():
    """Yield an endless stream of timestamped random readings.

    Each item is ``{'time': <datetime.now()>, 'value': <int in 0..100>}``.
    The generator sleeps one second after every item it hands out.
    """
    while True:
        yield {'time': datetime.datetime.now(), 'value': random.randint(0, 100)}
        # Throttle production to roughly one record per second.
        time.sleep(1)
17
18
# Module-level generator instance that is handed to each @dispatcher
# registration further down in this listing.
# NOTE(review): generator objects cannot be pickled, so passing this to
# multiprocessing.Process presumably relies on the "fork" start method, and
# each forked worker would then advance its own copy — confirm.
data = source()
19
20
# Decorator factory that queues one worker process per decorated handler
def dispatcher(data, interval, width):
    """Build a decorator that registers a handler as a windowed worker process.

    Calling the decorated function does NOT run the handler. It creates a
    ``multiprocessing.Process`` that will run ``window(data, fn, interval,
    width)`` and puts it on the ``analyers`` queue; the process is started
    later by whoever drains that queue.

    Args:
        data: Record source forwarded to ``window``.
        interval: Seconds between handler invocations, forwarded to ``window``.
        width: Window size in seconds, forwarded to ``window``.

    Returns:
        A decorator. The wrapped callable ignores its arguments and returns
        the queued (not yet started) ``multiprocessing.Process``.
    """
    def register(fn):
        @wraps(fn)
        def wrap(*args, **kwargs):
            # NOTE(review): the process arguments must be inherited by the
            # child; a generator source is not picklable, so this presumably
            # only works with the "fork" start method — confirm.
            p = multiprocessing.Process(target=window, args=(data, fn, interval, width))
            analyers.put(p)
            # Hand the process back so callers can join/inspect it if needed.
            return p
        return wrap
    return register
29
30
# Drain the queue and start each worker process
def start():
    """Start every process currently waiting on the ``analyers`` queue."""
    while True:
        try:
            proc = analyers.get_nowait()
        except queue.Empty:
            # Queue drained: every registered worker has been started.
            break
        proc.start()
34
35
36
# Window function
def window(source, handler, interval: int, width: int):
    """Feed a sliding time window of records from *source* to *handler*.

    Every record is expected to be a mapping with a ``'time'`` key holding a
    ``datetime.datetime``. Each time at least *interval* seconds have passed
    (measured on record timestamps) since the previous emission, the records
    newer than ``current - width`` seconds are kept and passed to *handler*.

    Args:
        source: Iterable/iterator of records; falsy records are skipped.
        handler: Callable invoked with the list of records in the window.
        interval: Seconds between two handler invocations.
        width: Window size in seconds (intended to be >= interval).

    Returns:
        None, once *source* is exhausted (never, for an endless source).
    """
    store = []
    start_time = datetime.datetime.now()
    # Iterating with ``for`` ends cleanly on a finite source instead of
    # leaking StopIteration out of the function.
    for data in source:
        if not data:
            # Nothing to record; without a record there is no timestamp to
            # compare, so skip instead of touching an unbound variable.
            continue
        store.append(data)
        current_time = data['time']
        # Not enough time elapsed since the last emission yet.
        if (current_time - start_time).total_seconds() < interval:
            continue
        start_time = current_time
        # Lower (exclusive) bound of the window.
        cutoff = current_time - datetime.timedelta(seconds=width)
        # Keep only the records that still fall inside the window.
        store = [x for x in store if x['time'] > cutoff]
        handler(store)
50
51
52
@dispatcher(data, 5, 10)
def handler(*args, **kwargs):
    """Print the mean of the ``'value'`` fields of the window in ``args[0]``.

    The result is rounded to two decimals. An empty window prints nothing
    (avoids dividing by zero).
    """
    items = args[0]
    if not items:
        return
    values = [item['value'] for item in items]
    print('随机数平均值:', round(sum(values) / len(values), 2))
56
57
58
@dispatcher(data, 5, 10)
def handler1(*args, **kwargs):
    """Print the sum of the ``'value'`` fields of the window in ``args[0]``."""
    values = [item['value'] for item in args[0]]
    print('随机数总和:', sum(values))
62
63
if __name__ == "__main__":
    # Calling the decorated handlers only queues their worker processes.
    handler()
    handler1()
    # Start every queued worker.
    start()
CATALOG