#!/usr/bin/python # # USB monitor parser and formatter # # Copyright 2009 Laurent Pinchart # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import binascii import errno import getopt import struct import sys import os.path def printable(c): if ord(c) >= 0x20: return c else: return '.' def format_buffer(data): buffer = [] for i in range((len(data)+7)/8): line = data[i*8:(i+1)*8] ascii = [printable(c) for c in line] line = '%s %s %s%s' % (binascii.b2a_hex(line[0:4]), binascii.b2a_hex(line[4:8]), ' ' * (16-2*len(line)), ''.join(ascii)) buffer.append(line) return buffer class StandardPrinter: def __init__(self, mode): self.mode = mode def request(self, setup): request = '%02x' wValue = '%04x' % setup.wValue wIndex = '%04x' % setup.wIndex if setup.recipient > URBSetup.RCPT_ENDPOINT: pass elif setup.bRequest == 0: request = 'GET_STATUS (%02x)' elif setup.bRequest == 1: request = 'CLEAR_FEATURE (%02x)' if setup.recipient == 2 and setup.wValue == 0: wValue = 'ENDPOINT_HALT (0000)' if setup.wIndex & 0x0080: wIndex = 'EP %u IN (%04x)' % (setup.wIndex & 0x000f, setup.wIndex) else: wIndex = 'EP %u OUT (%04x)' % (setup.wIndex & 0x000f, setup.wIndex) elif setup.recipient == 0 and setup.wValue == 1: wValue = 'DEVICE_REMOTE_WAKEUP (0001)' elif setup.recipient == 0 and setup.wValue == 2: wValue = 'TEST_MODE (0002)' elif setup.bRequest == 2: request = 'Reserved (%02x)' elif setup.bRequest == 3: request = 'SET_FEATURE (%02x)' elif setup.bRequest == 4: request = 'Reserved (%02x)' elif setup.bRequest == 5: request = 'SET_ADDRESS (%02x)' elif setup.bRequest == 6: request = 'GET_DESCRIPTOR (%02x)' try: dType = URBSetup.DescriptorTypes[setup.wValue >> 8] except: dType = 'Unknown (%u)' % (setup.wValue >> 8) dIndex = setup.wValue & 0xff wValue = '%s %d (%04x)' % (dType, dIndex, setup.wValue) elif setup.bRequest == 7: request = 'SET_DESCRIPTOR (%02x)' elif setup.bRequest == 8: request = 'GET_CONFIGURATION (%02x)' elif setup.bRequest == 9: request = 'SET_CONFIGURATION (%02x)' elif setup.bRequest == 10: request = 'GET_INTERFACE (%02x)' elif setup.bRequest == 11: request = 'SET_INTERFACE (%02x)' elif setup.bRequest == 12: request = 'SYNCH_FRAME (%02x)' else: request = 'Unknown request %02x' return request, wValue, wIndex def _setup_text(self, setup): return 'SETUP %02x %02x %04x %04x %04x' % \ (setup.bmRequestType, setup.bRequest, setup.wValue, setup.wIndex, setup.wLength) def _setup_html(self, setup): if setup.direction: dir = 'Device-to-host' else: dir = 'Host-to-device' type = URBSetup.RequestTypes[setup.type] try: recipientName = URBSetup.RequestRecipients[setup.recipient] except KeyError, e: recipientName = 'Reserved (%u)' % setup.recipient request = '%02x' wValue = '%04x' % setup.wValue wIndex = '%04x' % setup.wIndex wLength = '%04x' % setup.wLength request, wValue, wIndex = self.request(setup) bmRequestType = '%s %s request to %s' % (dir, type, recipientName) html = [] html.append('SETUP %s' % bmRequestType) html.append('bRequest: %s' % (request % setup.bRequest)) html.append('wValue: %s' % wValue) html.append('wIndex: %s' % wIndex) html.append('wLength: %s' % wLength) return '\n'.join(['

%s

