From 92f837078fcf29399a8bd639a3a8ff7a30df6414 Mon Sep 17 00:00:00 2001 From: Laurent Pinchart Date: Wed, 26 Feb 2014 15:59:18 +0100 Subject: Initial import Signed-off-by: Laurent Pinchart --- usbmon.py | 761 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 761 insertions(+) create mode 100755 usbmon.py diff --git a/usbmon.py b/usbmon.py new file mode 100755 index 0000000..23d694b --- /dev/null +++ b/usbmon.py @@ -0,0 +1,761 @@ +#!/usr/bin/python +# +# USB monitor parser and formatter +# +# Copyright 2009 Laurent Pinchart +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import binascii +import errno +import getopt +import struct +import sys +import os.path + +def printable(c): + if ord(c) >= 0x20: + return c + else: + return '.' + +def format_buffer(data): + buffer = [] + for i in range((len(data)+7)/8): + line = data[i*8:(i+1)*8] + ascii = [printable(c) for c in line] + line = '%s %s %s%s' % (binascii.b2a_hex(line[0:4]), + binascii.b2a_hex(line[4:8]), + ' ' * (16-2*len(line)), ''.join(ascii)) + buffer.append(line) + + return buffer + +class StandardPrinter: + + def __init__(self, mode): + self.mode = mode + + def request(self, setup): + + request = '%02x' + wValue = '%04x' % setup.wValue + wIndex = '%04x' % setup.wIndex + + if setup.recipient > URBSetup.RCPT_ENDPOINT: + pass + elif setup.bRequest == 0: + request = 'GET_STATUS (%02x)' + elif setup.bRequest == 1: + request = 'CLEAR_FEATURE (%02x)' + if setup.recipient == 2 and setup.wValue == 0: + wValue = 'ENDPOINT_HALT (0000)' + if setup.wIndex & 0x0080: + wIndex = 'EP %u IN (%04x)' % (setup.wIndex & 0x000f, setup.wIndex) + else: + wIndex = 'EP %u OUT (%04x)' % (setup.wIndex & 0x000f, setup.wIndex) + elif setup.recipient == 0 and setup.wValue == 1: + wValue = 'DEVICE_REMOTE_WAKEUP (0001)' + elif setup.recipient == 0 and setup.wValue == 2: + wValue = 'TEST_MODE (0002)' + elif setup.bRequest == 2: + request = 'Reserved (%02x)' + elif setup.bRequest == 3: + request = 'SET_FEATURE (%02x)' + elif setup.bRequest == 4: + request = 'Reserved (%02x)' + elif setup.bRequest == 5: + request = 'SET_ADDRESS (%02x)' + elif setup.bRequest == 6: + request = 'GET_DESCRIPTOR (%02x)' + try: + dType = URBSetup.DescriptorTypes[setup.wValue >> 8] + except: + dType = 'Unknown (%u)' % (setup.wValue >> 8) + dIndex = setup.wValue & 0xff + wValue = '%s %d (%04x)' % (dType, dIndex, setup.wValue) + elif setup.bRequest == 7: + request = 'SET_DESCRIPTOR (%02x)' + elif setup.bRequest == 8: + request = 'GET_CONFIGURATION (%02x)' + elif setup.bRequest == 9: + request = 'SET_CONFIGURATION (%02x)' + elif setup.bRequest == 10: + request = 'GET_INTERFACE (%02x)' + elif setup.bRequest == 11: + request = 'SET_INTERFACE (%02x)' + elif setup.bRequest == 12: + request = 'SYNCH_FRAME (%02x)' + else: + request = 'Unknown request %02x' + + return request, wValue, wIndex + + def _setup_text(self, setup): + return 'SETUP %02x %02x %04x %04x %04x' % \ + (setup.bmRequestType, setup.bRequest, setup.wValue, + setup.wIndex, setup.wLength) + + def _setup_html(self, setup): + + if setup.direction: + dir = 'Device-to-host' + else: + dir = 'Host-to-device' + + type = URBSetup.RequestTypes[setup.type] + try: + recipientName = URBSetup.RequestRecipients[setup.recipient] + except KeyError, e: + recipientName = 'Reserved (%u)' % setup.recipient + + request = '%02x' + wValue = '%04x' % setup.wValue + wIndex = '%04x' % setup.wIndex + wLength = '%04x' % setup.wLength + + request, wValue, wIndex = self.request(setup) + + bmRequestType = '%s %s request to %s' % (dir, type, recipientName) + html = [] + html.append('SETUP %s' % bmRequestType) + html.append('bRequest: %s' % (request % setup.bRequest)) + html.append('wValue: %s' % wValue) + html.append('wIndex: %s' % wIndex) + html.append('wLength: %s' % wLength) + return '\n'.join(['

