From ff2618e68a1b40a883fb53651bc14fec79ad4d3b Mon Sep 17 00:00:00 2001 From: Peter Hutterer Date: Mon, 12 Aug 2019 14:52:50 +1000 Subject: [PATCH] protocol: add helper functions for converting to/from little endians A few messages where we assumed the argument is a single byte are actually little-endian 16 bits. let's add easy-to-use wrappers. Signed-off-by: Peter Hutterer --- test/test_messages.py | 32 +++++++++++++++++++++++++++ tuhi/protocol.py | 50 +++++++++++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/test/test_messages.py b/test/test_messages.py index a3f40c6..523b241 100644 --- a/test/test_messages.py +++ b/test/test_messages.py @@ -74,6 +74,38 @@ class TestUtils(unittest.TestCase): with self.assertRaises(ValueError): ProtocolVersion.from_string('IntuosPro') + def test_little_u16(self): + values = [ + (1, [0x01, 0x00]), + (256, [0x00, 0x01]), + ] + + for v in values: + self.assertEqual(little_u16(v[0]), bytes(v[1])) + self.assertEqual(little_u16(v[1]), v[0]) + + invalid = [0x10000, -1, [0x00, 0x00, 0x00]] + for v in invalid: + with self.assertRaises(AssertionError): + little_u16(v) + + def test_little_u32(self): + values = [ + (1, [0x01, 0x00, 0x00, 0x00]), + (256, [0x00, 0x01, 0x00, 0x00]), + (0x10000, [0x00, 0x00, 0x01, 0x00]), + (0x1000000, [0x00, 0x00, 0x00, 0x01]), + ] + + for v in values: + self.assertEqual(little_u32(v[0]), bytes(v[1])) + self.assertEqual(little_u32(v[1]), v[0]) + + invalid = [0x100000000, -1, [0x00, 0x00, 0x00, 0x00, 0x00]] + for v in invalid: + with self.assertRaises(AssertionError): + little_u32(v) + class TestProtocolAny(unittest.TestCase): protocol_version = ProtocolVersion.ANY diff --git a/tuhi/protocol.py b/tuhi/protocol.py index 76cb5a9..996c115 100644 --- a/tuhi/protocol.py +++ b/tuhi/protocol.py @@ -55,6 +55,34 @@ import enum import time +def little_u16(x): + ''' + Convert to or from a 16-bit integer to a little-endian 2-byte array. If + passed an integer, the return value is a 2-byte array. If passed a + 2-byte array, the return value is a 16-bit integer. + ''' + if isinstance(x, int): + assert(x <= 0xffff and x >= 0x0000) + return x.to_bytes(2, byteorder='little') + else: + assert(len(x) == 2) + return int.from_bytes(x, byteorder='little') + + +def little_u32(x): + ''' + Convert to or from a 16-bit integer to a little-endian 4-byte array. If + passed an integer, the return value is a 4-byte array. If passed a + 4-byte array, the return value is a 16-bit integer. + ''' + if isinstance(x, int): + assert(x <= 0xffffffff and x >= 0x00000000) + return x.to_bytes(4, byteorder='little') + else: + assert(len(x) == 4) + return int.from_bytes(x, byteorder='little') + + class Interactions(enum.Enum): '''All possible interactions with a device. Not all of these interactions may be available on any specific device.''' @@ -622,7 +650,7 @@ class MsgGetTimeIntuosPro(Msg): if reply.length != 6: raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}') - self.timestamp = int.from_bytes(reply[0:4], byteorder='little') # bytes[5:6] are ms + self.timestamp = little_u32(reply[0:4]) # bytes[5:6] are ms class MsgSetTime(Msg): @@ -662,7 +690,7 @@ class MsgSetTimeIntuosPro(Msg): def __init__(self, timestamp, *args, **kwargs): super().__init__(*args, **kwargs) self.timestamp = int(timestamp) - self.args = list(self.timestamp.to_bytes(length=4, byteorder='little')) + [0x00, 0x00] + self.args = list(little_u32(self.timestamp)) + [0x00, 0x00] # uses the default 0xb3 handler @@ -782,10 +810,10 @@ class MsgGetWidth(Msg): if reply.opcode != 0xeb: raise UnexpectedReply(self) - if reply[0] != 0x3 or len(reply) != 6: + if little_u16(reply[0:2]) != 0x3 or len(reply) != 6: raise UnexpectedDataError(reply) - self.width = int.from_bytes(reply[2:4], byteorder='little') + self.width = little_u32(reply[2:6]) class MsgGetHeight(Msg): @@ -800,16 +828,16 @@ class MsgGetHeight(Msg): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.args = [0x04, 0x00] + self.args = little_u16(0x04) def _handle_reply(self, reply): if reply.opcode != 0xeb: raise UnexpectedReply(self) - if reply[0] != 0x4 or len(reply) != 6: + if little_u16(reply[0:2]) != 0x4 or len(reply) != 6: raise UnexpectedDataError(reply) - self.height = int.from_bytes(reply[2:4], byteorder='little') + self.height = little_u32(reply[2:6]) class MsgUnknownE3Command(Msg): @@ -914,7 +942,7 @@ class MsgGetStrokesSlate(Msg): if reply.opcode != 0xcf: raise UnexpectedReply(reply) - self.count = int.from_bytes(reply[0:4], byteorder='little') + self.count = little_u32(reply[0:4]) str_timestamp = ''.join([f'{d:02x}' for d in reply[4:]]) t = time.strptime(str_timestamp, '%y%m%d%H%M%S') self.timestamp = calendar.timegm(t) @@ -940,8 +968,8 @@ class MsgGetStrokesIntuosPro(Msg): if reply.opcode != 0xcf: raise UnexpectedReply(reply) - self.count = int.from_bytes(reply[0:4], byteorder='little') - seconds = int.from_bytes(reply[4:], byteorder='little') + self.count = little_u32(reply[0:4]) + seconds = little_u32(reply[4:8]) self.timestamp = seconds @@ -976,7 +1004,7 @@ class MsgGetDataAvailableSlate(Msg): if reply.opcode != 0xc2: raise UnexpectedReply(self) - self.count = int.from_bytes(reply[0:2], byteorder='little') + self.count = little_u16(reply[0:2]) class MsgStartReading(Msg):