protocol: add helper functions for converting to/from little endians

A few messages where we assumed the argument is a single byte are actually
little-endian 16 bits. let's add easy-to-use wrappers.

Signed-off-by: Peter Hutterer <peter.hutterer@who-t.net>
This commit is contained in:
Peter Hutterer 2019-08-12 14:52:50 +10:00
parent a7836abfe6
commit ff2618e68a
2 changed files with 71 additions and 11 deletions

View File

@ -74,6 +74,38 @@ class TestUtils(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
ProtocolVersion.from_string('IntuosPro') ProtocolVersion.from_string('IntuosPro')
def test_little_u16(self):
values = [
(1, [0x01, 0x00]),
(256, [0x00, 0x01]),
]
for v in values:
self.assertEqual(little_u16(v[0]), bytes(v[1]))
self.assertEqual(little_u16(v[1]), v[0])
invalid = [0x10000, -1, [0x00, 0x00, 0x00]]
for v in invalid:
with self.assertRaises(AssertionError):
little_u16(v)
def test_little_u32(self):
values = [
(1, [0x01, 0x00, 0x00, 0x00]),
(256, [0x00, 0x01, 0x00, 0x00]),
(0x10000, [0x00, 0x00, 0x01, 0x00]),
(0x1000000, [0x00, 0x00, 0x00, 0x01]),
]
for v in values:
self.assertEqual(little_u32(v[0]), bytes(v[1]))
self.assertEqual(little_u32(v[1]), v[0])
invalid = [0x100000000, -1, [0x00, 0x00, 0x00, 0x00, 0x00]]
for v in invalid:
with self.assertRaises(AssertionError):
little_u32(v)
class TestProtocolAny(unittest.TestCase): class TestProtocolAny(unittest.TestCase):
protocol_version = ProtocolVersion.ANY protocol_version = ProtocolVersion.ANY

View File

@ -55,6 +55,34 @@ import enum
import time import time
def little_u16(x):
'''
Convert to or from a 16-bit integer to a little-endian 2-byte array. If
passed an integer, the return value is a 2-byte array. If passed a
2-byte array, the return value is a 16-bit integer.
'''
if isinstance(x, int):
assert(x <= 0xffff and x >= 0x0000)
return x.to_bytes(2, byteorder='little')
else:
assert(len(x) == 2)
return int.from_bytes(x, byteorder='little')
def little_u32(x):
'''
Convert to or from a 16-bit integer to a little-endian 4-byte array. If
passed an integer, the return value is a 4-byte array. If passed a
4-byte array, the return value is a 16-bit integer.
'''
if isinstance(x, int):
assert(x <= 0xffffffff and x >= 0x00000000)
return x.to_bytes(4, byteorder='little')
else:
assert(len(x) == 4)
return int.from_bytes(x, byteorder='little')
class Interactions(enum.Enum): class Interactions(enum.Enum):
'''All possible interactions with a device. Not all of these '''All possible interactions with a device. Not all of these
interactions may be available on any specific device.''' interactions may be available on any specific device.'''
@ -622,7 +650,7 @@ class MsgGetTimeIntuosPro(Msg):
if reply.length != 6: if reply.length != 6:
raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}') raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}')
self.timestamp = int.from_bytes(reply[0:4], byteorder='little') # bytes[5:6] are ms self.timestamp = little_u32(reply[0:4]) # bytes[5:6] are ms
class MsgSetTime(Msg): class MsgSetTime(Msg):
@ -662,7 +690,7 @@ class MsgSetTimeIntuosPro(Msg):
def __init__(self, timestamp, *args, **kwargs): def __init__(self, timestamp, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.timestamp = int(timestamp) self.timestamp = int(timestamp)
self.args = list(self.timestamp.to_bytes(length=4, byteorder='little')) + [0x00, 0x00] self.args = list(little_u32(self.timestamp)) + [0x00, 0x00]
# uses the default 0xb3 handler # uses the default 0xb3 handler
@ -782,10 +810,10 @@ class MsgGetWidth(Msg):
if reply.opcode != 0xeb: if reply.opcode != 0xeb:
raise UnexpectedReply(self) raise UnexpectedReply(self)
if reply[0] != 0x3 or len(reply) != 6: if little_u16(reply[0:2]) != 0x3 or len(reply) != 6:
raise UnexpectedDataError(reply) raise UnexpectedDataError(reply)
self.width = int.from_bytes(reply[2:4], byteorder='little') self.width = little_u32(reply[2:6])
class MsgGetHeight(Msg): class MsgGetHeight(Msg):
@ -800,16 +828,16 @@ class MsgGetHeight(Msg):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.args = [0x04, 0x00] self.args = little_u16(0x04)
def _handle_reply(self, reply): def _handle_reply(self, reply):
if reply.opcode != 0xeb: if reply.opcode != 0xeb:
raise UnexpectedReply(self) raise UnexpectedReply(self)
if reply[0] != 0x4 or len(reply) != 6: if little_u16(reply[0:2]) != 0x4 or len(reply) != 6:
raise UnexpectedDataError(reply) raise UnexpectedDataError(reply)
self.height = int.from_bytes(reply[2:4], byteorder='little') self.height = little_u32(reply[2:6])
class MsgUnknownE3Command(Msg): class MsgUnknownE3Command(Msg):
@ -914,7 +942,7 @@ class MsgGetStrokesSlate(Msg):
if reply.opcode != 0xcf: if reply.opcode != 0xcf:
raise UnexpectedReply(reply) raise UnexpectedReply(reply)
self.count = int.from_bytes(reply[0:4], byteorder='little') self.count = little_u32(reply[0:4])
str_timestamp = ''.join([f'{d:02x}' for d in reply[4:]]) str_timestamp = ''.join([f'{d:02x}' for d in reply[4:]])
t = time.strptime(str_timestamp, '%y%m%d%H%M%S') t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
self.timestamp = calendar.timegm(t) self.timestamp = calendar.timegm(t)
@ -940,8 +968,8 @@ class MsgGetStrokesIntuosPro(Msg):
if reply.opcode != 0xcf: if reply.opcode != 0xcf:
raise UnexpectedReply(reply) raise UnexpectedReply(reply)
self.count = int.from_bytes(reply[0:4], byteorder='little') self.count = little_u32(reply[0:4])
seconds = int.from_bytes(reply[4:], byteorder='little') seconds = little_u32(reply[4:8])
self.timestamp = seconds self.timestamp = seconds
@ -976,7 +1004,7 @@ class MsgGetDataAvailableSlate(Msg):
if reply.opcode != 0xc2: if reply.opcode != 0xc2:
raise UnexpectedReply(self) raise UnexpectedReply(self)
self.count = int.from_bytes(reply[0:2], byteorder='little') self.count = little_u16(reply[0:2])
class MsgStartReading(Msg): class MsgStartReading(Msg):