forked from wmww/wayland-debug
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsession.py
More file actions
351 lines (325 loc) · 14.2 KB
/
session.py
File metadata and controls
351 lines (325 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
import re
from util import *
import wl
import matcher
# ANSI color code passed to color() for commands shown in help output
help_command_color = '1;37'


def command_format(cmd):
    """Return `cmd` rendered the way the user would type it.

    When check_gdb() is truthy the result is prefixed with a '(gdb) ' prompt
    and the command itself gets a 'wl' prefix; otherwise a '$ ' prompt is
    used and the command is left as given. In both cases the command text is
    wrapped with color() using help_command_color.
    """
    if not check_gdb():
        return '$ ' + color(help_command_color, cmd)
    gdb_cmd = 'wl' + cmd
    return '(gdb) ' + color(help_command_color, gdb_cmd)
class Command:
    """A session command: its name, argument usage, handler and help text."""

    def __init__(self, name, arg, func, help_text):
        # name: full command name, matched by prefix in matches()
        # arg: usage string describing the argument, or None if there is none
        # func: handler callable, invoked with the argument string
        # help: help text for the command
        self.name, self.arg = name, arg
        self.func, self.help = func, help_text

    def matches(self, command):
        """Return True if `command`, lowercased, is a prefix of this command's name."""
        prefix = command.lower()
        return self.name.startswith(prefix)
class Session:
    """Tracks Wayland connections and their messages, and runs user commands.

    Messages are fed in through message(); those on the current connection
    are shown if they match the display matcher, and flag the session as
    stopped if they match the stop matcher. User commands (help, list,
    filter, ...) are dispatched through command().
    """

    def __init__(self, display_matcher, stop_matcher, output):
        """Create a session.

        display_matcher: matcher deciding which live messages are shown
        stop_matcher: matcher deciding which live messages stop the session
        output: object used for all output (show/log/warn/error are called on it)
        """
        assert display_matcher
        assert stop_matcher
        self.current_connection = None # The connection that is currently being shown
        self.connection_explicitly_selected = False # If the current connection was selected by the user
        # A list of connections in the order they were created
        # Each connection has a name (single letter) and id (string containing pointer address, if coming from libwayland)
        self.connection_list = []
        # Mapping of open connection ids to connection objects
        self.connections = {}
        self.commands = [
            Command('help', '[COMMAND]', self.help_command,
                'Show this help message, or get help for a specific command'),
            Command('list', '[CONN:] [MATCHER] [~ COUNT]', self.list_command,
                'List messages matching given matcher (or list all messages, if no matcher provided)\n' +
                'Prepend "CONN:" to show messages from a different connection than the one currently active\n' +
                'Append "~ COUNT" to show at most the last COUNT messages that match\n' +
                'See ' + command_format('help matcher') + ' for matcher syntax'),
            Command('filter', '[MATCHER]', self.filter_command,
                'Show the current output filter matcher, or add a new one\n' +
                'See ' + command_format('help matcher') + ' for matcher syntax'),
            Command('breakpoint', '[MATCHER]', self.break_point_command,
                'Show the current breakpoint matcher, or add a new one\n' +
                'Use an inverse matcher (^) to disable existing breakpoints\n' +
                'See ' + command_format('help matcher') + ' for matcher syntax'),
            Command('matcher', '[MATCHER]', self.matcher_command,
                'Parse a matcher, and show it unsimplified'),
            Command('connection', '[CONNECTION]', self.connection_command,
                'Show Wayland connections, or switch to another connection'),
            Command('resume', None, self.resume_command,
                'Resume processing events\n' +
                'In GDB you can also use the continue gdb command'),
            Command('quit', None, self.quit_command,
                'Quit the program'),
        ]
        self.is_stopped = False
        self.should_quit = False
        self.display_matcher = display_matcher
        self.stop_matcher = stop_matcher
        self.out = output
        # Timestamp of the last message shown, used to print a gap marker
        # between messages that are more than a second apart
        self.last_shown_timestamp = None

    def set_stopped(self, val):
        """Set the stopped flag to `val`."""
        self.is_stopped = val

    def stopped(self):
        """Return the stopped flag."""
        return self.is_stopped

    def quit(self):
        """Return True once the quit command has been run."""
        return self.should_quit

    def message(self, connection_id, message):
        """Record a new message on the connection with the given id.

        A None message is ignored. If the connection id is unknown, a warning
        is emitted and a connection is opened for it (server-ness unknown).
        Any new message clears the stopped flag before matchers are checked.
        """
        if message is None:
            return
        self.is_stopped = False
        if connection_id not in self.connections:
            self.out.warn('connection_id ' + repr(connection_id) + ' never explicitly created')
            self.open_connection(connection_id, None, message.timestamp)
        connection = self.connections[connection_id]
        connection.message(message)
        # NOTE(review): indentation was lost in the reviewed copy of this file;
        # the breakpoint check is assumed to apply only to the current
        # connection, like the display check -- confirm against the original.
        if connection == self.current_connection:
            if self.display_matcher.matches(message):
                self._show_message(message)
            if self.stop_matcher.matches(message):
                self.out.show(color('1;37', ' Stopped at ') + str(message).strip())
                self.is_stopped = True

    def open_connection(self, connection_id, is_server, time):
        """Open a new connection, closing any open one with the same id first.

        is_server can be None if the value is unknown. Connections are named
        'A'..'Z' in creation order; after that the numeric index is used.
        """
        self.close_connection(connection_id, time)
        index = len(self.connection_list)
        name = chr(index + ord('A')) if index < 26 else str(index)
        connection = wl.Connection(name, is_server, None, time, self.out)
        connection.id = connection_id
        current = self.current_connection
        # Compositors running nested will open up a client connection first,
        # We should switch to the server as that's the one we're likely interested in
        switch = (
            not current or
            (not current.is_server and is_server) or
            (not self.connection_explicitly_selected and (is_server or not current.is_server)))
        if switch:
            self.current_connection = connection
            self.out.show(color('1;32', 'Switching to new ' + connection.description() + ' connection ' + name))
        else:
            self.out.show(color('1;32', 'New ' + connection.description() + ' connection ' + name))
        self.connections[connection_id] = connection
        self.connection_list.append(connection)

    def close_connection(self, connection_id, time):
        """Close the open connection with the given id; do nothing if there is none."""
        connection = self.connections.pop(connection_id, None)
        if connection is None:
            return
        # Connection will still be in connection list
        connection.close(time)
        self.out.show(color('1;31', 'Closed ' + connection.description() + ' connection ' + connection.name))

    def show_messages(self, connection, matcher, cap=None):
        """Show the messages on `connection` that match `matcher`.

        connection: connection to search, or None (reported as 'No connection')
        matcher: object whose matches(message) selects the messages to show
        cap: if truthy, show at most the last `cap` matching messages
        """
        msg = 'Messages that match ' + str(matcher)
        # Guard against None before touching connection.name
        if connection is not None and connection != self.current_connection:
            msg += ' on connection ' + connection.name
        msg += ':'
        self.out.show(msg)
        if connection is None:
            self.out.show(' ╰╴ No connection')
            return
        matching, matched, didnt_match, not_searched = self._get_matching(connection, matcher, cap)
        if not matching:
            # Nothing matched, so every message was examined and didnt_match
            # is the total number of messages on the connection
            if didnt_match == 0:
                self.out.show(' ╰╴ No messages yet')
            else:
                self.out.show(' ╰╴ None of the ' + color('1;31', str(didnt_match)) + ' messages so far')
            return
        # Reset so the first listed message is not preceded by a gap marker
        self.last_shown_timestamp = None
        for message in matching:
            self._show_message(message)
        self.out.show(
            '(' +
            color(('1;32' if matched > 0 else '37'), str(matched)) + ' matched, ' +
            color(('1;31' if didnt_match > 0 else '37'), str(didnt_match)) + ' didn\'t' +
            (', ' + color(('37'), str(not_searched)) + ' not checked' if not_searched != 0 else '') +
            ')')
        # NOTE(review): assumed to belong to the "matches found" branch;
        # indentation was lost in the reviewed copy -- confirm.
        self.last_shown_timestamp = None

    def _show_message(self, message):
        """Show one message, preceded by a gap marker if more than 1s passed since the last one shown."""
        if self.last_shown_timestamp is not None:
            delta = message.timestamp - self.last_shown_timestamp
        else:
            delta = 0
        if delta > 1.0:
            self.out.show(color('37', ' ───┤ {:0.4f}s ├───'.format(delta)))
        self.last_shown_timestamp = message.timestamp
        message.show(self.out)

    def _get_matching(self, connection, matcher, cap=None):
        """Collect the messages on `connection` that match `matcher`.

        The search runs from the newest message backwards and stops once
        `cap` matches are found (a cap of 0 or None means no limit).

        Returns (matching, matched, didnt_match, not_searched) where
        `matching` is a list in the connection's original message order and
        the other three are counts that add up to the number of messages.
        """
        if cap == 0:
            cap = None
        didnt_match = 0
        acc = []
        messages = connection.messages if connection else []
        for message in reversed(messages):
            if matcher.matches(message):
                acc.append(message)
                if cap and len(acc) >= cap:
                    break
            else:
                didnt_match += 1
        # Matches were gathered newest-first; flip them back. A real list
        # (not a reversed() iterator) lets callers test it for emptiness.
        acc.reverse()
        return (acc, len(acc), didnt_match, len(messages) - len(acc) - didnt_match)

    def command(self, command):
        """Parse a command line and run the command it names.

        The first whitespace-separated word selects the command (it may be
        abbreviated) and the rest is passed to it as the argument. GDB-style
        spellings ('wl list', 'wlist', 'w list') are accepted as well. An
        empty line reports an error and shows the help.
        """
        assert isinstance(command, str)
        command = command.strip()
        args = re.split(r'\s', command, maxsplit=1)
        cmd = no_color(args[0]).strip()
        arg = None if len(args) < 2 else no_color(args[1]).strip()
        if cmd == '':
            assert not arg
            self.out.error('No command specified')
            cmd = 'help'
        if cmd == 'w' or cmd == 'wl': # in case they use GDB style commands when not in GDB
            # arg is None when nothing follows the prefix; recurse with an
            # empty line so that case is reported instead of asserting
            return self.command(arg or '')
        if cmd.startswith('wl'): # in case they use GDB style commands when not in GDB
            cmd = cmd[2:]
        found = self._get_command(cmd)
        if found:
            self.out.log('Got ' + found.name + ' command' + (' with \'' + arg + '\'' if arg else ''))
            found.func(arg)

    def _get_command(self, command):
        """Return the single command whose name starts with `command`.

        Reports an error and returns None if no command, or more than one
        command, matches.
        """
        found = [c for c in self.commands if c.matches(command)]
        if len(found) == 1:
            return found[0]
        if len(found) > 1:
            self.out.error('\'' + command + '\' could refer to multiple commands: ' + ', '.join(c.name for c in found))
        else:
            self.out.error('Unknown command \'' + command + '\'')
        return None

    def help_command(self, arg):
        """Show help for the command named by `arg`, or general usage if there is none.

        'matcher' is handled specially and prints the matcher syntax help.
        If `arg` does not name a single command, the general usage follows
        the error reported by the lookup.
        """
        if arg:
            if arg.startswith('wl'):
                arg = arg[2:].strip()
            if arg == 'matcher':
                matcher.print_help()
                return
            cmd = self._get_command(arg)
            if cmd:
                start = command_format(cmd.name) + ': '
                # Indent continuation lines so they line up under the first
                body = cmd.help.replace('\n', '\n' + ' ' * len(no_color(start)))
                self.out.show(start + body)
                return
        self.out.show('Usage: ' + command_format('<COMMAND> [ARGUMENT]'))
        self.out.show('Commands can be abbreviated (down to just the first unique letter)')
        self.out.show('Help with matcher syntax: ' + command_format('help matcher'))
        self.out.show('Commands:')
        for c in self.commands:
            s = c.name
            if c.arg:
                s += ' ' + c.arg
            self.out.show(' ' + command_format(s))

    # Old can be None
    def parse_and_join(self, new_unparsed, old):
        """Parse `new_unparsed` and return it joined with `old`, simplified.

        If `old` is None (or otherwise falsy) the parsed matcher alone is
        returned. On a parse failure (RuntimeError) the error is reported
        and `old` is returned unchanged.
        """
        try:
            parsed = matcher.parse(new_unparsed)
            if old:
                return matcher.join(parsed, old).simplify()
            else:
                return parsed.simplify()
        except RuntimeError as e:
            self.out.error('Failed to parse "' + new_unparsed + '":\n ' + str(e))
            return old

    def filter_command(self, arg):
        """Show the display matcher, or join a new matcher into it if `arg` is given."""
        if arg:
            self.display_matcher = self.parse_and_join(arg, self.display_matcher)
            self.out.show('Only showing messages that match ' + str(self.display_matcher))
        else:
            self.out.show('Output filter: ' + str(self.display_matcher))

    def break_point_command(self, arg):
        """Show the stop matcher, or join a new matcher into it if `arg` is given."""
        if arg:
            self.stop_matcher = self.parse_and_join(arg, self.stop_matcher)
            self.out.show('Breaking on messages that match: ' + str(self.stop_matcher))
        else:
            self.out.show('Breakpoint matcher: ' + str(self.stop_matcher))

    def matcher_command(self, arg):
        """Parse `arg` as a matcher and show it unsimplified, simplified and reparsed."""
        if arg:
            try:
                parsed = matcher.parse(arg)
                unsimplified_str = str(parsed)
                self.out.show('Unsimplified: ' + unsimplified_str)
                self.out.show(' Simplified: ' + str(parsed.simplify()))
                self.out.show(' Reparsed: ' + str(matcher.parse(unsimplified_str).simplify()))
            except RuntimeError as e:
                self.out.error('Failed to parse "' + arg + '":\n ' + str(e))
        else:
            self.out.show('No matcher to parse')

    def list_command(self, arg):
        """List messages; `arg` has the form '[CONN:] [MATCHER] [~ COUNT]'.

        Without an argument all messages on the current connection are listed.
        """
        cap = None
        connection = self.current_connection
        if arg:
            args = arg.split('~')
            if len(args) == 2:
                try:
                    cap = int(args[1])
                except ValueError:
                    self.out.error('Expected number after \'~\', got \'' + args[1] + '\'')
                    return
                arg = args[0]
            args = arg.split(':')
            if len(args) == 2:
                # Tolerate whitespace around the connection name ("A : ...")
                c = self._get_connection(args[0].strip())
                if c is None:
                    self.out.error('"' + args[0] + '" does not name a connection')
                    return
                connection = c
                arg = args[1]
            else:
                arg = args[0]
            if arg.strip():
                m = self.parse_and_join(arg, None)
                if m is None:
                    # Parse failed; the error has already been reported
                    return
            else:
                # Only a connection and/or count was given: match everything,
                # same as when no argument is given at all
                m = matcher.always
        else:
            m = matcher.always
        self.show_messages(connection, m, cap)

    def _get_connection(self, name):
        """Return the first connection whose name, id or title equals `name`
        (case-insensitively), or None if there is no such connection."""
        name = name.lower()
        for c in self.connection_list:
            for candidate in (c.name, c.id, c.title):
                if candidate and name == candidate.lower():
                    return c
        return None

    def connection_command(self, arg):
        """Switch to the connection named by `arg`, or list all connections.

        The list is also shown (after an error) when `arg` does not name a
        connection.
        """
        if arg:
            c = self._get_connection(arg)
            if c is not None:
                self.current_connection = c
                self.out.show('Switched to connection ' + color('1;37', self.current_connection.name))
                self.connection_explicitly_selected = True
                return
            self.out.error('"' + arg + '" does not name a connection')
        delim = ', '
        for c in self.connection_list:
            # The current connection is highlighted and marked with an arrow
            if c == self.current_connection:
                clr = '1;37'
                line = ' => '
            else:
                clr = '37'
                line = ' '
            line += c.name + ' (' + c.description() + '): '
            line = color(clr, line)
            if c.id:
                line += color('35', '"' + (c.id) + '"') + delim
            if c.open:
                line += color('1;32', 'open')
            else:
                line += color('1;31', 'closed')
            line += delim
            line += color('1;34', str(len(c.messages))) + ' messages'
            self.out.show(line)

    def resume_command(self, arg):
        """Clear the stopped flag; `arg` is ignored."""
        self.is_stopped = False
        self.out.log('Resuming...')

    def quit_command(self, arg):
        """Set the quit flag; `arg` is ignored."""
        self.should_quit = True
if __name__ == '__main__':
    # This module only provides definitions; refuse to run as a script.
    print('File meant to be imported, not run')
    # Raise SystemExit directly rather than calling exit(), which is a helper
    # injected by the site module and is not guaranteed to exist
    raise SystemExit(1)