Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions src/pykmp/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,25 @@ class CrcChecksumInvalidError(BaseCodecError):
"""CRC checksum validation of the data link byte sequence did not pass."""


class TruncatedStuffingError(BaseCodecError):
"""Byte stuffing indicates one more byte at the end of the input"""

def __str__(self) -> str: # noqa: D105
return "Byte stuffing indicates one more byte at the end of the input"


@attrs.frozen(kw_only=True)
class InvalidStuffingByteError(BaseCodecError):
"""Byte stuffing encountered an unrecognized encoded byte"""

raw_byte: int

def __str__(self) -> str: # noqa: D105
return (
f"Byte stuffing encountered an unrecognized encoded byte {self.raw_byte:02X}"
)


@attrs.frozen(kw_only=True)
class ApplicationData:
"""Data class for the data in the application layer of the Kamstrup KMP protocol."""
Expand Down Expand Up @@ -180,22 +199,6 @@ class PhysicalCodec:
direction: PhysicalDirection = attrs.field()
_start_byte: int = attrs.field(init=False) # depends on direction

BYTE_STUFFING_MAP: Final[dict[bytes, bytes]] = {
the_byte.to_bytes(1, "big"): (
constants.ByteCode.STUFFING.value.to_bytes(1, "big")
+ (the_byte ^ 0xFF).to_bytes(1, "big")
)
for the_byte in (
# Order matters for having BYTE_STUFFING as the first; itself is used in the
# escaped sequence.
constants.ByteCode.STUFFING.value,
constants.ByteCode.ACK.value,
constants.ByteCode.START_FROM_METER.value,
constants.ByteCode.START_TO_METER.value,
constants.ByteCode.STOP.value,
)
}

def __attrs_post_init__(self) -> None:
"""Select start byte value according to configuration (direction)."""
self._start_byte = self._direction_to_start_byte(self.direction)
Expand Down Expand Up @@ -238,10 +241,33 @@ def decode(self, frame: PhysicalBytes) -> DataLinkBytes:
)

data_bytes = frame[1:-1]
for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items():
data_bytes = data_bytes.replace(escaped_bytes, unescaped_byte)

return cast(DataLinkBytes, data_bytes)
res = bytes()
i = 0
in_stuffing = False

while i < len(data_bytes):
if in_stuffing:
in_stuffing = False
xored = data_bytes[i] ^ 0xff
if xored not in (
constants.ByteCode.STUFFING.value,
constants.ByteCode.ACK.value,
constants.ByteCode.START_FROM_METER.value,
constants.ByteCode.START_TO_METER.value,
constants.ByteCode.STOP.value,
):
raise InvalidStuffingByteError(raw_byte=data_bytes[i])
res += xored.to_bytes(1, "big")
elif data_bytes[i] == constants.ByteCode.STUFFING.value:
in_stuffing = True
else:
res += data_bytes[i].to_bytes(1, "big")
i += 1

if in_stuffing:
raise TruncatedStuffingError

return cast(DataLinkBytes, res)

def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes:
"""
Expand All @@ -257,12 +283,24 @@ def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes:
)

raw = cast(bytes, data_bytes)
for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items():
raw = raw.replace(unescaped_byte, escaped_bytes)
res = bytes()
i = 0
while i < len(raw):
if raw[i] in (
constants.ByteCode.STUFFING.value,
constants.ByteCode.ACK.value,
constants.ByteCode.START_FROM_METER.value,
constants.ByteCode.START_TO_METER.value,
constants.ByteCode.STOP.value,
):
res += ((constants.ByteCode.STUFFING.value << 8) | (raw[i] ^ 0xff)).to_bytes(2, "big")
else:
res += raw[i].to_bytes(1, "big")
i += 1

frame = (
self._start_byte.to_bytes(1, "big")
+ raw
+ res
+ constants.ByteCode.STOP.value.to_bytes(1, "big")
)
return cast(PhysicalBytes, frame)
Expand Down
23 changes: 23 additions & 0 deletions tests/test_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
PhysicalCodec,
PhysicalDirection,
UnsupportedDecimalExponentError,
InvalidStuffingByteError,
TruncatedStuffingError,
)

from . import util
Expand All @@ -51,6 +53,13 @@
DataLinkBytes(b"\x3F\x10\x00\x80\x16\x04\x11\x01\x2A\xF0\x24\x63\x03"),
id="Kamstrup doc 6.2.4 GetRegister response (destuffing needed)",
),
pytest.param(
PhysicalDirection.FROM_METER,
# the key part is that the 000B1BE47F 000B0FC8 should not cause recursive decoding
PhysicalBytes(bytes.fromhex('40 3F B8 1BF9 04 43 000B1BE47F 000B0FC8 0d')),
DataLinkBytes(bytes.fromhex('3F B8 06 04 43 000B1B7F 000B0FC8')),
id='GetLogIDPastAbsResponse snippet which was by mistake destuffed too much',
),
],
)
def test_codec_physical_decode(
Expand Down Expand Up @@ -116,6 +125,20 @@ def test_codec_physical_decode_ack(
"Frame is of zero length.",
id="empty",
),
pytest.param(
PhysicalDirection.FROM_METER,
PhysicalBytes(bytes.fromhex('40 3F 1BFF 0D')),
InvalidStuffingByteError,
"Byte stuffing encountered an unrecognized encoded byte FF",
id="Unrecongized stuffing value",
),
pytest.param(
PhysicalDirection.FROM_METER,
PhysicalBytes(bytes.fromhex('40 3F 1B 0D')),
TruncatedStuffingError,
"Byte stuffing indicates one more byte at the end of the input",
id="Truncated stuffing value",
),
],
)
def test_codec_physical_decode_error(
Expand Down