%s

' % p for p in html]) + + def setup(self, setup): + if self.mode == 'html': + return self._setup_html(setup) + else: + return self._setup_text(setup) + + def _bulk_text(self, data, dir): + return format_buffer(data) + + def _bulk_html(self, data, dir): + return '
%s
' % '\n'.join(format_buffer(data)) + + def bulk(self, data, dir): + if self.mode == 'html': + return self._bulk_html(data, dir) + else: + return self._bulk_text(data, dir) + + def _control_text(self, setup, data): + return format_buffer(data) + + def _control_html(self, setup, data): + return '
%s
' % '\n'.join(format_buffer(data)) + + def control(self, setup, data): + if self.mode == 'html': + return self._control_html(setup, data) + else: + return self._control_text(setup, data) + + def _urb_html(self, urb): + timestamp = '%.06f' % urb.timestamp + event = URB.events[urb.event] + dir = URB.directions[urb.dir] + type = URB.types[urb.type] + info = ['%s (%s)' % (urb.index, urb.id), timestamp, event, urb.addr, type, dir, urb.endp] + urbdata = '' + if urb.setup: + urbdata += self.setup(urb.setup) + + if urb.length > 0 or urb.setup is None: + data = [] + if urb.setup: + data.append(' ') + if urb.event == 'S' or urb.status is None: + data.append('%u data bytes' % urb.length) + elif urb.status != 0: + status = errno.errorcode[-urb.status] + data.append('%u data bytes (status %s)' % + (urb.length, status)) + else: + data.append('%u data bytes' % urb.length) + + if urb.data: + if urb.type == 'B': + data.append(self.bulk(urb.data, urb.dir)) + elif urb.type == 'C': + data.append(self.control(urb.setup, urb.data)) + else: + data.append('
%s
' % '\n'.join(format_buffer(urb.data))) + + urbdata += '\n'.join(['

%s

