Stuff

This commit is contained in:
2026-01-28 17:49:05 +01:00
parent 39ed75298f
commit 9cb7bcedf1
9 changed files with 5848 additions and 57 deletions

View File

@@ -34,33 +34,33 @@ class Pine:
""" IPC result codes. A list of possible result codes the IPC can send back. Each one of them is what we call an
"opcode" or "tag" and is the first byte sent by the IPC to differentiate between results.
"""
IPC_OK = 0, # IPC command successfully completed.
IPC_OK = 0 # IPC command successfully completed.
IPC_FAIL = 0xFF # IPC command failed to complete.
class IPCCommand(IntEnum):
READ8 = 0,
READ16 = 1,
READ32 = 2,
READ64 = 3,
WRITE8 = 4,
WRITE16 = 5,
WRITE32 = 6,
WRITE64 = 7,
VERSION = 8,
SAVE_STATE = 9,
LOAD_STATE = 0xA,
TITLE = 0xB,
ID = 0xC,
UUID = 0xD,
GAME_VERSION = 0xE,
STATUS = 0xF,
UNIMPLEMENTED = 0xFF,
READ8 = 0
READ16 = 1
READ32 = 2
READ64 = 3
WRITE8 = 4
WRITE16 = 5
WRITE32 = 6
WRITE64 = 7
VERSION = 8
SAVE_STATE = 9
LOAD_STATE = 0xA
TITLE = 0xB
ID = 0xC
UUID = 0xD
GAME_VERSION = 0xE
STATUS = 0xF
UNIMPLEMENTED = 0xFF
class DataSize(IntEnum):
INT8 = 1,
INT16 = 2,
INT32 = 4,
INT64 = 8,
INT8 = 1
INT16 = 2
INT32 = 4
INT64 = 8
def __init__(self, slot: int = 28011):
if not 0 < slot <= 65536:
@@ -161,7 +161,7 @@ class Pine:
def write_float(self, address: int, value: float) -> None:
request = Pine._create_request(Pine.IPCCommand.WRITE32, address, 9 + Pine.DataSize.INT32)
request + struct.pack("<f", value)
request += struct.pack("<f", value)
self._send_request(request)
def write_bytes(self, address: int, data: bytes) -> None:
@@ -189,9 +189,124 @@ class Pine:
self._send_request(request)
bytes_written += 1
def batch_read_int32(self, addresses: list[int]) -> list[int]:
"""Read multiple int32 values in a single request. Much faster than individual reads."""
if not addresses:
return []
# Build batched request
request = b''
for address in addresses:
request += Pine.to_bytes(Pine.IPCCommand.READ32, 1)
request += Pine.to_bytes(address, 4)
# Prepend total size
total_size = len(request) + 4
request = Pine.to_bytes(total_size, 4) + request
response = self._send_request(request)
# Parse results - one status byte for the entire batch, then data
results = []
offset = 4 # Skip overall size header
offset += 1 # Skip single status byte for entire batch
for _ in addresses:
value = Pine.from_bytes(response[offset:offset + 4])
results.append(value)
offset += 4
return results
def batch_read_int16(self, addresses: list[int]) -> list[int]:
"""Read multiple int16 values in a single request."""
if not addresses:
return []
request = b''
for address in addresses:
request += Pine.to_bytes(Pine.IPCCommand.READ16, 1)
request += Pine.to_bytes(address, 4)
total_size = len(request) + 4
request = Pine.to_bytes(total_size, 4) + request
response = self._send_request(request)
# Parse results - one status byte for the entire batch, then data
results = []
offset = 4 # Skip overall size header
offset += 1 # Skip single status byte for entire batch
for _ in addresses:
value = Pine.from_bytes(response[offset:offset + 2])
results.append(value)
offset += 2
return results
def batch_read_int8(self, addresses: list[int]) -> list[int]:
"""Read multiple int8 values in a single request."""
if not addresses:
return []
request = b''
for address in addresses:
request += Pine.to_bytes(Pine.IPCCommand.READ8, 1)
request += Pine.to_bytes(address, 4)
total_size = len(request) + 4
request = Pine.to_bytes(total_size, 4) + request
response = self._send_request(request)
# Parse results - one status byte for the entire batch, then data
results = []
offset = 4 # Skip overall size header
offset += 1 # Skip single status byte for entire batch
for _ in addresses:
value = Pine.from_bytes(response[offset:offset + 1])
results.append(value)
offset += 1
return results
def batch_write_int32(self, operations: list[tuple[int, int]]) -> None:
"""Write multiple int32 values in a single request. Each operation is (address, value)."""
if not operations:
return
request = b''
for address, value in operations:
request += Pine.to_bytes(Pine.IPCCommand.WRITE32, 1)
request += Pine.to_bytes(address, 4)
request += value.to_bytes(length=4, byteorder="little")
total_size = len(request) + 4
request = Pine.to_bytes(total_size, 4) + request
self._send_request(request)
def batch_write_float(self, operations: list[tuple[int, float]]) -> None:
"""Write multiple float values in a single request. Each operation is (address, value)."""
if not operations:
return
request = b''
for address, value in operations:
request += Pine.to_bytes(Pine.IPCCommand.WRITE32, 1)
request += Pine.to_bytes(address, 4)
request += struct.pack("<f", value)
total_size = len(request) + 4
request = Pine.to_bytes(total_size, 4) + request
self._send_request(request)
def get_game_id(self) -> str:
request = Pine.to_bytes(5, 4) + Pine.to_bytes(Pine.IPCCommand.ID, 1)
response = self._send_request(request)
try:
response = self._send_request(request)
except ConnectionError:
return ""
return response[9:-1].decode("ascii")
def _send_request(self, request: bytes) -> bytes:
@@ -247,4 +362,3 @@ class Pine:
@staticmethod
def from_bytes(arr: bytes) -> int:
return int.from_bytes(arr, byteorder="little")