-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathpoll.py
More file actions
73 lines (54 loc) · 1.6 KB
/
poll.py
File metadata and controls
73 lines (54 loc) · 1.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/python
# -*- coding=utf-8 -*-
__author__ = ['"wuyadong" <wuyadong311521@gmail.com>']
import select
class Poll(object):
    """Base interface for an fd-readiness poller.

    Concrete pollers override ``poll``, ``register``, ``unregister`` and
    ``close``; each of those raises ``NotImplementedError`` here.
    ``modify`` is implemented in terms of ``unregister`` + ``register``.
    """

    # Event flags. Each one is a distinct bit, so they can be OR-ed
    # together into a single event mask.
    READ = 0x01
    WRITE = 0x02
    ERROR = 0x04

    def poll(self, timeout):
        """Wait for events; must be overridden by subclasses."""
        raise NotImplementedError()

    def register(self, fd, events):
        """Start watching ``fd`` for ``events``; must be overridden."""
        raise NotImplementedError()

    def unregister(self, fd):
        """Stop watching ``fd``; must be overridden."""
        raise NotImplementedError()

    def modify(self, fd, events):
        """Replace the event mask of ``fd`` with ``events``.

        Done by dropping the current registration and registering again,
        so any exception raised by either step propagates to the caller.
        """
        self.unregister(fd)
        self.register(fd, events)

    def close(self):
        """Release the poller's resources; must be overridden."""
        raise NotImplementedError()
class SelectPoll(Poll):
    """``Poll`` implementation backed by ``select.select``.

    Keeps three sets of file descriptors (read / write / error) and hands
    them to ``select.select`` on every ``poll`` call.
    """

    def __init__(self):
        """Create a poller with no registered file descriptors."""
        self._read_fds = set()
        self._write_fds = set()
        self._error_fds = set()

    def poll(self, timeout):
        """Run one ``select.select`` call and report the ready fds.

        ``timeout`` is passed straight through to ``select.select``.

        Returns the ``items()`` of a dict mapping each ready fd to the
        OR of the ``READ`` / ``WRITE`` / ``ERROR`` flags that fired.
        """
        ready_lists = select.select(self._read_fds,
                                    self._write_fds,
                                    self._error_fds,
                                    timeout)
        flags = (SelectPoll.READ, SelectPoll.WRITE, SelectPoll.ERROR)

        fired = {}
        # select.select returns (readable, writable, exceptional); pair each
        # list with its flag and merge the flags per fd.
        for ready, flag in zip(ready_lists, flags):
            for fd in ready:
                fired[fd] = fired.get(fd, 0) | flag
        return fired.items()

    def register(self, fd, events):
        """Start watching ``fd`` for the flags set in ``events``.

        Raises ``IOError`` if ``fd`` is already present in any of the
        three watch sets.
        """
        watch_sets = (self._read_fds, self._write_fds, self._error_fds)
        if any(fd in fds for fds in watch_sets):
            raise IOError("fd already registered")

        # An ERROR registration also watches the fd for readability.
        # NOTE(review): presumably because select reports a closed peer as
        # a readable fd rather than as an error condition -- confirm.
        if events & SelectPoll.READ or events & SelectPoll.ERROR:
            self._read_fds.add(fd)
        if events & SelectPoll.WRITE:
            self._write_fds.add(fd)
        if events & SelectPoll.ERROR:
            self._error_fds.add(fd)

    def unregister(self, fd):
        """Stop watching ``fd``; a no-op if it was never registered."""
        for fds in (self._read_fds, self._write_fds, self._error_fds):
            fds.discard(fd)

    def close(self):
        """Do nothing; this poller has no close-time cleanup."""
        pass