' % p for p in html]) def setup(self, setup): if self.mode == 'html': return self._setup_html(setup) else: return self._setup_text(setup) def _bulk_text(self, data, dir): return format_buffer(data) def _bulk_html(self, data, dir): return '
%s
' % '\n'.join(format_buffer(data)) def bulk(self, data, dir): if self.mode == 'html': return self._bulk_html(data, dir) else: return self._bulk_text(data, dir) def _control_text(self, setup, data): return format_buffer(data) def _control_html(self, setup, data): return '
%s
' % '\n'.join(format_buffer(data)) def control(self, setup, data): if self.mode == 'html': return self._control_html(setup, data) else: return self._control_text(setup, data) def _urb_html(self, urb): timestamp = '%.06f' % urb.timestamp event = URB.events[urb.event] dir = URB.directions[urb.dir] type = URB.types[urb.type] info = ['%s (%s)' % (urb.index, urb.id), timestamp, event, urb.addr, type, dir, urb.endp] urbdata = '' if urb.setup: urbdata += self.setup(urb.setup) if urb.length > 0 or urb.setup is None: data = [] if urb.setup: data.append(' ') if urb.event == 'S' or urb.status is None: data.append('%u data bytes' % urb.length) elif urb.status != 0: status = errno.errorcode[-urb.status] data.append('%u data bytes (status %s)' % (urb.length, status)) else: data.append('%u data bytes' % urb.length) if urb.data: if urb.type == 'B': data.append(self.bulk(urb.data, urb.dir)) elif urb.type == 'C': data.append(self.control(urb.setup, urb.data)) else: data.append('
%s
' % '\n'.join(format_buffer(urb.data))) urbdata += '\n'.join(['

%s

