Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5; it includes the most useful functions you might need.
| 1 | import struct |
| 2 | from .package_base import PackageBase |
| 3 | from .enums import DRMType, ContentType, IROTag |
| 4 | from Utilities import Logger |
| 5 | import os |
| 6 | import shutil |
| 7 | import logging |
| 8 | import subprocess |
| 9 | from .crypto_utils import AES_ctx, AES_set_key, AES_cbc_decrypt, AES_KEY_LEN_128 |
| 10 | |
| 11 | |
| 12 | |
| 13 | class PackagePS4(PackageBase): |
| 14 | MAGIC_PS4 = 0x7f434E54 # ?CNT for PS4 |
| 15 | |
def __init__(self, file: str):
    """Open ``file``, verify the PS4 magic number and parse the package.

    All header attributes are reset to neutral defaults first, then the
    first four bytes are read as a big-endian magic number. When it matches
    ``MAGIC_PS4`` the header and entry table are loaded through
    ``_load_ps4_pkg``.

    Args:
        file: Path of the PKG file to open.

    Raises:
        ValueError: If the file is shorter than the 4-byte magic number, if
            the magic number is not ``MAGIC_PS4``, or if parsing fails.
    """
    super().__init__(file)
    self.is_ps4 = False
    self.pkg_magic = None
    self.pkg_type = None
    self.pkg_file_count = 0
    self.pkg_entry_count = 0
    self.pkg_sc_entry_count = 0
    self.pkg_entry_count_2 = 0
    self.pkg_table_offset = 0
    self.pkg_entry_data_size = 0
    self.pkg_body_offset = 0
    self.pkg_body_size = 0
    self.pkg_content_offset = 0
    self.pkg_content_size = 0
    self.pkg_drm_type = None
    self.pkg_content_type = None
    self.pkg_content_flags = None
    self.pkg_promote_size = 0
    self.pkg_version_date = None
    self.pkg_version_hash = None
    self.pkg_iro_tag = None
    self.pkg_drm_type_version = None
    self.pkg_content_id = None

    with open(file, "rb") as fp:
        raw_magic = fp.read(4)
        # A truncated or empty file must fail with the same exception type
        # as any other "not a PS4 PKG" condition, not with struct.error.
        if len(raw_magic) < 4:
            raise ValueError(
                f"Unknown PKG format: file too short ({len(raw_magic)} of 4 magic bytes read)"
            )
        magic = struct.unpack(">I", raw_magic)[0]
        Logger.log_information(f"Read magic number: {magic:08X}")
        if magic != self.MAGIC_PS4:
            raise ValueError(f"Unknown PKG format: {magic:08X}")
        self.is_ps4 = True
        self._load_ps4_pkg(fp)
| 49 | |
def _load_ps4_pkg(self, fp):
    """Parse the PS4 PKG header, the digests and the entry table from ``fp``.

    The big-endian header is read from offset 0 and unpacked into the
    ``pkg_*`` attributes, the content id is decoded to text, four 32-byte
    digests are read from offset 0x100, and the content type and IRO tag
    are converted to their enums (falling back to ``None`` with a warning
    when the value is unknown). Finally the entry table is loaded.

    Args:
        fp: Binary file object opened on the package.

    Raises:
        ValueError: If any step fails; the original exception is attached
            as ``__cause__``.
    """
    try:
        header_format = ">5I2H2I4Q36s12s12I"
        fp.seek(0)
        data = fp.read(struct.calcsize(header_format))

        (self.pkg_magic, self.pkg_type, self.pkg_0x008, self.pkg_file_count, self.pkg_entry_count,
         self.pkg_sc_entry_count, self.pkg_entry_count_2, self.pkg_table_offset, self.pkg_entry_data_size,
         self.pkg_body_offset, self.pkg_body_size, self.pkg_content_offset, self.pkg_content_size,
         self.pkg_content_id, self.pkg_padding, self.pkg_drm_type, self.pkg_content_type,
         self.pkg_content_flags, self.pkg_promote_size, self.pkg_version_date, self.pkg_version_hash,
         self.pkg_0x088, self.pkg_0x08C, self.pkg_0x090, self.pkg_0x094, self.pkg_iro_tag,
         self.pkg_drm_type_version) = struct.unpack(header_format, data)

        Logger.log_information(f"Loaded header data: {self.pkg_magic}, {self.pkg_type}, {self.pkg_drm_type}, {self.pkg_content_type}")

        self.pkg_content_id = self._safe_decode(self.pkg_content_id)
        self.content_id = self.pkg_content_id

        # Four consecutive 32-byte digests, stored as hex strings.
        fp.seek(0x100, os.SEEK_SET)
        data = fp.read(128)
        self.digests = [data[i:i+32].hex() for i in range(0, 128, 32)]

        try:
            self.pkg_content_type = ContentType(self.pkg_content_type)
        except ValueError:
            Logger.log_warning(f"Warning: {self.pkg_content_type} is not a valid ContentType. Setting to None.")
            self.pkg_content_type = None

        try:
            self.pkg_iro_tag = IROTag(self.pkg_iro_tag)
        except ValueError:
            Logger.log_warning(f"Warning: {self.pkg_iro_tag} is not a valid IROTag. Setting to None.")
            self.pkg_iro_tag = None
            self.invalid_irotag = True
        else:
            self.invalid_irotag = False

        # Populates self.files from the entry table.
        self.__load_files(fp)
    except Exception as e:
        Logger.log_error(f"Error loading PS4 PKG file: {str(e)}")
        raise ValueError(f"Error loading PS4 PKG file: {str(e)}") from e
