365 lines
14 KiB
Python
365 lines
14 KiB
Python
"""
|
|
The PINE API.
|
|
This is the client side implementation of the PINE protocol.
|
|
It allows for a three-way communication between the emulated game, the emulator and an external
|
|
tool, using the external tool as a relay for all communication. It is a socket based IPC that
|
|
is _very_ fast.
|
|
|
|
If you want to draw comparisons you can think of this as an equivalent of the BizHawk LUA API,
|
|
although with the logic out of the core and in an external tool. While BizHawk would run a lua
|
|
script at each frame in the core of the emulator we opt instead to keep the entire logic out of
|
|
the emulator to make it more easily extensible, more portable, require less code and be more
|
|
performant.
|
|
"""
|
|
import os
|
|
import struct
|
|
from enum import IntEnum
|
|
from platform import system
|
|
import socket
|
|
|
|
|
|
class Pine:
|
|
""" Exposes PS2 memory within a running instance of the PCSX2 emulator using the Pine IPC Protocol. """
|
|
|
|
""" Maximum memory used by an IPC message request. Equivalent to 50,000 Write64 requests. """
|
|
MAX_IPC_SIZE: int = 650000
|
|
|
|
""" Maximum memory used by an IPC message reply. Equivalent to 50,000 Read64 replies. """
|
|
MAX_IPC_RETURN_SIZE: int = 450000
|
|
|
|
""" Maximum number of commands sent in a batch message. """
|
|
MAX_BATCH_REPLY_COUNT: int = 50000
|
|
|
|
class IPCResult(IntEnum):
|
|
""" IPC result codes. A list of possible result codes the IPC can send back. Each one of them is what we call an
|
|
"opcode" or "tag" and is the first byte sent by the IPC to differentiate between results.
|
|
"""
|
|
IPC_OK = 0 # IPC command successfully completed.
|
|
IPC_FAIL = 0xFF # IPC command failed to complete.
|
|
|
|
class IPCCommand(IntEnum):
|
|
READ8 = 0
|
|
READ16 = 1
|
|
READ32 = 2
|
|
READ64 = 3
|
|
WRITE8 = 4
|
|
WRITE16 = 5
|
|
WRITE32 = 6
|
|
WRITE64 = 7
|
|
VERSION = 8
|
|
SAVE_STATE = 9
|
|
LOAD_STATE = 0xA
|
|
TITLE = 0xB
|
|
ID = 0xC
|
|
UUID = 0xD
|
|
GAME_VERSION = 0xE
|
|
STATUS = 0xF
|
|
UNIMPLEMENTED = 0xFF
|
|
|
|
class DataSize(IntEnum):
|
|
INT8 = 1
|
|
INT16 = 2
|
|
INT32 = 4
|
|
INT64 = 8
|
|
|
|
def __init__(self, slot: int = 28011):
|
|
if not 0 < slot <= 65536:
|
|
raise ValueError("Provided slot number is outside valid range")
|
|
self._slot: int = slot
|
|
self._sock: socket.socket = socket.socket()
|
|
self._sock_state: bool = False
|
|
# self._init_socket()
|
|
|
|
def _init_socket(self) -> None:
|
|
if system() == "Windows":
|
|
socket_family = socket.AF_INET
|
|
socket_name = ("127.0.0.1", self._slot)
|
|
elif system() == "Linux":
|
|
socket_family = socket.AF_UNIX
|
|
socket_name = os.environ.get("XDG_RUNTIME_DIR", "/tmp")
|
|
socket_name += "/pcsx2.sock"
|
|
elif system() == "Darwin":
|
|
socket_family = socket.AF_UNIX
|
|
socket_name = os.environ.get("TMPDIR", "/tmp")
|
|
socket_name += "/pcsx2.sock"
|
|
else:
|
|
socket_family = socket.AF_UNIX
|
|
socket_name = "/tmp/pcsx2.sock"
|
|
|
|
try:
|
|
self._sock = socket.socket(socket_family, socket.SOCK_STREAM)
|
|
self._sock.settimeout(5.0)
|
|
self._sock.connect(socket_name)
|
|
except socket.error:
|
|
self._sock.close()
|
|
self._sock_state = False
|
|
return
|
|
|
|
self._sock_state = True
|
|
|
|
def connect(self) -> None:
|
|
if not self._sock_state:
|
|
self._init_socket()
|
|
|
|
def disconnect(self) -> None:
|
|
if self._sock_state:
|
|
self._sock.close()
|
|
|
|
def is_connected(self) -> bool:
|
|
return self._sock_state
|
|
|
|
def read_int8(self, address: int) -> int:
|
|
request = Pine._create_request(Pine.IPCCommand.READ8, address, 9)
|
|
return Pine.from_bytes(self._send_request(request)[-1:])
|
|
|
|
def read_int16(self, address) -> int:
|
|
request = Pine._create_request(Pine.IPCCommand.READ16, address, 9)
|
|
return Pine.from_bytes(self._send_request(request)[-2:])
|
|
|
|
def read_int32(self, address) -> int:
|
|
request = Pine._create_request(Pine.IPCCommand.READ32, address, 9)
|
|
return Pine.from_bytes(self._send_request(request)[-4:])
|
|
|
|
def read_int64(self, address) -> int:
|
|
request = Pine._create_request(Pine.IPCCommand.READ64, address, 9)
|
|
return Pine.from_bytes(self._send_request(request)[-8:])
|
|
|
|
def read_bytes(self, address: int, length: int) -> bytes:
|
|
"""Careful! This can be quite slow for large reads"""
|
|
data = b''
|
|
while len(data) < length:
|
|
if length - len(data) >= 8:
|
|
data += self._send_request(Pine._create_request(Pine.IPCCommand.READ64, address + len(data), 9))[-8:]
|
|
elif length - len(data) >= 4:
|
|
data += self._send_request(Pine._create_request(Pine.IPCCommand.READ32, address + len(data), 9))[-4:]
|
|
elif length - len(data) >= 2:
|
|
data += self._send_request(Pine._create_request(Pine.IPCCommand.READ16, address + len(data), 9))[-2:]
|
|
elif length - len(data) >= 1:
|
|
data += self._send_request(Pine._create_request(Pine.IPCCommand.READ8, address + len(data), 9))[-1:]
|
|
|
|
return data
|
|
|
|
def write_int8(self, address: int, value: int) -> None:
|
|
request = Pine._create_request(Pine.IPCCommand.WRITE8, address, 9 + Pine.DataSize.INT8)
|
|
request += value.to_bytes(length=1, byteorder="little")
|
|
self._send_request(request)
|
|
|
|
def write_int16(self, address: int, value: int) -> None:
|
|
request = Pine._create_request(Pine.IPCCommand.WRITE16, address, 9 + Pine.DataSize.INT16)
|
|
request += value.to_bytes(length=2, byteorder="little")
|
|
self._send_request(request)
|
|
|
|
def write_int32(self, address: int, value: int) -> None:
|
|
request = Pine._create_request(Pine.IPCCommand.WRITE32, address, 9 + Pine.DataSize.INT32)
|
|
request += value.to_bytes(length=4, byteorder="little")
|
|
self._send_request(request)
|
|
|
|
def write_int64(self, address: int, value: int) -> None:
|
|
request = Pine._create_request(Pine.IPCCommand.WRITE64, address, 9 + Pine.DataSize.INT64)
|
|
request += value.to_bytes(length=8, byteorder="little")
|
|
self._send_request(request)
|
|
|
|
def write_float(self, address: int, value: float) -> None:
|
|
request = Pine._create_request(Pine.IPCCommand.WRITE32, address, 9 + Pine.DataSize.INT32)
|
|
request += struct.pack("<f", value)
|
|
self._send_request(request)
|
|
|
|
def write_bytes(self, address: int, data: bytes) -> None:
|
|
"""Careful! This can be quite slow for large writes"""
|
|
bytes_written = 0
|
|
while bytes_written < len(data):
|
|
if len(data) - bytes_written >= 8:
|
|
request = self._create_request(Pine.IPCCommand.WRITE64, address + bytes_written, 9 + Pine.DataSize.INT64)
|
|
request += data[bytes_written:bytes_written + 8]
|
|
self._send_request(request)
|
|
bytes_written += 8
|
|
elif len(data) - bytes_written >= 4:
|
|
request = self._create_request(Pine.IPCCommand.WRITE32, address + bytes_written, 9 + Pine.DataSize.INT32)
|
|
request += data[bytes_written:bytes_written + 4]
|
|
self._send_request(request)
|
|
bytes_written += 4
|
|
elif len(data) - bytes_written >= 2:
|
|
request = self._create_request(Pine.IPCCommand.WRITE16, address + bytes_written, 9 + Pine.DataSize.INT16)
|
|
request += data[bytes_written:bytes_written + 2]
|
|
self._send_request(request)
|
|
bytes_written += 2
|
|
elif len(data) - bytes_written >= 1:
|
|
request = self._create_request(Pine.IPCCommand.WRITE8, address + bytes_written, 9 + Pine.DataSize.INT8)
|
|
request += data[bytes_written:bytes_written + 1]
|
|
self._send_request(request)
|
|
bytes_written += 1
|
|
|
|
def batch_read_int32(self, addresses: list[int]) -> list[int]:
|
|
"""Read multiple int32 values in a single request. Much faster than individual reads."""
|
|
if not addresses:
|
|
return []
|
|
|
|
# Build batched request
|
|
request = b''
|
|
for address in addresses:
|
|
request += Pine.to_bytes(Pine.IPCCommand.READ32, 1)
|
|
request += Pine.to_bytes(address, 4)
|
|
|
|
# Prepend total size
|
|
total_size = len(request) + 4
|
|
request = Pine.to_bytes(total_size, 4) + request
|
|
|
|
response = self._send_request(request)
|
|
|
|
# Parse results - one status byte for the entire batch, then data
|
|
results = []
|
|
offset = 4 # Skip overall size header
|
|
offset += 1 # Skip single status byte for entire batch
|
|
for _ in addresses:
|
|
value = Pine.from_bytes(response[offset:offset + 4])
|
|
results.append(value)
|
|
offset += 4
|
|
|
|
return results
|
|
|
|
def batch_read_int16(self, addresses: list[int]) -> list[int]:
|
|
"""Read multiple int16 values in a single request."""
|
|
if not addresses:
|
|
return []
|
|
|
|
request = b''
|
|
for address in addresses:
|
|
request += Pine.to_bytes(Pine.IPCCommand.READ16, 1)
|
|
request += Pine.to_bytes(address, 4)
|
|
|
|
total_size = len(request) + 4
|
|
request = Pine.to_bytes(total_size, 4) + request
|
|
|
|
response = self._send_request(request)
|
|
|
|
# Parse results - one status byte for the entire batch, then data
|
|
results = []
|
|
offset = 4 # Skip overall size header
|
|
offset += 1 # Skip single status byte for entire batch
|
|
for _ in addresses:
|
|
value = Pine.from_bytes(response[offset:offset + 2])
|
|
results.append(value)
|
|
offset += 2
|
|
|
|
return results
|
|
|
|
def batch_read_int8(self, addresses: list[int]) -> list[int]:
|
|
"""Read multiple int8 values in a single request."""
|
|
if not addresses:
|
|
return []
|
|
|
|
request = b''
|
|
for address in addresses:
|
|
request += Pine.to_bytes(Pine.IPCCommand.READ8, 1)
|
|
request += Pine.to_bytes(address, 4)
|
|
|
|
total_size = len(request) + 4
|
|
request = Pine.to_bytes(total_size, 4) + request
|
|
|
|
response = self._send_request(request)
|
|
|
|
# Parse results - one status byte for the entire batch, then data
|
|
results = []
|
|
offset = 4 # Skip overall size header
|
|
offset += 1 # Skip single status byte for entire batch
|
|
for _ in addresses:
|
|
value = Pine.from_bytes(response[offset:offset + 1])
|
|
results.append(value)
|
|
offset += 1
|
|
|
|
return results
|
|
|
|
def batch_write_int32(self, operations: list[tuple[int, int]]) -> None:
|
|
"""Write multiple int32 values in a single request. Each operation is (address, value)."""
|
|
if not operations:
|
|
return
|
|
|
|
request = b''
|
|
for address, value in operations:
|
|
request += Pine.to_bytes(Pine.IPCCommand.WRITE32, 1)
|
|
request += Pine.to_bytes(address, 4)
|
|
request += value.to_bytes(length=4, byteorder="little")
|
|
|
|
total_size = len(request) + 4
|
|
request = Pine.to_bytes(total_size, 4) + request
|
|
|
|
self._send_request(request)
|
|
|
|
def batch_write_float(self, operations: list[tuple[int, float]]) -> None:
|
|
"""Write multiple float values in a single request. Each operation is (address, value)."""
|
|
if not operations:
|
|
return
|
|
|
|
request = b''
|
|
for address, value in operations:
|
|
request += Pine.to_bytes(Pine.IPCCommand.WRITE32, 1)
|
|
request += Pine.to_bytes(address, 4)
|
|
request += struct.pack("<f", value)
|
|
|
|
total_size = len(request) + 4
|
|
request = Pine.to_bytes(total_size, 4) + request
|
|
|
|
self._send_request(request)
|
|
|
|
def get_game_id(self) -> str:
|
|
request = Pine.to_bytes(5, 4) + Pine.to_bytes(Pine.IPCCommand.ID, 1)
|
|
try:
|
|
response = self._send_request(request)
|
|
except ConnectionError:
|
|
return ""
|
|
return response[9:-1].decode("ascii")
|
|
|
|
def _send_request(self, request: bytes) -> bytes:
|
|
if not self._sock_state:
|
|
self._init_socket()
|
|
|
|
try:
|
|
self._sock.sendall(request)
|
|
except socket.error:
|
|
self._sock.close()
|
|
self._sock_state = False
|
|
raise ConnectionError("Lost connection to PCSX2.")
|
|
|
|
end_length = 4
|
|
result: bytes = b''
|
|
while len(result) < end_length:
|
|
try:
|
|
response = self._sock.recv(4096)
|
|
except TimeoutError:
|
|
raise TimeoutError("Response timed out. "
|
|
"This might be caused by having two PINE connections open on the same slot")
|
|
|
|
if len(response) <= 0:
|
|
result = b''
|
|
break
|
|
|
|
result += response
|
|
|
|
if end_length == 4 and len(response) >= 4:
|
|
end_length = Pine.from_bytes(result[0:4])
|
|
if end_length > Pine.MAX_IPC_SIZE:
|
|
result = b''
|
|
break
|
|
|
|
if len(result) == 0:
|
|
raise ConnectionError("Invalid response from PCSX2.")
|
|
if result[4] == Pine.IPCResult.IPC_FAIL:
|
|
raise ConnectionError("Failure indicated in PCSX2 response.")
|
|
|
|
return result
|
|
|
|
@staticmethod
|
|
def _create_request(command: IPCCommand, address: int, size: int = 0) -> bytes:
|
|
ipc = Pine.to_bytes(size, 4)
|
|
ipc += Pine.to_bytes(command, 1)
|
|
ipc += Pine.to_bytes(address, 4)
|
|
return ipc
|
|
|
|
@staticmethod
|
|
def to_bytes(value: int, size: int) -> bytes:
|
|
return value.to_bytes(length=size, byteorder="little")
|
|
|
|
@staticmethod
|
|
def from_bytes(arr: bytes) -> int:
|
|
return int.from_bytes(arr, byteorder="little")
|