WSGI(Web Server Gateway Interface),简单来说就是Python Web服务网关接口,是实现Python Web服务的统一接口,如Flask, Django都是基于该接口实现的框架,从wsgi出发一步步了解框架的构建过程。
首先,Python原生为我们提供了wsgi的实现,WSGI的实现非常简单:
1 | def application(environ, start_response): |
2 | start_response('200 OK',[("Content-type","text/plain")]) |
3 | return ["hello world".encode()] |
- environ: 一个包含本地环境变量和http请求的dict对象
- start_response: 一个发送http响应的函数
简单的echo服务
1 | from wsgiref.simple_server import make_server |
2 |
|
3 | def application(environ, start_response): |
4 | start_response('200 OK',[("Content-type","text/plain")]) |
5 | return ["hello world".encode()] |
6 |
|
7 | if __name__ == "__main__": |
8 | server = make_server('0.0.0.0',8000,application) |
9 | try: |
10 | server.serve_forever() |
11 | except KeyboardInterrupt: |
12 | server.shutdown() |
params解析
1 | from wsgiref.simple_server import make_server |
2 | from urllib.parse import parse_qs |
3 |
|
4 | def application(environ, start_response): |
5 | params = parse_qs(environ["QUERY_STRING"]) |
6 | name = params.get('name', ['anonymous'])[0] |
7 | start_response('200 OK',[("Content-type","text/plain")]) |
8 | return ["hello {}".format(name).encode()] |
9 |
|
10 | if __name__ == "__main__": |
11 | server = make_server('0.0.0.0',8000,application) |
12 | try: |
13 | server.serve_forever() |
14 | except KeyboardInterrupt: |
15 | server.shutdown() |
1 | from wsgiref.simple_server import make_server |
2 | from urllib.parse import parse_qs |
3 |
|
4 | def application(environ, start_response): |
5 | request_body = environ.get('wsgi.input').read(int(environ.get('CONTENT_LENGTH',0))) |
6 | params = parse_qs(request_body) |
7 | name = params.get(b'name', ['anonymous'])[0] |
8 | start_response('200 OK',[("Content-type","text/plain")]) |
9 | return ["hello {}".format(name.decode()).encode()] |
10 |
|
11 | if __name__ == "__main__": |
12 | server = make_server('0.0.0.0',8000,application) |
13 | try: |
14 | server.serve_forever() |
15 | except KeyboardInterrupt: |
16 | server.shutdown() |
使用webob对象化wsgi的environ
1 | from wsgiref.simple_server import make_server |
2 | import webob |
3 |
|
4 | def application(environ, start_response): |
5 | request = webob.Request(environ) |
6 | name = request.params.get("name","anoymous") |
7 | response = webob.Response() |
8 | response.text = u'hello {}'.format(name) |
9 | response.status_code = 200 |
10 | response.content_type = 'text/plain' |
11 | return response(environ, start_response) |
12 |
|
13 | if __name__ == "__main__": |
14 | server = make_server('0.0.0.0',8000,application) |
15 | try: |
16 | server.serve_forever() |
17 | except KeyboardInterrupt: |
18 | server.shutdown() |
1 | from wsgiref.simple_server import make_server |
2 | import webob |
3 | from webob.dec import wsgify |
4 |
|
5 | @wsgify |
6 | def application(request: webob.Request) -> webob.Response: |
7 | name = request.params.get("name","anoymous") |
8 | response = webob.Response() |
9 | response.text = u'hello {}'.format(name) |
10 | response.status_code = 200 |
11 | response.content_type = 'text/plain' |
12 | return response |
13 | |
14 | if __name__ == "__main__": |
15 | server = make_server('0.0.0.0',8000,application) |
16 | try: |
17 | server.serve_forever() |
18 | except KeyboardInterrupt: |
19 | server.shutdown() |
1 | from webob import Request |
2 | from functools import wraps |
3 |
|
4 | def Wsgify(fn): |
5 | @wraps(fn) |
6 | def wrap(environ, start_response): |
7 | request = Request(environ) |
8 | response = fn(request) |
9 | return response(environ, start_response) |
10 | return wrap |
11 | ``` |
12 |
|
13 |
|
14 |
|
15 | 一个web框架最重要的一部分是路由,所以学习框架的时候,只需要清楚如何路由基本都可以上手 |
16 |
|
17 | - 静态路由 |
18 |
|
19 | ```python |
20 | from wsgiref.simple_server import make_server |
21 | import webob |
22 | from webob.dec import wsgify |
23 |
|
24 | def hello(request: webob.Request) -> webob.Response: |
25 | name = request.params.get('name', 'anonymous') |
26 | response = webob.Response() |
27 | response.text = u'hello {}'.format(name) |
28 | response.status_code = 200 |
29 | response.content_type = 'text/plain' |
30 | return response |
31 |
|
32 | def index(request: webob.Request) -> webob.Response: |
33 | return webob.Response(body='hello world', content_type='text/plain') |
34 |
|
35 | router = { |
36 | '/hello': hello, |
37 | '/': index} |
38 |
|
39 | @wsgify |
40 | def application(request: webob.Request) -> webob.Response: |
41 | return router.get(request.path,index)(request) |
42 |
|
43 | if __name__ == "__main__": |
44 | server = make_server('0.0.0.0',8000,application) |
45 | try: |
46 | server.serve_forever() |
47 | except KeyboardInterrupt: |
48 | server.shutdown() |
动态路由
1 | from wsgiref.simple_server import make_server |
2 | import webob |
3 | from webob.dec import wsgify |
4 |
|
5 | def hello(request: webob.Request) -> webob.Response: |
6 | name = request.params.get('name', 'anonymous') |
7 | response = webob.Response() |
8 | response.text = u'hello {}'.format(name) |
9 | response.status_code = 200 |
10 | response.content_type = 'text/plain' |
11 | return response |
12 |
|
13 | def index(request: webob.Request) -> webob.Response: |
14 | return webob.Response(body='hello world', content_type='text/plain') |
15 |
|
16 | class Application: |
17 | ROUTER = {} |
18 |
|
19 | @classmethod |
20 | def register(cls, path, view): |
21 | cls.ROUTER[path] = view |
22 |
|
23 | def default_view(self, request: webob.Request) -> webob.Response: |
24 | |
25 | raise webob.exc.HTTPNotFound() |
26 |
|
27 | @wsgify |
28 | def __call__(self, request: webob.Request) -> webob.Response: |
29 | return self.ROUTER.get(request.path, self.default_view)(request) |
30 |
|
31 | if __name__ == "__main__": |
32 | Application.register('/hello', hello) |
33 | Application.register('/', index) |
34 | server = make_server('0.0.0.0',8000,Application()) |
35 | try: |
36 | server.serve_forever() |
37 | except KeyboardInterrupt: |
38 | server.shutdown() |
1 | from wsgiref.simple_server import make_server |
2 | import webob |
3 | from webob.dec import wsgify |
4 | class Application: |
5 | ROUTER = {} |
6 | @classmethod |
7 | def register(cls, path): |
8 | def wrap(view) |
9 | cls.ROUTER[path] = view |
10 | return view |
11 | return wrap |
12 | |
13 | def default_view(self, request: webob.Request) -> webob.Response: |
14 | |
15 | raise webob.exc.HTTPNotFound() |
16 |
|
17 | @wsgify |
18 | def __call__(self, request: webob.Request) -> webob.Response: |
19 | return self.ROUTER.get(request.path, self.default_view)(request) |
20 |
|
21 | @Application.register('/hello') |
22 | def hello(request: webob.Request) -> webob.Response: |
23 | name = request.params.get('name', 'anonymous') |
24 | response = webob.Response() |
25 | response.text = u'hello {}'.format(name) |
26 | response.status_code = 200 |
27 | response.content_type = 'text/plain' |
28 | return response |
29 |
|
30 | @Application.register('/') |
31 | def index(request: webob.Request) -> webob.Response: |
32 | return webob.Response(body='hello world', content_type='text/plain') |
33 |
|
34 | if __name__ == "__main__": |
35 | server = make_server('0.0.0.0',8000,Application()) |
36 | try: |
37 | server.serve_forever() |
38 | except KeyboardInterrupt: |
39 | server.shutdown() |