' % p for p in data]) + + info.append(urbdata) + html = ''.join(['%s' % inf for inf in info]) + return html + + def _urb_text(self, urb): + timestamp = '%.06f' % urb.timestamp + event = URB.events[urb.event] + dir = URB.directions[urb.dir] + type = URB.types[urb.type] + typedir = '%s %s' % (type, dir) + + info = '' + if urb.setup: + info += '\n\t\t\t%s' % self.setup(urb.setup) + + if urb.length > 0: + data = [] + if urb.event == 'S' or urb.status is None: + data.append('%u data bytes' % urb.length) + else: + data.append('%u data bytes (%d)' % (urb.length, urb.status)) + + if urb.data: + if urb.type == 'B': + data.extend(self.bulk(urb.data, urb.dir)) + elif urb.type == 'C': + data.extend(self.control(urb.setup, urb.data)) + else: + data.extend(format_buffer(urb.data)) + + info = ''.join(['\n\t\t\t%s' % p for p in data]) + '\n' + + urb = '%u\t%s\t%s\t%s%s\t%u/EP%u%s' % (urb.index, timestamp, event, + typedir, ' '*(10-len(typedir)), urb.addr, urb.endp, info) + return urb + + def urb(self, urb): + if self.mode == 'html': + return self._urb_html(urb) + else: + return self._urb_text(urb) + +class CCIDPrinter(StandardPrinter): + + SET_PARAMETERS = 0x61 + ICC_POWER_ON = 0x62 + ICC_POWER_OFF = 0x63 + GET_SLOT_STATUS = 0x65 + SECURE = 0x69 + T0APDU = 0x6a + ESCAPE_OUT = 0x6b + GET_PARAMETERS = 0x6c + RESET_PARAMETERS = 0x6d + ICC_CLOCK = 0x6e + XFR_BLOCK = 0x6f + MECHANICAL = 0x71 + ABORT = 0x72 + SET_DATA_RATE = 0x73 + DATA_BLOCK = 0x80 + SLOT_STATUS = 0x81 + PARAMETERS = 0x82 + ESCAPE_IN = 0x83 + DATA_RATE = 0x84 + + BulkOutCommands = { + SET_PARAMETERS : 'Set Parameters', + ICC_POWER_ON : 'ICC Power On', + ICC_POWER_OFF : 'Icc Power Off', + GET_SLOT_STATUS : 'Get Slot Status', + SECURE : 'Secure', + T0APDU : 'T0APDU', + ESCAPE_OUT : 'Escape', + GET_PARAMETERS : 'Get Parameters', + RESET_PARAMETERS : 'Reset Parameters', + ICC_CLOCK : 'ICC Clock', + XFR_BLOCK : 'Transfer Block', + MECHANICAL : 'Mechanical', + ABORT : 'Abort' + } + + BulkInCommands = { + SET_DATA_RATE : 'Set Data Rate', + DATA_BLOCK : 'Data Block', + SLOT_STATUS : 'Slot Status', + PARAMETERS : 'Parameters', + ESCAPE_IN : 'Escape', + DATA_RATE : 'Data Rate' + } + + def __init__(self, mode = 'text'): + StandardPrinter.__init__(self, mode) + + def _bulk_out_text(self, data): + pass + + def _bulk_out_html(self, data): + bMessageType, dwLength, bSlot, bSeq, bMessage = \ + struct.unpack(' 10: + html.append('
%s
' % '\n'.join(format_buffer(data[10:]))) + return '\n'.join(['

%s

' % p for p in html]) + + def _bulk_in_text(self, data): + pass + + def _bulk_in_html(self, data): + bMessageType, dwLength, bSlot, bSeq, bMessage = \ + struct.unpack(' 10: + html.append('
%s
' % '\n'.join(format_buffer(data[10:]))) + return '\n'.join(['

%s

' % p for p in html]) + + def bulk(self, data, dir): + if dir == 'i': + if self.mode == 'html': + return self._bulk_in_html(data) + elif self.mode == 'text': + return self._bulk_in_text(data) + else: + if self.mode == 'html': + return self._bulk_out_html(data) + elif self.mode == 'text': + return self._bulk_out_text(data) + +class UVCPrinter(StandardPrinter): + + SET_CUR = 0x01 + GET_CUR = 0x81 + GET_MIN = 0x82 + GET_MAX = 0x83 + GET_RES = 0x84 + GET_LEN = 0x85 + GET_INFO = 0x86 + GET_DEF = 0x87 + + Requests = { + SET_CUR : 'SET CUR', + GET_CUR : 'GET CUR', + GET_MIN : 'GET MIN', + GET_MAX : 'GET MAX', + GET_RES : 'GET RES', + GET_LEN : 'GET LEN', + GET_INFO : 'GET INFO', + GET_DEF : 'GET DEF' + } + + def __init__(self, mode = 'text'): + StandardPrinter.__init__(self, mode) + + def request(self, setup): + if setup.type != URBSetup.REQ_CLASS: + return StandardPrinter.request(self, setup) + + request = '%02x' + wValue = '%04x' % setup.wValue + wIndex = '%04x' % setup.wIndex + + if setup.recipient != URBSetup.RCPT_INTERFACE: + return request, wValue, wIndex + + try: + request = UVCPrinter.Requests[setup.bRequest] + request = request + ' (%02x)' + except KeyError, e: + request = 'Unknown (%02x)' + + entity = setup.wIndex >> 8 + interface = setup.wIndex & 0xff + wIndex = 'INTF %u ENTITY %u (%04x)' % (interface, entity, setup.wIndex) + + return request, wValue, wIndex + + def _control_text(self, setup, data): + return format_buffer(data) + + def _control_html(self, setup, data): + # In order to perform proper decoding we need a copy of the USB + # descriptors to find out which control the request is targetted + # at. As a quick hack, decode 26 bytes or 32 bytes requests as + # video probe/commit. + + if len(data) == 26: + data = struct.unpack('%s' % '\n'.join(format_buffer(data)) + + html = [] + html.append('bmHint 0x%02x' % data[0]) + html.append('bFormatIndex %9u' % data[1]) + html.append('bFrameIndex %9u' % data[2]) + html.append('dwFrameInterval %9u' % data[3]) + html.append('wKeyFrameRate %9u' % data[4]) + html.append('wPFrameRate %9u' % data[5]) + html.append('wCompQuality %9u' % data[6]) + html.append('wCompWindowSize %9u' % data[7]) + html.append('wDelay %9u' % data[8]) + html.append('dwMaxVideoFrameSize %9u' % data[9]) + html.append('dwMaxPayloadTransferSize %9u' % data[10]) + if len(data) > 11: + html.append('dwClockFrequency %9u' % data[11]) + html.append('bmFramingInfo 0x%02x' % data[12]) + html.append('bPreferredVersion %9u' % data[13]) + html.append('bMinVersion %9u' % data[14]) + html.append('bMaxVersion %9u' % data[15]) + return '
%s
' % '\n'.join(html) + +class URBSetup: + + DescriptorTypes = { + 1 : 'DEVICE', + 2 : 'CONFIGURATION', + 3 : 'STRING', + 4 : 'INTERFACE', + 5 : 'ENDPOINT', + 6 : 'DEVICE_QUALIFIER', + 7 : 'OTHER_SPEED_CONFIGURATION', + 8 : 'INTERFACE_POWER' + } + + REQ_STANDARD = 0 + REQ_CLASS = 1 + REQ_VENDOR = 2 + + RequestTypes = { + REQ_STANDARD : 'Standard', + REQ_CLASS : 'Class', + REQ_VENDOR : 'Vendor', + 3 : 'Reserved' + } + + RCPT_DEVICE = 0 + RCPT_INTERFACE = 1 + RCPT_ENDPOINT = 2 + + RequestRecipients = { + RCPT_DEVICE : 'Device', + RCPT_INTERFACE : 'Interface', + RCPT_ENDPOINT : 'Endpoint', + 3 : 'Other' + } + + def __init__(self, data): + self.bmRequestType = int(data[0], 16) + self.bRequest = int(data[1], 16) + self.wValue = int(data[2], 16) + self.wIndex = int(data[3], 16) + self.wLength = int(data[4], 16) + + self.direction = (self.bmRequestType & 0x80) >> 7 + self.type = (self.bmRequestType & 0x60) >> 5 + self.recipient = (self.bmRequestType & 0x1f) >> 0 + + +class URB: + + events = { 'S' : 'Submission', + 'C' : 'Callback', + 'E' : 'Error' } + types = { 'C' : 'Control', + 'Z' : 'Isochronous', + 'I' : 'Interrupt', + 'B' : 'Bulk' } + directions = { 'i' : 'IN', + 'o' : 'OUT' } + + def __init__(self, index, data): + data = data.strip('\n').split(' ') + self.__index = index + self.__id = data[0] + if data[1].find('.') != -1: + self.__timestamp = float(data[1]) + else: + self.__timestamp = int(data[1]) / 1000000. + self.__event = data[2] + pipe = data[3].split(':') + self.__type = pipe[0][0] + self.__dir = pipe[0][1] + if len(pipe) == 3: + # Legacy, deprecated format without bus number + self.__addr = int(pipe[1]) + self.__endp = int(pipe[2]) + else: + # New format with bus number + self.__addr = int(pipe[2]) + self.__endp = int(pipe[3]) + + if data[4] == 's': + self.__setup = URBSetup(data[5:10]) + self.__status = None + if len(data) > 11 and data[11] == '=': + self.__length = int(data[10]) + self.__data = binascii.a2b_hex(''.join(data[12:])) + else: + self.__length = 0 + self.__data = None + else: + status = data[4].split(':') + self.__setup = None + if self.__event == 'S': + # The status field is always present event for submissions, but + # isn't always a parseable integer + self.__status = None + else: + self.__status = int(status[0]) + if self.__type == 'Z': + frames = int(data[5]) + frames = min(frames, 5) + self.__length = int(data[6 + frames]) + self.__data = None + else: + self.__length = int(data[5]) + self.__data = binascii.a2b_hex(''.join(data[7:])) + + def __get_id(self): + return self.__id + def __get_addr(self): + return self.__addr + def __get_data(self): + return self.__data + def __get_dir(self): + return self.__dir + def __get_endp(self): + return self.__endp + def __get_event(self): + return self.__event + def __get_index(self): + return self.__index + def __get_length(self): + return self.__length + def __get_setup(self): + return self.__setup + def __get_status(self): + return self.__status + def __get_timestamp(self): + return self.__timestamp + def __get_type(self): + return self.__type + + id = property(__get_id, None, None, None) + addr = property(__get_addr, None, None, None) + data = property(__get_data, None, None, None) + dir = property(__get_dir, None, None, None) + endp = property(__get_endp, None, None, None) + event = property(__get_event, None, None, None) + index = property(__get_index, None, None, None) + length = property(__get_length, None, None, None) + setup = property(__get_setup, None, None, None) + status = property(__get_status, None, None, None) + timestamp = property(__get_timestamp, None, None, None) + type = property(__get_type, None, None, None) + + +class HtmlWriter: + def __init__(self, stream, printer): + self.__stream = stream + self.__printer = printer + + def begin(self): + self.__stream.write(''' + + USBMon report + + + + + + + + + + + + + + + ''') + + def write(self, urb): + cls = [] + if urb.event == 'S': + cls.append('urb-submit') + elif urb.event == 'C': + cls.append('urb-callback') + elif urb.event == 'E': + cls.append('urb-error') + self.__stream.write('%s\n' % (' '.join(cls), self.__printer.urb(urb))) + + def end(self): + self.__stream.write(''' +
URBTimestampEventAddressTypeDirectionEndpointData
+ + +''') + + +class TextWriter: + def __init__(self, stream, printer): + self.__stream = stream + self.__printer = printer + + def begin(self): + pass + + def write(self, urb): + self.__stream.write('%s\n' % self.__printer.urb(urb)) + + def end(self): + pass + + +def usage(argv): + print 'Usage: %s input_file' % os.path.basename(argv[0]) + print ' -a addr : Filter urbs based on the device address' + print ' -c range : Capture data from URBs in range (start-end)' + print ' -C class : Device class' + print ' -d dir : Filter urbs based on the direction: IN or OUT' + print ' -e endp : Filter urbs based on the endpoint' + print ' -f fmt : Output format: text or html' + print ' -h : Show this help screen' + print ' -o file : Output results to file instead of stdout' + +def main(argv): + + printers = { + 'std' : StandardPrinter, + 'ccid': CCIDPrinter, + 'uvc' : UVCPrinter + } + + writers = { + 'text': TextWriter, + 'html': HtmlWriter + } + + opts = getopt.getopt(argv[1:], 'a:c:C:d:e:f:ho:') + outf = sys.stdout + addr = -1 + devclass = 'std' + endp = -1 + capture = None + format = 'html' + dir = None + + for opt in opts[0]: + if opt[0] == '-a': + addr = int(opt[1]) + elif opt[0] == '-c': + capture = [int(v) for v in opt[1].split('-')] + elif opt[0] == '-C': + devclass = opt[1] + elif opt[0] == '-d': + dir = opt[1] + elif opt[0] == '-e': + endp = int(opt[1]) + elif opt[0] == '-f': + format = opt[1] + elif opt[0] == '-h': + usage(argv) + return 0 + elif opt[0] == '-o': + outf = open(opt[1], 'wb') + + if len(opts[1]) != 1: + usage(argv) + return 0 + + printer = printers[devclass](format) + + writer = writers[format](outf, printer) + writer.begin() + + urb_data = open(opts[1][0], 'rb').readlines() + index = 0 + for urb in urb_data: + try: + urb = URB(index, urb) + except: + print 'Unable to parse URB %u: %s' % (index, urb) + raise + + index += 1 + if addr != -1 and urb.addr != addr: + continue + if endp != -1 and urb.endp != endp: + continue + if dir != None and URB.directions[urb.dir] != dir: + continue + + writer.write(urb) + + if capture and capture[0] <= index and capture[1] >= index: + pass + + writer.end() + outf.close() + return 0 + + +if __name__ == "__main__": + main(sys.argv) + -- cgit v1.2.3