| 93 | |
def __load_files(self, fp):
    """Read the entry table into ``self.files`` and resolve entry names.

    ``self.pkg_entry_count`` big-endian records are read starting at
    ``self.pkg_table_offset``. Each record becomes a dict keyed by its file
    id. Names are then looked up in the name table, which is the entry with
    id 0x200: every entry's ``fn_offset`` is relative to that table's
    ``offset``. Entries whose name is empty get no ``"name"`` key.

    The position of ``fp`` on entry is restored before returning.

    Args:
        fp: Binary file object opened on the package.
    """
    old_pos = fp.tell()
    fp.seek(self.pkg_table_offset, os.SEEK_SET)

    entry_format = ">6IQ"
    entry_size = struct.calcsize(entry_format)
    self.files = {}
    for _ in range(self.pkg_entry_count):
        entry_data = fp.read(entry_size)
        file_id, filename_offset, flags1, flags2, offset, size, padding = struct.unpack(entry_format, entry_data)
        self.files[file_id] = {
            "id": file_id,
            "fn_offset": filename_offset,
            "flags1": flags1,
            "flags2": flags2,
            "offset": offset,
            "size": size,
            "padding": padding,
            # The key index is the nibble at bits 12-15. A 0xF00 mask
            # shifted right by 12 would always evaluate to 0.
            "key_idx": (flags2 & 0xF000) >> 12,
            "encrypted": (flags1 & PackageBase.FLAG_ENCRYPTED) == PackageBase.FLAG_ENCRYPTED
        }

    names_entry = self.files.get(0x200)
    if names_entry is None:
        # Without the name table no name can be resolved. Keep the entries
        # (consumers fall back to file_<id>) instead of failing the load.
        Logger.log_warning("Entry name table (file ID 0x200) not found; file names are unavailable.")
    else:
        names_base = names_entry["offset"]
        for key, file in self.files.items():
            try:
                fp.seek(names_base + file["fn_offset"])
                fn = self._read_null_terminated_string(fp)
                if fn:
                    try:
                        file["name"] = self._safe_decode(fn)
                    except Exception as e:
                        Logger.log_warning(f"Failed to decode filename for file ID {key}: {e}")
                        file["name"] = f"file_{key}"
            except (OverflowError, OSError) as e:
                Logger.log_warning(f"Error seeking to filename for file ID {key}: {e}")
                file["name"] = f"file_{key}"

    fp.seek(old_pos)