' % p for p in data]) info.append(urbdata) html = ''.join(['%s' % inf for inf in info]) return html def _urb_text(self, urb): timestamp = '%.06f' % urb.timestamp event = URB.events[urb.event] dir = URB.directions[urb.dir] type = URB.types[urb.type] typedir = '%s %s' % (type, dir) info = '' if urb.setup: info += '\n\t\t\t%s' % self.setup(urb.setup) if urb.length > 0: data = [] if urb.event == 'S' or urb.status is None: data.append('%u data bytes' % urb.length) else: data.append('%u data bytes (%d)' % (urb.length, urb.status)) if urb.data: if urb.type == 'B': data.extend(self.bulk(urb.data, urb.dir)) elif urb.type == 'C': data.extend(self.control(urb.setup, urb.data)) else: data.extend(format_buffer(urb.data)) info = ''.join(['\n\t\t\t%s' % p for p in data]) + '\n' urb = '%u\t%s\t%s\t%s%s\t%u/EP%u%s' % (urb.index, timestamp, event, typedir, ' '*(10-len(typedir)), urb.addr, urb.endp, info) return urb def urb(self, urb): if self.mode == 'html': return self._urb_html(urb) else: return self._urb_text(urb) class CCIDPrinter(StandardPrinter): SET_PARAMETERS = 0x61 ICC_POWER_ON = 0x62 ICC_POWER_OFF = 0x63 GET_SLOT_STATUS = 0x65 SECURE = 0x69 T0APDU = 0x6a ESCAPE_OUT = 0x6b GET_PARAMETERS = 0x6c RESET_PARAMETERS = 0x6d ICC_CLOCK = 0x6e XFR_BLOCK = 0x6f MECHANICAL = 0x71 ABORT = 0x72 SET_DATA_RATE = 0x73 DATA_BLOCK = 0x80 SLOT_STATUS = 0x81 PARAMETERS = 0x82 ESCAPE_IN = 0x83 DATA_RATE = 0x84 BulkOutCommands = { SET_PARAMETERS : 'Set Parameters', ICC_POWER_ON : 'ICC Power On', ICC_POWER_OFF : 'Icc Power Off', GET_SLOT_STATUS : 'Get Slot Status', SECURE : 'Secure', T0APDU : 'T0APDU', ESCAPE_OUT : 'Escape', GET_PARAMETERS : 'Get Parameters', RESET_PARAMETERS : 'Reset Parameters', ICC_CLOCK : 'ICC Clock', XFR_BLOCK : 'Transfer Block', MECHANICAL : 'Mechanical', ABORT : 'Abort' } BulkInCommands = { SET_DATA_RATE : 'Set Data Rate', DATA_BLOCK : 'Data Block', SLOT_STATUS : 'Slot Status', PARAMETERS : 'Parameters', ESCAPE_IN : 'Escape', DATA_RATE : 'Data Rate' } def __init__(self, mode = 'text'): StandardPrinter.__init__(self, mode) def _bulk_out_text(self, data): pass def _bulk_out_html(self, data): bMessageType, dwLength, bSlot, bSeq, bMessage = \ struct.unpack(' 10: html.append('
%s
' % '\n'.join(format_buffer(data[10:]))) return '\n'.join(['

%s

' % p for p in html]) def _bulk_in_text(self, data): pass def _bulk_in_html(self, data): bMessageType, dwLength, bSlot, bSeq, bMessage = \ struct.unpack(' 10: html.append('
%s
' % '\n'.join(format_buffer(data[10:]))) return '\n'.join(['

%s

' % p for p in html]) def bulk(self, data, dir): if dir == 'i': if self.mode == 'html': return self._bulk_in_html(data) elif self.mode == 'text': return self._bulk_in_text(data) else: if self.mode == 'html': return self._bulk_out_html(data) elif self.mode == 'text': return self._bulk_out_text(data) class UVCPrinter(StandardPrinter): SET_CUR = 0x01 GET_CUR = 0x81 GET_MIN = 0x82 GET_MAX = 0x83 GET_RES = 0x84 GET_LEN = 0x85 GET_INFO = 0x86 GET_DEF = 0x87 Requests = { SET_CUR : 'SET CUR', GET_CUR : 'GET CUR', GET_MIN : 'GET MIN', GET_MAX : 'GET MAX', GET_RES : 'GET RES', GET_LEN : 'GET LEN', GET_INFO : 'GET INFO', GET_DEF : 'GET DEF' } def __init__(self, mode = 'text'): StandardPrinter.__init__(self, mode) def request(self, setup): if setup.type != URBSetup.REQ_CLASS: return StandardPrinter.request(self, setup) request = '%02x' wValue = '%04x' % setup.wValue wIndex = '%04x' % setup.wIndex if setup.recipient != URBSetup.RCPT_INTERFACE: return request, wValue, wIndex try: request = UVCPrinter.Requests[setup.bRequest] request = request + ' (%02x)' except KeyError, e: request = 'Unknown (%02x)' entity = setup.wIndex >> 8 interface = setup.wIndex & 0xff wIndex = 'INTF %u ENTITY %u (%04x)' % (interface, entity, setup.wIndex) return request, wValue, wIndex def _control_text(self, setup, data): return format_buffer(data) def _control_html(self, setup, data): # In order to perform proper decoding we need a copy of the USB # descriptors to find out which control the request is targetted # at. As a quick hack, decode 26 bytes or 32 bytes requests as # video probe/commit. if len(data) == 26: data = struct.unpack('%s' % '\n'.join(format_buffer(data)) html = [] html.append('bmHint 0x%02x' % data[0]) html.append('bFormatIndex %9u' % data[1]) html.append('bFrameIndex %9u' % data[2]) html.append('dwFrameInterval %9u' % data[3]) html.append('wKeyFrameRate %9u' % data[4]) html.append('wPFrameRate %9u' % data[5]) html.append('wCompQuality %9u' % data[6]) html.append('wCompWindowSize %9u' % data[7]) html.append('wDelay %9u' % data[8]) html.append('dwMaxVideoFrameSize %9u' % data[9]) html.append('dwMaxPayloadTransferSize %9u' % data[10]) if len(data) > 11: html.append('dwClockFrequency %9u' % data[11]) html.append('bmFramingInfo 0x%02x' % data[12]) html.append('bPreferredVersion %9u' % data[13]) html.append('bMinVersion %9u' % data[14]) html.append('bMaxVersion %9u' % data[15]) return '
%s
' % '\n'.join(html) class URBSetup: DescriptorTypes = { 1 : 'DEVICE', 2 : 'CONFIGURATION', 3 : 'STRING', 4 : 'INTERFACE', 5 : 'ENDPOINT', 6 : 'DEVICE_QUALIFIER', 7 : 'OTHER_SPEED_CONFIGURATION', 8 : 'INTERFACE_POWER' } REQ_STANDARD = 0 REQ_CLASS = 1 REQ_VENDOR = 2 RequestTypes = { REQ_STANDARD : 'Standard', REQ_CLASS : 'Class', REQ_VENDOR : 'Vendor', 3 : 'Reserved' } RCPT_DEVICE = 0 RCPT_INTERFACE = 1 RCPT_ENDPOINT = 2 RequestRecipients = { RCPT_DEVICE : 'Device', RCPT_INTERFACE : 'Interface', RCPT_ENDPOINT : 'Endpoint', 3 : 'Other' } def __init__(self, data): self.bmRequestType = int(data[0], 16) self.bRequest = int(data[1], 16) self.wValue = int(data[2], 16) self.wIndex = int(data[3], 16) self.wLength = int(data[4], 16) self.direction = (self.bmRequestType & 0x80) >> 7 self.type = (self.bmRequestType & 0x60) >> 5 self.recipient = (self.bmRequestType & 0x1f) >> 0 class URB: events = { 'S' : 'Submission', 'C' : 'Callback', 'E' : 'Error' } types = { 'C' : 'Control', 'Z' : 'Isochronous', 'I' : 'Interrupt', 'B' : 'Bulk' } directions = { 'i' : 'IN', 'o' : 'OUT' } def __init__(self, index, data): data = data.strip('\n').split(' ') self.__index = index self.__id = data[0] if data[1].find('.') != -1: self.__timestamp = float(data[1]) else: self.__timestamp = int(data[1]) / 1000000. self.__event = data[2] pipe = data[3].split(':') self.__type = pipe[0][0] self.__dir = pipe[0][1] if len(pipe) == 3: # Legacy, deprecated format without bus number self.__addr = int(pipe[1]) self.__endp = int(pipe[2]) else: # New format with bus number self.__addr = int(pipe[2]) self.__endp = int(pipe[3]) if data[4] == 's': self.__setup = URBSetup(data[5:10]) self.__status = None if len(data) > 11 and data[11] == '=': self.__length = int(data[10]) self.__data = binascii.a2b_hex(''.join(data[12:])) else: self.__length = 0 self.__data = None else: status = data[4].split(':') self.__setup = None if self.__event == 'S': # The status field is always present event for submissions, but # isn't always a parseable integer self.__status = None else: self.__status = int(status[0]) if self.__type == 'Z': frames = int(data[5]) frames = min(frames, 5) self.__length = int(data[6 + frames]) self.__data = None else: self.__length = int(data[5]) self.__data = binascii.a2b_hex(''.join(data[7:])) def __get_id(self): return self.__id def __get_addr(self): return self.__addr def __get_data(self): return self.__data def __get_dir(self): return self.__dir def __get_endp(self): return self.__endp def __get_event(self): return self.__event def __get_index(self): return self.__index def __get_length(self): return self.__length def __get_setup(self): return self.__setup def __get_status(self): return self.__status def __get_timestamp(self): return self.__timestamp def __get_type(self): return self.__type id = property(__get_id, None, None, None) addr = property(__get_addr, None, None, None) data = property(__get_data, None, None, None) dir = property(__get_dir, None, None, None) endp = property(__get_endp, None, None, None) event = property(__get_event, None, None, None) index = property(__get_index, None, None, None) length = property(__get_length, None, None, None) setup = property(__get_setup, None, None, None) status = property(__get_status, None, None, None) timestamp = property(__get_timestamp, None, None, None) type = property(__get_type, None, None, None) class HtmlWriter: def __init__(self, stream, printer): self.__stream = stream self.__printer = printer def begin(self): self.__stream.write(''' USBMon report ''') def write(self, urb): cls = [] if urb.event == 'S': cls.append('urb-submit') elif urb.event == 'C': cls.append('urb-callback') elif urb.event == 'E': cls.append('urb-error') self.__stream.write('%s\n' % (' '.join(cls), self.__printer.urb(urb))) def end(self): self.__stream.write('''
URB Timestamp Event Address Type Direction Endpoint Data
''') class TextWriter: def __init__(self, stream, printer): self.__stream = stream self.__printer = printer def begin(self): pass def write(self, urb): self.__stream.write('%s\n' % self.__printer.urb(urb)) def end(self): pass def usage(argv): print 'Usage: %s input_file' % os.path.basename(argv[0]) print ' -a addr : Filter urbs based on the device address' print ' -c range : Capture data from URBs in range (start-end)' print ' -C class : Device class' print ' -d dir : Filter urbs based on the direction: IN or OUT' print ' -e endp : Filter urbs based on the endpoint' print ' -f fmt : Output format: text or html' print ' -h : Show this help screen' print ' -o file : Output results to file instead of stdout' def main(argv): printers = { 'std' : StandardPrinter, 'ccid': CCIDPrinter, 'uvc' : UVCPrinter } writers = { 'text': TextWriter, 'html': HtmlWriter } opts = getopt.getopt(argv[1:], 'a:c:C:d:e:f:ho:') outf = sys.stdout addr = [] devclass = 'std' endp = -1 capture = None format = 'html' dir = None for opt in opts[0]: if opt[0] == '-a': addr.append(int(opt[1], 0)) elif opt[0] == '-c': capture = [int(v) for v in opt[1].split('-')] elif opt[0] == '-C': devclass = opt[1] elif opt[0] == '-d': dir = opt[1] elif opt[0] == '-e': endp = int(opt[1]) elif opt[0] == '-f': format = opt[1] elif opt[0] == '-h': usage(argv) return 0 elif opt[0] == '-o': outf = open(opt[1], 'wb') if len(opts[1]) != 1: usage(argv) return 0 printer = printers[devclass](format) writer = writers[format](outf, printer) writer.begin() urb_data = open(opts[1][0], 'rb').readlines() index = 0 for urb in urb_data: try: urb = URB(index, urb) except: print 'Unable to parse URB %u: %s' % (index, urb) raise index += 1 if len(addr) and urb.addr not in addr: continue if endp != -1 and urb.endp != endp: continue if dir != None and URB.directions[urb.dir] != dir: continue writer.write(urb) if capture and capture[0] <= index and capture[1] >= index: pass writer.end() outf.close() return 0 if __name__ == "__main__": main(sys.argv)