feat: add Command classes
This commit is contained in:
66
src/command.py
Normal file
66
src/command.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import abc
|
||||||
|
from enum import IntEnum
|
||||||
|
import struct
|
||||||
|
from typing import Type
|
||||||
|
|
||||||
|
|
||||||
|
class CommandType(IntEnum):
|
||||||
|
CAR_CONTROL = 0
|
||||||
|
|
||||||
|
|
||||||
|
class CarControl(IntEnum):
|
||||||
|
FORWARD = 0
|
||||||
|
BACKWARD = 1
|
||||||
|
LEFT = 2
|
||||||
|
RIGHT = 3
|
||||||
|
|
||||||
|
|
||||||
|
class Command(abc.ABC):
|
||||||
|
TYPE: CommandType
|
||||||
|
REGISTRY: dict[CommandType, Type[Command]] = {}
|
||||||
|
|
||||||
|
def __init_subclass__(cls) -> None:
|
||||||
|
super().__init_subclass__()
|
||||||
|
if cls.TYPE in Command.REGISTRY:
|
||||||
|
raise ValueError(
|
||||||
|
f"Command type {cls.TYPE} already registered by {Command.REGISTRY[cls.TYPE]}"
|
||||||
|
)
|
||||||
|
Command.REGISTRY[cls.TYPE] = cls
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_payload(self) -> bytes: ...
|
||||||
|
|
||||||
|
def pack(self) -> bytes:
|
||||||
|
payload: bytes = self.get_payload()
|
||||||
|
return struct.pack(">B", self.TYPE) + payload
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def unpack(data: bytes) -> Command:
|
||||||
|
type: CommandType = CommandType(data[0])
|
||||||
|
return Command.REGISTRY[type].from_payload(data[1:])
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@abc.abstractmethod
|
||||||
|
def from_payload(cls, payload: bytes) -> Command: ...
|
||||||
|
|
||||||
|
|
||||||
|
class ControlCommand(Command):
|
||||||
|
TYPE = CommandType.CAR_CONTROL
|
||||||
|
__match_args__ = ("control", "active")
|
||||||
|
|
||||||
|
def __init__(self, control: CarControl, active: bool) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.control: CarControl = control
|
||||||
|
self.active: bool = active
|
||||||
|
|
||||||
|
def get_payload(self) -> bytes:
|
||||||
|
return struct.pack(">B", (self.control << 1) | self.active)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_payload(cls, payload: bytes) -> Command:
|
||||||
|
value: int = payload[0]
|
||||||
|
active: bool = (value & 1) == 1
|
||||||
|
control: int = value >> 1
|
||||||
|
return ControlCommand(CarControl(control), active)
|
||||||
Reference in New Issue
Block a user