| 130 | |
def get_info(self):
    """Return the base package info extended with the PS4 header fields.

    The dict returned by the parent class is updated in place with one key
    per parsed header field plus the four digests. Offsets and flag-like
    values are rendered as ``0x...`` hex strings, enum fields as their
    member name (or ``str(value)`` when they are not enum members), and
    missing digests as ``"N/A"``. When the package is not flagged as PS4
    the parent's dict is returned unchanged.
    """
    info = super().get_info()
    if not self.is_ps4:
        return info

    def hex_or_none(value):
        # Render a number as 0x-prefixed upper-case hex, or the text "None".
        return "None" if value is None else f"0x{value:X}"

    def enum_name(value, enum_cls):
        # Member name for real enum members, plain str() otherwise.
        return value.name if isinstance(value, enum_cls) else str(value)

    padding = self.pkg_padding
    padding_text = padding.hex() if isinstance(padding, bytes) else str(padding)

    version_hash = self.pkg_version_hash
    if isinstance(version_hash, bytes):
        version_hash_text = version_hash.hex()
    else:
        version_hash_text = hex_or_none(version_hash)

    ps4_fields = {
        "pkg_magic": f"0x{self.pkg_magic:X}",
        "pkg_type": f"0x{self.pkg_type:X}",
        "pkg_0x008": self.pkg_0x008,
        "pkg_file_count": self.pkg_file_count,
        "pkg_entry_count": self.pkg_entry_count,
        "pkg_sc_entry_count": self.pkg_sc_entry_count,
        "pkg_entry_count_2": self.pkg_entry_count_2,
        "pkg_table_offset": f"0x{self.pkg_table_offset:X}",
        "pkg_entry_data_size": self.pkg_entry_data_size,
        "pkg_body_offset": f"0x{self.pkg_body_offset:X}",
        "pkg_body_size": self.pkg_body_size,
        "pkg_content_offset": f"0x{self.pkg_content_offset:X}",
        "pkg_content_size": self.pkg_content_size,
        "pkg_content_id": self.pkg_content_id,
        "pkg_padding": padding_text,
        "pkg_drm_type": enum_name(self.pkg_drm_type, DRMType),
        "pkg_content_type": enum_name(self.pkg_content_type, ContentType),
        "pkg_content_flags": hex_or_none(self.pkg_content_flags),
        "pkg_promote_size": self.pkg_promote_size,
        "pkg_version_date": self.pkg_version_date,
        "pkg_version_hash": version_hash_text,
        "pkg_0x088": hex_or_none(self.pkg_0x088),
        "pkg_0x08C": hex_or_none(self.pkg_0x08C),
        "pkg_0x090": hex_or_none(self.pkg_0x090),
        "pkg_0x094": hex_or_none(self.pkg_0x094),
        "pkg_iro_tag": enum_name(self.pkg_iro_tag, IROTag),
        "pkg_drm_type_version": self.pkg_drm_type_version,
    }

    # The digests are reported in the order they were read from the header.
    digest_labels = (
        "Main Entry 1 Hash",
        "Main Entry 2 Hash",
        "Digest Table Hash",
        "Main Table Hash",
    )
    for position, label in enumerate(digest_labels):
        ps4_fields[label] = self.digests[position] if position < len(self.digests) else "N/A"

    info.update(ps4_fields)
    return info
| 168 | |
def dump(self, output_dir):
    """Dump every entry of the package into ``output_dir``.

    Each entry is written under its ``name`` (or ``file_<id>`` when it has
    none), creating intermediate directories as needed. Extraction is
    best-effort: a failure on one entry is logged and the remaining entries
    are still processed.

    Entry names are read from the package itself, so they are treated as
    untrusted: a name that would resolve outside ``output_dir`` (absolute
    path, ``..`` components) is replaced by ``file_<id>``.

    Args:
        output_dir: Destination directory; created if missing.

    Returns:
        A human-readable completion message.
    """
    os.makedirs(output_dir, exist_ok=True)
    root = os.path.realpath(output_dir)

    for file_id, file_info in self.files.items():
        fallback_name = f'file_{file_id}'
        file_name = file_info.get('name', fallback_name)

        try:
            output_path = os.path.realpath(os.path.join(root, file_name))
            try:
                inside_root = os.path.commonpath([root, output_path]) == root
            except ValueError:
                # Raised e.g. for paths on different drives: not inside root.
                inside_root = False
            if not inside_root or output_path == root:
                Logger.log_warning(
                    f"Unsafe entry name {file_name!r} for file ID {file_id}; using {fallback_name} instead"
                )
                file_name = fallback_name
                output_path = os.path.join(root, fallback_name)

            # Create the directories the entry needs.
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            with open(self.original_file, 'rb') as pkg_file:
                pkg_file.seek(file_info['offset'])
                file_data = pkg_file.read(file_info['size'])

            with open(output_path, 'wb') as out_file:
                out_file.write(file_data)

            Logger.log_information(f"File extracted: {file_name}")
        except Exception as e:
            Logger.log_error(f"Error during extraction of file {file_name}: {str(e)}")

    Logger.log_information(f"Dump completed. Extracted files in: {output_dir}")
    return f"Dump completed. Extracted files in: {output_dir}"
