Toolbox for analyzing and editing pkg application files for psp,ps3, ps4 and ps5, includes the most useful functions you might need.
| 1 | import os |
| 2 | import hashlib |
| 3 | import tempfile |
| 4 | import shutil |
| 5 | from io import FileIO, BytesIO |
| 6 | import struct |
| 7 | import logging |
| 8 | |
| 9 | logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| 10 | logger = logging.getLogger(__name__) |
| 11 | |
| 12 | class Archiver: |
| 13 | def __init__(self, index, name, offset, size, bytes_data=None): |
| 14 | self.index = index |
| 15 | self.name = name |
| 16 | self.offset = offset |
| 17 | self.size = size |
| 18 | self.bytes_data = bytes_data |
| 19 | |
| 20 | class TRPReader: |
| 21 | class TRPHeader: |
| 22 | def __init__(self): |
| 23 | self.magic = None |
| 24 | self.version = None |
| 25 | self.file_size = None |
| 26 | self.files_count = None |
| 27 | self.element_size = None |
| 28 | self.dev_flag = None |
| 29 | self.padding = None |
| 30 | self.sha1 = None |
| 31 | |
| 32 | def __init__(self, filename=None): |
| 33 | self._hdr = self.TRPHeader() |
| 34 | self._trophyList = [] |
| 35 | self._hdrmagic = { |
| 36 | bytes([220, 162, 77, 0]), # Original magic number |
| 37 | bytes([5, 216, 3, 164]), # New magic number (a403d805 in little-endian) |
| 38 | bytes([126, 237, 245, 255]) # New magic number (fff5ed7e in little-endian) |
| 39 | } |
| 40 | self._iserror = False |
| 41 | self._readbytes = False |
| 42 | self._throwerror = True |
| 43 | self._error = "" |
| 44 | self._calculatedsha1 = None |
| 45 | self._inputfile = filename |
| 46 | self._title = None |
| 47 | self._npcommid = None |
| 48 | self._temp_dir = None |
| 49 | if filename: |
| 50 | self.load(filename) |
| 51 | |
| 52 | def load(self, filename=None): |
| 53 | if filename is None and self._inputfile is None: |
| 54 | raise ValueError("Filename must be provided either in the constructor or in the load method") |
| 55 | |
| 56 | if filename is not None: |
| 57 | self._inputfile = filename |
| 58 | |
| 59 | try: |
| 60 | self._iserror = False |
| 61 | self._calculatedsha1 = None |
| 62 | self._trophyList = [] |
| 63 | |
| 64 | if not os.path.exists(self._inputfile): |
| 65 | raise FileNotFoundError(f"File not found: {self._inputfile}") |
| 66 | |
| 67 | self.verify_file_structure() |
| 68 | |
| 69 | with open(self._inputfile, 'rb') as fs: |
| 70 | self.read_content(fs) |
| 71 | # Ensure that self._title is set here or in read_content |
| 72 | if self._title is None: |
| 73 | self._title = "Unknown Title" # Or an appropriate default value |
| 74 | except Exception as e: |
| 75 | self._iserror = True |
| 76 | self._error = str(e) |
| 77 | logger.error(f"Error loading trophy file: {self._error}") |
| 78 | |
| 79 | if self._iserror and self._throwerror: |
| 80 | raise Exception(self._error) |
| 81 | |
| 82 | def read_header(self, fs): |
| 83 | try: |
| 84 | self._hdr.magic = fs.read(4) |
| 85 | self._hdr.version = fs.read(4) |
| 86 | self._hdr.file_size = fs.read(8) |
| 87 | self._hdr.files_count = fs.read(4) |
| 88 | self._hdr.element_size = fs.read(4) |
| 89 | self._hdr.dev_flag = fs.read(4) |
| 90 | |
| 91 | version = self.bytes_to_int(self._hdr.version, 32) |
| 92 | file_size = self.bytes_to_int(self._hdr.file_size, 64) |
| 93 | files_count = self.bytes_to_int(self._hdr.files_count, 32) |
| 94 | |
| 95 | logger.debug(f"Header: magic={self._hdr.magic.hex()}, version={version}, file_size={file_size}, files_count={files_count}") |
| 96 | |
| 97 | if version == 1: |
| 98 | self._hdr.padding = fs.read(36) |
| 99 | elif version == 2: |
| 100 | self._hdr.sha1 = fs.read(20) |
| 101 | self._hdr.padding = fs.read(16) |
| 102 | elif version == 3: |
| 103 | self._hdr.sha1 = fs.read(20) |
| 104 | self._hdr.padding = fs.read(48) |
| 105 | else: |
| 106 | raise ValueError(f"Invalid version: {version}") |
| 107 | except Exception as e: |
| 108 | logger.error(f"Error reading header: {e}") |
| 109 | raise |
| 110 | |
| 111 | def read_content(self, fs): |
| 112 | fs.seek(0) |
| 113 | data = fs.read() |
| 114 | png_signature = b'\x89PNG\r\n\x1a\n' |
| 115 | esfm_signature = b'ESFM' |
| 116 | ucp_signature = b'\x00\x00\x00\x00' # Magic number for Trophy00.ucp |
| 117 | |
| 118 | i = 0 |
| 119 | while i < len(data): |
| 120 | if data[i:i+8] == png_signature: |
| 121 | offset = i |
| 122 | size = self.get_png_size(data[i:]) |
| 123 | if size: |
| 124 | name = f"TROP{len(self._trophyList):03d}.PNG" |
| 125 | self._trophyList.append(Archiver(len(self._trophyList), name, offset, size)) |
| 126 | logger.info(f"Found PNG image '{name}' at offset 0x{offset:X}, size {size}") |
| 127 | i += size |
| 128 | else: |
| 129 | i += 1 |
| 130 | elif data[i:i+4] == esfm_signature: |
| 131 | offset = i |
| 132 | try: |
| 133 | size = struct.unpack('>I', data[i+4:i+8])[0] + 8 # ESFM header (4 bytes) + size (4 bytes) |
| 134 | if size > 0 and size < len(data) - i: |
| 135 | name = f"FILE{len(self._trophyList):03d}.ESFM" |
| 136 | self._trophyList.append(Archiver(len(self._trophyList), name, offset, size)) |
| 137 | logger.info(f"Found ESFM file '{name}' at offset 0x{offset:X}, size {size}") |
| 138 | i += size |
| 139 | else: |
| 140 | logger.warning(f"Invalid ESFM size at offset 0x{offset:X}: {size}") |
| 141 | i += 1 |
| 142 | except struct.error: |
| 143 | logger.warning(f"Unable to read ESFM size at offset 0x{offset:X}") |
| 144 | i += 1 |
| 145 | elif data[i:i+4] == ucp_signature: |
| 146 | offset = i |
| 147 | try: |
| 148 | size = struct.unpack('>I', data[i+4:i+8])[0] + 8 # UCP header (4 bytes) + size (4 bytes) |
| 149 | if size > 0 and size < len(data) - i: |
| 150 | name = f"TROPHY{len(self._trophyList):03d}.UCP" |
| 151 | self._trophyList.append(Archiver(len(self._trophyList), name, offset, size)) |
| 152 | logger.info(f"Found UCP file '{name}' at offset 0x{offset:X}, size {size}") |
| 153 | i += size |
| 154 | else: |
| 155 | logger.warning(f"Invalid UCP size at offset 0x{offset:X}: {size}") |
| 156 | i += 1 |
| 157 | except struct.error: |
| 158 | logger.warning(f"Unable to read UCP size at offset 0x{offset:X}") |
| 159 | i += 1 |
| 160 | else: |
| 161 | i += 1 |
| 162 | |
| 163 | if not self._trophyList: |
| 164 | logger.warning("No files found in the TRP. The file might be corrupted or empty.") |
| 165 | else: |
| 166 | logger.info(f"Found {len(self._trophyList)} files") |
| 167 | |
| 168 | # Update the file count in the header |
| 169 | self._hdr.files_count = len(self._trophyList).to_bytes(4, byteorder='little') |
| 170 | |
| 171 | def get_png_size(self, data): |
| 172 | try: |
| 173 | idx = data.index(b'IEND') |
| 174 | return idx + 12 # IEND chunk is 12 bytes long, including the 4-byte CRC |
| 175 | except ValueError: |
| 176 | return None |
| 177 | |
| 178 | @property |
| 179 | def read_bytes(self): |
| 180 | return self._readbytes |
| 181 | |
| 182 | @read_bytes.setter |
| 183 | def read_bytes(self, value): |
| 184 | self._readbytes = value |
| 185 | |
| 186 | @property |
| 187 | def trophy_list(self): |
| 188 | return self._trophyList |
| 189 | |
| 190 | @property |
| 191 | def file_size(self): |
| 192 | return self.bytes_to_int(self._hdr.file_size, 64) |
| 193 | |
| 194 | @property |
| 195 | def file_count(self): |
| 196 | return self.bytes_to_int(self._hdr.files_count, 32) |
| 197 | |
| 198 | @property |
| 199 | def version(self): |
| 200 | return self.bytes_to_int(self._hdr.version, 32) |
| 201 | |
| 202 | @property |
| 203 | def sha1(self): |
| 204 | if self.version <= 1: |
| 205 | return None |
| 206 | return self.byte_array_to_hex_string(self._hdr.sha1) |
| 207 | |
| 208 | @property |
| 209 | def calculated_sha1(self): |
| 210 | return self._calculatedsha1 |
| 211 | |
| 212 | @property |
| 213 | def is_error(self): |
| 214 | return self._iserror |
| 215 | |
| 216 | @property |
| 217 | def throw_error(self): |
| 218 | return self._throwerror |
| 219 | |
| 220 | @throw_error.setter |
| 221 | def throw_error(self, value): |
| 222 | self._throwerror = value |
| 223 | |
| 224 | @property |
| 225 | def title(self): |
| 226 | return self._title |
| 227 | |
| 228 | @title.setter |
| 229 | def title(self, value): |
| 230 | self._title = value |
| 231 | |
| 232 | @property |
| 233 | def np_comm_id(self): |
| 234 | return self._npcommid |
| 235 | |
| 236 | @np_comm_id.setter |
| 237 | def np_comm_id(self, value): |
| 238 | self._npcommid = value |
| 239 | |
| 240 | def extract_file_to_memory(self, filename): |
| 241 | archiver = next((a for a in self._trophyList if a.name.upper().startswith(filename.upper())), None) |
| 242 | if archiver is None: |
| 243 | return None |
| 244 | |
| 245 | with open(self._inputfile, 'rb') as fs: |
| 246 | fs.seek(archiver.offset) |
| 247 | return fs.read(archiver.size) |
| 248 | |
| 249 | def byte_arrays_equal(self, first, second): |
| 250 | if first == second: |
| 251 | return True |
| 252 | if len(first) != len(second): |
| 253 | return False |
| 254 | return all(a == b for a, b in zip(first, second)) |
| 255 | |
| 256 | @staticmethod |
| 257 | def byte_array_to_little_endian_int(byte_array): |
| 258 | return int.from_bytes(byte_array, byteorder='little') |
| 259 | |
| 260 | @staticmethod |
| 261 | def byte_array_to_utf8_string(byte_array, errors='ignore'): |
| 262 | return ''.join(chr(b) for b in byte_array if 32 <= b <= 126 or b in (9, 10, 13)) |
| 263 | |
| 264 | @staticmethod |
| 265 | def byte_array_to_hex_string(byte_array): |
| 266 | return ''.join(f'{b:02x}' for b in byte_array) |
| 267 | |
| 268 | @staticmethod |
| 269 | def hex_string_to_long(hex_string): |
| 270 | return int(hex_string, 16) |
| 271 | |
| 272 | def calculate_sha1_hash(self): |
| 273 | if self.version <= 1: |
| 274 | return None |
| 275 | |
| 276 | sha1 = hashlib.sha1() |
| 277 | with open(self._inputfile, 'rb') as fs: |
| 278 | sha1.update(fs.read(28)) |
| 279 | fs.seek(48) |
| 280 | while chunk := fs.read(8192): |
| 281 | sha1.update(chunk) |
| 282 | return sha1.hexdigest() |
| 283 | |
| 284 | def extract(self): |
| 285 | if self._inputfile is None: |
| 286 | raise ValueError("No input file specified") |
| 287 | |
| 288 | input_dir = os.path.dirname(self._inputfile) |
| 289 | input_filename = os.path.splitext(os.path.basename(self._inputfile))[0] |
| 290 | |
| 291 | if self._temp_dir is None: |
| 292 | self._temp_dir = tempfile.mkdtemp(prefix=f"{input_filename}_extracted_", dir=input_dir) |
| 293 | |
| 294 | with open(self._inputfile, 'rb') as fs: |
| 295 | for archiver in self._trophyList: |
| 296 | fs.seek(archiver.offset) |
| 297 | data = fs.read(archiver.size) |
| 298 | output_file = os.path.join(self._temp_dir, archiver.name) |
| 299 | with open(output_file, 'wb') as out: |
| 300 | out.write(data) |
| 301 | logger.info(f"Extracted {archiver.name} to {output_file}") |
| 302 | |
| 303 | logger.info(f"All files extracted to: {self._temp_dir}") |
| 304 | return self._temp_dir |
| 305 | |
| 306 | def cleanup(self): |
| 307 | if self._temp_dir and os.path.exists(self._temp_dir): |
| 308 | try: |
| 309 | shutil.rmtree(self._temp_dir) |
| 310 | logger.info(f"Temporary directory {self._temp_dir} has been removed") |
| 311 | except Exception as e: |
| 312 | logger.error(f"Error removing temporary directory {self._temp_dir}: {e}") |
| 313 | self._temp_dir = None |
| 314 | |
| 315 | def extract_file(self, filename, outputpath, custom_name=None): |
| 316 | archiver = next((a for a in self._trophyList if a.name.upper().startswith(filename.upper())), None) |
| 317 | if archiver is None: |
| 318 | return |
| 319 | if not os.path.exists(outputpath): |
| 320 | os.makedirs(outputpath) |
| 321 | |
| 322 | with open(self._inputfile, 'rb') as fs: |
| 323 | fs.seek(archiver.offset) |
| 324 | data = fs.read(archiver.size) |
| 325 | output_file = os.path.join(outputpath, custom_name or archiver.name) |
| 326 | with open(output_file, 'wb') as out: |
| 327 | out.write(data) |
| 328 | |
| 329 | def verify_integrity(self): |
| 330 | if self.version > 1 and self.sha1: |
| 331 | calculated_sha1 = self.calculate_sha1_hash() |
| 332 | if calculated_sha1.lower() != self.sha1.lower(): |
| 333 | print(f"Warning: SHA1 mismatch. File may be corrupted.") |
| 334 | print(f"Calculated: {calculated_sha1}") |
| 335 | print(f"Expected: {self.sha1}") |
| 336 | else: |
| 337 | print("SHA1 verification passed.") |
| 338 | |
| 339 | expected_size = self.file_size |
| 340 | actual_size = os.path.getsize(self._inputfile) |
| 341 | if expected_size != actual_size: |
| 342 | print(f"Warning: File size mismatch. Expected: {expected_size}, Actual: {actual_size}") |
| 343 | |
| 344 | if len(self._trophyList) != self.file_count: |
| 345 | print(f"Warning: Trophy count mismatch. Expected: {self.file_count}, Actual: {len(self._trophyList)}") |
| 346 | |
| 347 | return self._trophyList |
| 348 | |
| 349 | def verify_trophy_data(self, name, offset, size): |
| 350 | file_size = os.path.getsize(self._inputfile) |
| 351 | if offset < 0 or size < 0 or offset + size > file_size: |
| 352 | return False |
| 353 | if offset == 0 and size == 0: |
| 354 | return False |
| 355 | if len(name.strip()) == 0: |
| 356 | return False |
| 357 | return True |
| 358 | |
| 359 | def verify_file_structure(self): |
| 360 | try: |
| 361 | actual_size = os.path.getsize(self._inputfile) |
| 362 | if actual_size < 64: # Minimum header size |
| 363 | logger.error(f"File too small: {actual_size} bytes") |
| 364 | return False |
| 365 | |
| 366 | with open(self._inputfile, 'rb') as fs: |
| 367 | magic = fs.read(4) |
| 368 | if magic not in self._hdrmagic: |
| 369 | logger.warning(f"Invalid file magic number: {magic.hex()}, but continuing anyway") |
| 370 | else: |
| 371 | logger.info(f"Valid magic number found: {magic.hex()}") |
| 372 | |
| 373 | version_bytes = fs.read(4) |
| 374 | file_size_bytes = fs.read(8) |
| 375 | files_count_bytes = fs.read(4) |
| 376 | |
| 377 | version = self.bytes_to_int(version_bytes, 32) |
| 378 | file_size = self.bytes_to_int(file_size_bytes, 64) |
| 379 | files_count = self.bytes_to_int(files_count_bytes, 32) |
| 380 | |
| 381 | logger.debug(f"Raw bytes: version={version_bytes.hex()}, file_size={file_size_bytes.hex()}, files_count={files_count_bytes.hex()}") |
| 382 | logger.debug(f"File structure: version={version}, file_size={file_size}, files_count={files_count}") |
| 383 | |
| 384 | if file_size != actual_size: |
| 385 | logger.warning(f"File size mismatch: expected {file_size}, actual {actual_size}") |
| 386 | self._hdr.file_size = actual_size.to_bytes(8, byteorder='little') |
| 387 | |
| 388 | if version not in [1, 2, 3]: |
| 389 | logger.warning(f"Invalid version: {version}, assuming version 3") |
| 390 | version = 3 |
| 391 | self._hdr.version = version.to_bytes(4, byteorder='little') |
| 392 | |
| 393 | if files_count <= 0 or files_count > 1000000: |
| 394 | logger.warning(f"Invalid file count: {files_count}, will try to extract anyway") |
| 395 | files_count = 0 # Reset the count and let read_content determine it |
| 396 | self._hdr.files_count = files_count.to_bytes(4, byteorder='little') |
| 397 | |
| 398 | return True |
| 399 | except Exception as e: |
| 400 | logger.error(f"Error during file structure verification: {e}") |
| 401 | return False |
| 402 | |
| 403 | @staticmethod |
| 404 | def bytes_to_int(bytes_data, bits=32): |
| 405 | value = int.from_bytes(bytes_data, byteorder='little', signed=False) |
| 406 | if bits == 32: |
| 407 | return value & 0xFFFFFFFF |
| 408 | elif bits == 64: |
| 409 | return value & 0xFFFFFFFFFFFFFFFF |
| 410 | else: |
| 411 | raise ValueError(f"Unsupported bit size: {bits}") |
| 412 | |
| 413 | def some_method_that_uses_title(self): |
| 414 | if self._title is None: |
| 415 | logger.warning("Title is not set") |
| 416 | return |
| 417 | |
| 418 | def get_temp_dir(self): |
| 419 | return self._temp_dir |
| 420 | |
| 421 | def read_content_flexible(self, fs): |
| 422 | fs.seek(0) |
| 423 | data = fs.read() |
| 424 | png_signature = b'\x89PNG\r\n\x1a\n' |
| 425 | esfm_signature = b'ESFM' |
| 426 | ucp_signature = b'\x00\x00\x00\x00' # Magic number for Trophy00.ucp |
| 427 | |
| 428 | i = 0 |
| 429 | while i < len(data): |
| 430 | if data[i:i+8] == png_signature: |
| 431 | offset = i |
| 432 | size = self.get_png_size(data[i:]) |
| 433 | if size: |
| 434 | name = f"TROP{len(self._trophyList):03d}.PNG" |
| 435 | self._trophyList.append(Archiver(len(self._trophyList), name, offset, size)) |
| 436 | logger.info(f"Found PNG image '{name}' at offset 0x{offset:X}, size {size}") |
| 437 | i += size |
| 438 | else: |
| 439 | i += 1 |
| 440 | elif data[i:i+4] == esfm_signature: |
| 441 | offset = i |
| 442 | try: |
| 443 | size = struct.unpack('>I', data[i+4:i+8])[0] + 8 |
| 444 | if size > 0 and size < len(data) - i: |
| 445 | name = f"FILE{len(self._trophyList):03d}.ESFM" |
| 446 | self._trophyList.append(Archiver(len(self._trophyList), name, offset, size)) |
| 447 | logger.info(f"Found ESFM file '{name}' at offset 0x{offset:X}, size {size}") |
| 448 | i += size |
| 449 | else: |
| 450 | i += 1 |
| 451 | except struct.error: |
| 452 | i += 1 |
| 453 | elif data[i:i+4] == ucp_signature: |
| 454 | offset = i |
| 455 | try: |
| 456 | size = struct.unpack('>I', data[i+4:i+8])[0] + 8 |
| 457 | if size > 0 and size < len(data) - i: |
| 458 | name = f"TROPHY{len(self._trophyList):03d}.UCP" |
| 459 | self._trophyList.append(Archiver(len(self._trophyList), name, offset, size)) |
| 460 | logger.info(f"Found UCP file '{name}' at offset 0x{offset:X}, size {size}") |
| 461 | i += size |
| 462 | else: |
| 463 | i += 1 |
| 464 | except struct.error: |
| 465 | i += 1 |
| 466 | else: |
| 467 | i += 1 |
| 468 | |
| 469 | if not self._trophyList: |
| 470 | logger.warning("No files found in the TRP. The file might be corrupted or empty.") |
| 471 | else: |
| 472 | logger.info(f"Found {len(self._trophyList)} files") |
| 473 | |
| 474 | self._hdr.files_count = len(self._trophyList).to_bytes(4, byteorder='little') |
| 475 | |
| 476 | def decrypt_trp(self, input_file, output_dir): |
| 477 | """Decrypt and extract TRP file contents""" |
| 478 | try: |
| 479 | # Carica il file TRP se non è già stato caricato |
| 480 | if not self._trophyList: |
| 481 | self.load(input_file) |
| 482 | |
| 483 | # Crea la directory di output se non esiste |
| 484 | os.makedirs(output_dir, exist_ok=True) |
| 485 | |
| 486 | # Estrai ogni file trovato |
| 487 | for trophy in self._trophyList: |
| 488 | try: |
| 489 | # Determina il tipo di file dall'estensione |
| 490 | file_ext = os.path.splitext(trophy.name)[1].lower() |
| 491 | |
| 492 | # Leggi i dati del file |
| 493 | with open(input_file, 'rb') as f: |
| 494 | f.seek(trophy.offset) |
| 495 | data = f.read(trophy.size) |
| 496 | |
| 497 | # Determina il nome del file di output |
| 498 | if "TROP" in trophy.name.upper(): |
| 499 | output_name = f"trophy_{trophy.index:03d}{file_ext}" |
| 500 | else: |
| 501 | output_name = trophy.name |
| 502 | |
| 503 | # Salva il file |
| 504 | output_path = os.path.join(output_dir, output_name) |
| 505 | with open(output_path, 'wb') as f: |
| 506 | f.write(data) |
| 507 | |
| 508 | logging.info(f"Extracted: {output_name}") |
| 509 | |
| 510 | except Exception as e: |
| 511 | logging.error(f"Error extracting {trophy.name}: {e}") |
| 512 | continue |
| 513 | |
| 514 | return "Trophy file decrypted and extracted successfully" |
| 515 | |
| 516 | except Exception as e: |
| 517 | error_msg = f"Error decrypting TRP: {str(e)}" |
| 518 | logging.error(error_msg) |
| 519 | raise Exception(error_msg) |