| 196 | |
def get_file_data(self, file_id):
    """Return the raw bytes stored in the package for ``file_id``.

    The entry's ``offset`` and ``size`` are taken from ``self.files`` and
    that range is read from ``self.original_file``.

    Raises:
        ValueError: If ``file_id`` is not a key of ``self.files``.
        Exception: Any error raised while reading is logged and re-raised
            unchanged.
    """
    if file_id in self.files:
        entry = self.files[file_id]
    else:
        raise ValueError(f"File ID {file_id} not found in package")

    try:
        with open(self.original_file, 'rb') as source:
            source.seek(entry['offset'])
            payload = source.read(entry['size'])
    except Exception as exc:
        Logger.log_error(f"Error reading file data: {str(exc)}")
        raise
    return payload
| 213 | |
def get_pfs_info(self, as_json: bool = False) -> str:
    """Run ``shadPKG.exe pfs-info`` on this package and return its stdout.

    The tool is started with its own folder as working directory, so the
    package path is made absolute before it is passed on the command line.
    Anything the tool prints on stdout is logged as information and
    anything on stderr as a warning.

    Args:
        as_json: When true, ``--json`` is added to the command line and the
            returned stdout is the tool's JSON output.

    Returns:
        The tool's stdout with surrounding whitespace stripped.

    Raises:
        FileNotFoundError: If shadPKG.exe is not found.
        TimeoutError: If the tool does not finish within 120 seconds.
        RuntimeError: If the tool cannot be run or exits with a non-zero
            code.
    """
    exe = self._find_shadpkg_exe()
    if not exe:
        raise FileNotFoundError("shadPKG.exe non trovato nel percorso previsto.")

    try:
        cmd = [exe, 'pfs-info']
        if as_json:
            cmd.append('--json')
        # cwd is switched to the tool's folder below, which would break a
        # relative package path.
        cmd.append(os.path.abspath(self.original_file))

        Logger.log_information(f"Running shadPKG pfs-info: {' '.join(cmd)}")
        proc = subprocess.run(cmd, cwd=os.path.dirname(exe), capture_output=True, text=True, timeout=120)
        if proc.stdout:
            Logger.log_information(proc.stdout.strip())
        if proc.stderr:
            # shadPKG prints some info to stderr; keep as warning without failing
            Logger.log_warning(proc.stderr.strip())
        if proc.returncode != 0:
            raise RuntimeError(f"pfs-info failed with code {proc.returncode}")
        return (proc.stdout or '').strip()
    except subprocess.TimeoutExpired as e:
        raise TimeoutError("pfs-info timeout") from e
    except RuntimeError:
        # Already the documented error type (e.g. the non-zero exit above);
        # do not wrap it a second time.
        raise
    except Exception as e:
        raise RuntimeError(f"pfs-info error: {e}") from e
| 244 | |
def is_encrypted(self):
    """Report whether the two bytes at file offset 0x1A are non-zero.

    The package file is reopened, two bytes are read at offset 0x1A and the
    package is considered encrypted when either byte is non-zero. Any error
    while reading is logged through ``logging`` and reported as ``False``.

    NOTE(review): ``_load_ps4_pkg`` unpacks the header as ``>5I2H2I...``,
    which places ``pkg_table_offset`` at 0x18-0x1B. Offset 0x1A therefore
    looks like the low half of that field rather than a dedicated
    encryption flag - confirm against the format specification.
    """
    try:
        with open(self.original_file, 'rb') as pkg:
            pkg.seek(0x1A)
            flag_bytes = pkg.read(2)
    except Exception as e:
        logging.error(f"Error checking encryption: {str(e)}")
        return False
    # Non-zero as an integer is the same as "any byte is non-zero".
    return any(flag_bytes)
| 259 | |
def extract_with_passcode(self, passcode, output_dir):
    """Decrypt and extract an encrypted package using a 32-character passcode.

    The passcode is first interpreted as hexadecimal; when that fails its
    UTF-8 encoding is used as the key instead. The key and ``output_dir``
    are then handed to ``decrypt_pkg``.

    NOTE(review): the non-hex fallback yields a key of at least 32 bytes,
    while ``decrypt_pkg`` configures the cipher with ``AES_KEY_LEN_128`` -
    confirm that this is intended.

    Returns:
        ``True`` when ``decrypt_pkg`` completes.

    Raises:
        ValueError: If the package is not encrypted, the passcode is not 32
            characters long, or decryption fails for any reason.
    """
    if not self.is_encrypted():
        raise ValueError("Package is not encrypted")

    try:
        # Validate the passcode format before deriving a key from it.
        if len(passcode) != 32:
            raise ValueError("Invalid passcode length")

        try:
            # Preferred interpretation: a hexadecimal string.
            aes_key = bytes.fromhex(passcode)
        except ValueError:
            # Not hexadecimal: use the passcode text itself as the key.
            aes_key = passcode.encode('utf-8')

        self.decrypt_pkg(aes_key, output_dir)
    except ValueError:
        raise
    except Exception as exc:
        raise ValueError(f"Failed to decrypt with passcode: {str(exc)}")

    return True
| 286 | |
def decrypt_pkg(self, key, output_dir):
    """Decrypt the package with ``key`` and extract it into ``output_dir``.

    The first 0x400 bytes of the file are copied through unchanged as the
    header. The remainder is passed through ``AES_cbc_decrypt`` in 16-byte
    blocks (a short final block is zero-padded first), a trailing
    PKCS#7-style padding run is removed when present, and header plus
    decrypted body are written to ``output_dir`` under the original file
    name. Extraction is then attempted with shadPKG.exe, falling back to
    ``self.extract_all_files`` when the tool is unavailable or fails.

    NOTE(review): the decrypted copy is written to
    ``output_dir/<original name>``. If ``output_dir`` is the folder that
    holds the original package, this overwrites it - confirm that callers
    never pass that folder.

    Args:
        key: AES key passed to ``AES_set_key`` with ``AES_KEY_LEN_128``.
        output_dir: Destination directory; created if missing.

    Raises:
        Exception: On any failure, with the underlying exception attached
            as ``__cause__``.
    """
    try:
        # Create the output directory.
        os.makedirs(output_dir, exist_ok=True)

        # Read the encrypted package.
        with open(self.original_file, 'rb') as f:
            # The first 0x400 bytes are treated as the (unencrypted) header.
            header = f.read(0x400)

            # Everything after the header is treated as encrypted content.
            encrypted_data = f.read()

        try:
            # Initialise the AES context.
            ctx = AES_ctx()
            AES_set_key(ctx, key, AES_KEY_LEN_128)

            # Decrypt in 16-byte blocks (the AES block size).
            decrypted = bytearray()
            for i in range(0, len(encrypted_data), 16):
                block = encrypted_data[i:i+16]
                if len(block) < 16:  # Zero-pad the last block.
                    block = block.ljust(16, b'\x00')

                temp = bytearray(16)
                AES_cbc_decrypt(ctx, block, temp)
                decrypted.extend(temp)

            # Strip trailing padding only when it is well-formed. A padding
            # length of 0 is never valid and must be excluded explicitly:
            # decrypted[-0:] is the WHOLE buffer, so an all-zero buffer
            # would otherwise be truncated to nothing.
            if decrypted:
                padding_len = decrypted[-1]
                if 1 <= padding_len <= 16 and all(x == padding_len for x in decrypted[-padding_len:]):
                    decrypted = decrypted[:-padding_len]

            # Combine the header and the decrypted data.
            decrypted_pkg = header + bytes(decrypted)

            # Save the decrypted package.
            decrypted_path = os.path.join(output_dir, os.path.basename(self.original_file))
            with open(decrypted_path, 'wb') as f:
                f.write(decrypted_pkg)

            # Extract the files from the decrypted package.
            # 1) Try the external shadPKG.exe tool when it is available.
            used_external = False
            try:
                if self._extract_with_shadpkg(decrypted_path, output_dir):
                    Logger.log_information("Extraction via shadPKG.exe completed.")
                    used_external = True
            except Exception as e:
                Logger.log_warning(f"shadPKG.exe extraction failed, falling back to internal: {e}")

            # 2) If the tool is unavailable or failed, fall back to the
            #    internal extractor.
            if not used_external:
                self.extract_all_files(output_dir)

        except Exception as e:
            raise Exception(f"Error in AES decryption: {str(e)}") from e

    except Exception as e:
        raise Exception(f"Error decrypting PKG: {str(e)}") from e
| 351 | |
def _find_shadpkg_exe(self):
    """Return the path of the bundled shadPKG.exe, or ``None`` if absent.

    The executable is looked up at ``ps3lib/shadPKG.exe`` relative to the
    directory of this module. Any error during the lookup is treated as
    "not found".
    """
    try:
        exe_path = os.path.join(os.path.dirname(__file__), 'ps3lib', 'shadPKG.exe')
        found = os.path.isfile(exe_path)
    except Exception:
        return None
    return exe_path if found else None
| 362 | |
def _extract_with_shadpkg(self, pkg_path: str, output_dir: str) -> bool:
    """Try to extract ``pkg_path`` into ``output_dir`` with shadPKG.exe.

    Three command-line forms of the ``extract`` command are tried in order:
    output directory via ``-o``, output directory as a positional argument,
    and no output directory at all. The first one that exits with code 0
    wins. The tool's stdout is logged as information and its stderr as a
    warning.

    The tool is started from its own folder, so both paths are made
    absolute first; otherwise relative paths would be resolved against the
    tool's folder instead of the caller's working directory.

    NOTE(review): the third form lets shadPKG pick the destination, so a
    ``True`` result from it does not guarantee the files are in
    ``output_dir`` - confirm this fallback is wanted.

    Returns:
        ``True`` if one invocation exits with code 0, ``False`` when the
        tool is missing or every invocation fails.
    """
    exe = self._find_shadpkg_exe()
    if not exe:
        return False

    pkg_path = os.path.abspath(pkg_path)
    output_dir = os.path.abspath(output_dir)
    # Run the tool from its own folder to be safe.
    cwd = os.path.dirname(exe)

    commands = [
        [exe, 'extract', '-o', output_dir, pkg_path],
        [exe, 'extract', pkg_path, output_dir],
        [exe, 'extract', pkg_path],  # let shadPKG choose the folder
    ]

    for cmd in commands:
        try:
            Logger.log_information(f"Running shadPKG: {' '.join(cmd)}")
            proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=300)
            if proc.stdout:
                Logger.log_information(proc.stdout.strip())
            if proc.stderr:
                Logger.log_warning(proc.stderr.strip())
            if proc.returncode == 0:
                # Exit code 0 is taken as success.
                return True
        except FileNotFoundError:
            # The executable could not be started; try the next form.
            continue
        except subprocess.TimeoutExpired:
            Logger.log_warning("shadPKG.exe timed out")
            continue
        except Exception as e:
            Logger.log_warning(f"shadPKG.exe invocation error: {e}")
            continue

    return False
| 404 | |
def extract_via_shadpkg(self, output_dir: str) -> str:
    """Extract the original package straight into ``output_dir`` with shadPKG.exe.

    The output directory is created if needed and the work is delegated to
    ``_extract_with_shadpkg`` on ``self.original_file``.

    Returns:
        A completion message naming the output directory.

    Raises:
        ValueError: If shadPKG.exe is unavailable or the extraction fails.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        os.makedirs(output_dir, exist_ok=True)
        succeeded = self._extract_with_shadpkg(self.original_file, output_dir)
        if not succeeded:
            raise ValueError("shadPKG.exe unavailable or extraction failed")
        Logger.log_information(f"Extraction completed via shadPKG. Output: {output_dir}")
        return f"Extraction completed. Output: {output_dir}"
    except Exception as err:
        Logger.log_error(f"extract_via_shadpkg failed: {err}")
        raise