Seregon/PkgToolBox

Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5; it includes the most useful functions you might need.

Python/57.3 KB/No license
packages/package_ps4.py
PkgToolBox / packages / package_ps4.py
1import struct
2from .package_base import PackageBase
3from .enums import DRMType, ContentType, IROTag
4from Utilities import Logger
5import os
6import shutil
7import logging
8import subprocess
9from .crypto_utils import AES_ctx, AES_set_key, AES_cbc_decrypt, AES_KEY_LEN_128
10 
11 
12 
class PackagePS4(PackageBase):
    """Reader/extractor for PlayStation 4 PKG containers.

    Construction parses the big-endian header, the four 32-byte digests
    stored at offset 0x100 and the entry table. Extraction is available
    either through the external ``shadPKG.exe`` tool (looked up next to this
    module) or through the raw-copy routines implemented here.
    """

    MAGIC_PS4 = 0x7f434E54  # ?CNT for PS4

    # Id of the entry whose data holds the NUL-terminated entry names.
    _NAME_TABLE_ID = 0x200

    def __init__(self, file: str):
        """Open *file*, verify the PS4 magic and load header and entries.

        Args:
            file: Path of the PKG file to parse.

        Raises:
            ValueError: If the file is too short, does not start with
                ``MAGIC_PS4`` or cannot be parsed.
        """
        super().__init__(file)
        self.is_ps4 = False
        self.pkg_magic = None
        self.pkg_type = None
        self.pkg_file_count = 0
        self.pkg_entry_count = 0
        self.pkg_sc_entry_count = 0
        self.pkg_entry_count_2 = 0
        self.pkg_table_offset = 0
        self.pkg_entry_data_size = 0
        self.pkg_body_offset = 0
        self.pkg_body_size = 0
        self.pkg_content_offset = 0
        self.pkg_content_size = 0
        self.pkg_drm_type = None
        self.pkg_content_type = None
        self.pkg_content_flags = None
        self.pkg_promote_size = 0
        self.pkg_version_date = None
        self.pkg_version_hash = None
        self.pkg_iro_tag = None
        self.pkg_drm_type_version = None
        self.pkg_content_id = None

        with open(file, "rb") as fp:
            raw_magic = fp.read(4)
            if len(raw_magic) < 4:
                # A short read would otherwise surface as a raw struct.error.
                raise ValueError("Unknown PKG format: file is too short to contain a magic number")
            magic = struct.unpack(">I", raw_magic)[0]
            Logger.log_information(f"Read magic number: {magic:08X}")
            if magic != self.MAGIC_PS4:
                raise ValueError(f"Unknown PKG format: {magic:08X}")
            self.is_ps4 = True
            self._load_ps4_pkg(fp)

    def _load_ps4_pkg(self, fp):
        """Parse header, digests and entry table from the open file *fp*.

        Raises:
            ValueError: Wrapping any error raised while parsing.
        """
        try:
            header_format = ">5I2H2I4Q36s12s12I"
            fp.seek(0)
            data = fp.read(struct.calcsize(header_format))

            (self.pkg_magic, self.pkg_type, self.pkg_0x008, self.pkg_file_count, self.pkg_entry_count,
             self.pkg_sc_entry_count, self.pkg_entry_count_2, self.pkg_table_offset, self.pkg_entry_data_size,
             self.pkg_body_offset, self.pkg_body_size, self.pkg_content_offset, self.pkg_content_size,
             self.pkg_content_id, self.pkg_padding, self.pkg_drm_type, self.pkg_content_type,
             self.pkg_content_flags, self.pkg_promote_size, self.pkg_version_date, self.pkg_version_hash,
             self.pkg_0x088, self.pkg_0x08C, self.pkg_0x090, self.pkg_0x094, self.pkg_iro_tag,
             self.pkg_drm_type_version) = struct.unpack(header_format, data)

            Logger.log_information(f"Loaded header data: {self.pkg_magic}, {self.pkg_type}, {self.pkg_drm_type}, {self.pkg_content_type}")

            self.pkg_content_id = self._safe_decode(self.pkg_content_id)
            self.content_id = self.pkg_content_id

            # Four consecutive 32-byte digests, kept as hex strings.
            fp.seek(0x100, os.SEEK_SET)
            data = fp.read(128)
            self.digests = [data[i:i + 32].hex() for i in range(0, 128, 32)]

            # NOTE(review): pkg_drm_type stays a raw integer here although
            # get_info() checks for a DRMType instance - confirm whether a
            # conversion was intended.
            try:
                self.pkg_content_type = ContentType(self.pkg_content_type)
            except ValueError:
                Logger.log_warning(f"Warning: {self.pkg_content_type} is not a valid ContentType. Setting to None.")
                self.pkg_content_type = None

            try:
                self.pkg_iro_tag = IROTag(self.pkg_iro_tag)
            except ValueError:
                Logger.log_warning(f"Warning: {self.pkg_iro_tag} is not a valid IROTag. Setting to None.")
                self.pkg_iro_tag = None
                self.invalid_irotag = True
            else:
                self.invalid_irotag = False

            self.__load_files(fp)
        except Exception as e:
            Logger.log_error(f"Error loading PS4 PKG file: {str(e)}")
            raise ValueError(f"Error loading PS4 PKG file: {str(e)}") from e

    def __load_files(self, fp):
        """Read the entry table into ``self.files`` and resolve entry names.

        ``self.files`` maps each entry id to a dict with the raw table fields
        plus ``key_idx``, ``encrypted`` and, when available, ``name``. The
        position of *fp* is restored before returning.
        """
        old_pos = fp.tell()
        fp.seek(self.pkg_table_offset, os.SEEK_SET)

        entry_struct = struct.Struct(">6IQ")
        self.files = {}
        for _ in range(self.pkg_entry_count):
            entry_data = fp.read(entry_struct.size)
            file_id, filename_offset, flags1, flags2, offset, size, padding = entry_struct.unpack(entry_data)
            self.files[file_id] = {
                "id": file_id,
                "fn_offset": filename_offset,
                "flags1": flags1,
                "flags2": flags2,
                "offset": offset,
                "size": size,
                "padding": padding,
                # Bits 12-15 of flags2; the previous 0xF00 mask combined
                # with a 12-bit shift always produced 0.
                "key_idx": (flags2 & 0xF000) >> 12,
                "encrypted": (flags1 & PackageBase.FLAG_ENCRYPTED) == PackageBase.FLAG_ENCRYPTED,
            }

        name_table = self.files.get(self._NAME_TABLE_ID)
        if name_table is None:
            # Without the name table no name can be resolved; fall back to
            # generic names instead of failing the whole load.
            Logger.log_warning("Name table entry 0x200 not found; using generic file names.")
            for key, entry in self.files.items():
                entry["name"] = f"file_{key}"
        else:
            names_base = name_table["offset"]
            for key, entry in self.files.items():
                try:
                    fp.seek(names_base + entry["fn_offset"])
                    fn = self._read_null_terminated_string(fp)
                    if fn:
                        try:
                            entry["name"] = self._safe_decode(fn)
                        except Exception as e:
                            Logger.log_warning(f"Failed to decode filename for file ID {key}: {e}")
                            entry["name"] = f"file_{key}"
                except (OverflowError, OSError) as e:
                    Logger.log_warning(f"Error seeking to filename for file ID {key}: {e}")
                    entry["name"] = f"file_{key}"

        fp.seek(old_pos)

    def get_info(self):
        """Return the base-class info dict extended with the PS4 header fields."""

        def hex_or_none(value):
            # Upper-case hex with a 0x prefix, or the literal string "None".
            return f"0x{value:X}" if value is not None else "None"

        info = super().get_info()
        if self.is_ps4:
            if isinstance(self.pkg_version_hash, bytes):
                version_hash = self.pkg_version_hash.hex()
            else:
                version_hash = hex_or_none(self.pkg_version_hash)

            info.update({
                "pkg_magic": f"0x{self.pkg_magic:X}",
                "pkg_type": f"0x{self.pkg_type:X}",
                "pkg_0x008": self.pkg_0x008,
                "pkg_file_count": self.pkg_file_count,
                "pkg_entry_count": self.pkg_entry_count,
                "pkg_sc_entry_count": self.pkg_sc_entry_count,
                "pkg_entry_count_2": self.pkg_entry_count_2,
                "pkg_table_offset": f"0x{self.pkg_table_offset:X}",
                "pkg_entry_data_size": self.pkg_entry_data_size,
                "pkg_body_offset": f"0x{self.pkg_body_offset:X}",
                "pkg_body_size": self.pkg_body_size,
                "pkg_content_offset": f"0x{self.pkg_content_offset:X}",
                "pkg_content_size": self.pkg_content_size,
                "pkg_content_id": self.pkg_content_id,
                "pkg_padding": self.pkg_padding.hex() if isinstance(self.pkg_padding, bytes) else str(self.pkg_padding),
                "pkg_drm_type": self.pkg_drm_type.name if isinstance(self.pkg_drm_type, DRMType) else str(self.pkg_drm_type),
                "pkg_content_type": self.pkg_content_type.name if isinstance(self.pkg_content_type, ContentType) else str(self.pkg_content_type),
                "pkg_content_flags": hex_or_none(self.pkg_content_flags),
                "pkg_promote_size": self.pkg_promote_size,
                "pkg_version_date": self.pkg_version_date,
                "pkg_version_hash": version_hash,
                "pkg_0x088": hex_or_none(self.pkg_0x088),
                "pkg_0x08C": hex_or_none(self.pkg_0x08C),
                "pkg_0x090": hex_or_none(self.pkg_0x090),
                "pkg_0x094": hex_or_none(self.pkg_0x094),
                "pkg_iro_tag": self.pkg_iro_tag.name if isinstance(self.pkg_iro_tag, IROTag) else str(self.pkg_iro_tag),
                "pkg_drm_type_version": self.pkg_drm_type_version,
                "Main Entry 1 Hash": self.digests[0] if len(self.digests) > 0 else "N/A",
                "Main Entry 2 Hash": self.digests[1] if len(self.digests) > 1 else "N/A",
                "Digest Table Hash": self.digests[2] if len(self.digests) > 2 else "N/A",
                "Main Table Hash": self.digests[3] if len(self.digests) > 3 else "N/A",
            })
        return info

    @staticmethod
    def _safe_output_path(root, name):
        """Return the path of *name* under *root*, refusing to leave *root*.

        Raises:
            ValueError: If the resolved path is outside *root*.
        """
        candidate = os.path.realpath(os.path.join(root, name.lstrip("/\\")))
        # commonpath itself raises ValueError for paths on different drives.
        if os.path.commonpath([root, candidate]) != root:
            raise ValueError(f"Unsafe file name in package: {name!r}")
        return candidate

    def dump(self, output_dir):
        """Write the raw bytes of every entry into *output_dir*.

        Entries are copied as stored (no decryption). A failure on one entry
        is logged and the remaining entries are still processed. Entry names
        come from the package itself, so each one is confined to
        *output_dir* before anything is written.

        Returns:
            A completion message naming *output_dir*.
        """
        os.makedirs(output_dir, exist_ok=True)
        root = os.path.realpath(output_dir)

        for file_id, file_info in self.files.items():
            file_name = file_info.get('name', f'file_{file_id}')

            try:
                output_path = self._safe_output_path(root, file_name)

                # Create the intermediate directories for nested names.
                os.makedirs(os.path.dirname(output_path), exist_ok=True)

                with open(self.original_file, 'rb') as pkg_file:
                    pkg_file.seek(file_info['offset'])
                    file_data = pkg_file.read(file_info['size'])

                with open(output_path, 'wb') as out_file:
                    out_file.write(file_data)

                Logger.log_information(f"File extracted: {file_name}")
            except Exception as e:
                Logger.log_error(f"Error during extraction of file {file_name}: {str(e)}")

        Logger.log_information(f"Dump completed. Extracted files in: {output_dir}")
        return f"Dump completed. Extracted files in: {output_dir}"

    def get_file_data(self, file_id):
        """Return the raw stored bytes of the entry with id *file_id*.

        Raises:
            ValueError: If *file_id* is not present in the entry table.
        """
        if file_id not in self.files:
            raise ValueError(f"File ID {file_id} not found in package")

        file_info = self.files[file_id]

        try:
            with open(self.original_file, 'rb') as pkg_file:
                pkg_file.seek(file_info['offset'])
                return pkg_file.read(file_info['size'])
        except Exception as e:
            Logger.log_error(f"Error reading file data: {str(e)}")
            raise

    def get_pfs_info(self, as_json: bool = False) -> str:
        """Run ``shadPKG.exe pfs-info`` on this package and return its stdout.

        Args:
            as_json: When true, ``--json`` is passed to the tool and the
                returned string is its JSON output.

        Raises:
            FileNotFoundError: If shadPKG.exe is not found.
            TimeoutError: If the tool does not finish within 120 seconds.
            RuntimeError: If the tool cannot be started or exits non-zero.
        """
        exe = self._find_shadpkg_exe()
        if not exe:
            raise FileNotFoundError("shadPKG.exe non trovato nel percorso previsto.")

        cmd = [exe, 'pfs-info']
        if as_json:
            cmd.append('--json')
        # The tool runs with its own folder as cwd, so a relative package
        # path must be made absolute first.
        cmd.append(os.path.abspath(self.original_file))

        Logger.log_information(f"Running shadPKG pfs-info: {' '.join(cmd)}")
        try:
            proc = subprocess.run(cmd, cwd=os.path.dirname(exe), capture_output=True, text=True, timeout=120)
        except subprocess.TimeoutExpired as e:
            raise TimeoutError("pfs-info timeout") from e
        except Exception as e:
            raise RuntimeError(f"pfs-info error: {e}") from e

        if proc.stdout:
            Logger.log_information(proc.stdout.strip())
        if proc.stderr:
            # shadPKG prints some info to stderr; keep as warning without failing
            Logger.log_warning(proc.stderr.strip())
        if proc.returncode != 0:
            raise RuntimeError(f"pfs-info failed with code {proc.returncode}")
        return (proc.stdout or '').strip()

    def is_encrypted(self):
        """Return True when the two bytes at header offset 0x1A are non-zero.

        Any error while reading is logged and reported as ``False``.

        NOTE(review): with the header layout used by ``_load_ps4_pkg``
        (``>5I2H2I...``) offset 0x1A is the low half of ``pkg_table_offset``
        rather than a dedicated flag - confirm the intended field.
        """
        try:
            with open(self.original_file, 'rb') as f:
                f.seek(0x1A)
                encryption_flag = int.from_bytes(f.read(2), byteorder='little')
                return encryption_flag != 0
        except Exception as e:
            Logger.log_error(f"Error checking encryption: {str(e)}")
            return False

    def extract_with_passcode(self, passcode, output_dir):
        """Decrypt and extract the package into *output_dir* using *passcode*.

        Args:
            passcode: 32-character string; interpreted as hex when possible,
                otherwise its UTF-8 bytes are used as the key.
            output_dir: Destination directory.

        Returns:
            True on success.

        Raises:
            ValueError: If the package is not encrypted, the passcode length
                is not 32, or decryption fails.
        """
        if not self.is_encrypted():
            raise ValueError("Package is not encrypted")

        try:
            if len(passcode) != 32:
                raise ValueError("Invalid passcode length")

            try:
                # 32 hex digits -> 16-byte key.
                key = bytes.fromhex(passcode)
            except ValueError:
                # NOTE(review): this fallback yields a key of at least 32
                # bytes while decrypt_pkg sets the key with AES_KEY_LEN_128 -
                # confirm how AES_set_key treats the extra bytes.
                key = passcode.encode('utf-8')

            self.decrypt_pkg(key, output_dir)

            return True
        except ValueError:
            raise
        except Exception as e:
            raise ValueError(f"Failed to decrypt with passcode: {str(e)}") from e

    def decrypt_pkg(self, key, output_dir):
        """Decrypt everything after the first 0x400 bytes and extract the result.

        The first 0x400 bytes are copied unchanged; the remainder is
        decrypted in 16-byte blocks, written next to the extracted files as a
        decrypted PKG, and then extracted with shadPKG.exe or, failing that,
        ``extract_all_files``.

        NOTE(review): if *output_dir* is the directory of the original file,
        the decrypted copy is written over the original - confirm callers
        never pass that directory.

        Raises:
            Exception: Wrapping any failure during decryption or extraction.
        """
        try:
            os.makedirs(output_dir, exist_ok=True)

            with open(self.original_file, 'rb') as f:
                header = f.read(0x400)  # copied through without decryption
                encrypted_data = f.read()

            try:
                ctx = AES_ctx()
                AES_set_key(ctx, key, AES_KEY_LEN_128)

                # NOTE(review): each block goes through a separate
                # AES_cbc_decrypt call; whether chaining state is carried in
                # ctx is not visible here.
                decrypted = bytearray()
                for i in range(0, len(encrypted_data), 16):
                    block = encrypted_data[i:i + 16]
                    if len(block) < 16:  # zero-pad a trailing partial block
                        block = block.ljust(16, b'\x00')

                    temp = bytearray(16)
                    AES_cbc_decrypt(ctx, block, temp)
                    decrypted.extend(temp)

                # Strip trailing padding only when it is well-formed. A
                # length of 0 must be rejected: slicing with -0 selects the
                # whole buffer and used to wipe all-zero output.
                padding_len = decrypted[-1] if decrypted else 0
                if 1 <= padding_len <= 16 and all(x == padding_len for x in decrypted[-padding_len:]):
                    del decrypted[-padding_len:]

                decrypted_pkg = header + bytes(decrypted)

                decrypted_path = os.path.join(output_dir, os.path.basename(self.original_file))
                with open(decrypted_path, 'wb') as f:
                    f.write(decrypted_pkg)

                # 1) Prefer the external shadPKG.exe tool when available.
                used_external = False
                try:
                    if self._extract_with_shadpkg(decrypted_path, output_dir):
                        Logger.log_information("Extraction via shadPKG.exe completed.")
                        used_external = True
                except Exception as e:
                    Logger.log_warning(f"shadPKG.exe extraction failed, falling back to internal: {e}")

                # 2) Otherwise fall back to the internal extractor.
                if not used_external:
                    self.extract_all_files(output_dir)

            except Exception as e:
                raise Exception(f"Error in AES decryption: {str(e)}") from e

        except Exception as e:
            raise Exception(f"Error decrypting PKG: {str(e)}") from e

    def _find_shadpkg_exe(self):
        """Return the path of ``ps3lib/shadPKG.exe`` next to this module, or None."""
        try:
            base_dir = os.path.dirname(__file__)
        except NameError:
            # __file__ is the only lookup here that can be missing.
            return None
        candidate = os.path.join(base_dir, 'ps3lib', 'shadPKG.exe')
        return candidate if os.path.isfile(candidate) else None

    def _extract_with_shadpkg(self, pkg_path: str, output_dir: str) -> bool:
        """Try to extract *pkg_path* into *output_dir* with shadPKG.exe.

        Several argument layouts of the ``extract`` command are attempted in
        turn (``-o DIR PKG``, ``PKG DIR``, then ``PKG`` alone, which lets the
        tool pick the folder).

        Returns:
            True as soon as one invocation exits with code 0; False if the
            tool is missing or every attempt fails.
        """
        exe = self._find_shadpkg_exe()
        if not exe:
            return False

        # The tool is run from its own folder, so relative paths supplied by
        # the caller must be resolved against the current directory first.
        pkg_path = os.path.abspath(pkg_path)
        output_dir = os.path.abspath(output_dir)
        cwd = os.path.dirname(exe)

        commands = [
            [exe, 'extract', '-o', output_dir, pkg_path],
            [exe, 'extract', pkg_path, output_dir],
            [exe, 'extract', pkg_path],  # let shadPKG choose the folder
        ]

        for cmd in commands:
            try:
                Logger.log_information(f"Running shadPKG: {' '.join(cmd)}")
                proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=300)
                if proc.stdout:
                    Logger.log_information(proc.stdout.strip())
                if proc.stderr:
                    Logger.log_warning(proc.stderr.strip())
                if proc.returncode == 0:
                    # Exit code 0 is treated as success.
                    return True
            except FileNotFoundError:
                # Executable could not be started; try the next layout.
                continue
            except subprocess.TimeoutExpired:
                Logger.log_warning("shadPKG.exe timed out")
                continue
            except Exception as e:
                Logger.log_warning(f"shadPKG.exe invocation error: {e}")
                continue

        return False

    def extract_via_shadpkg(self, output_dir: str) -> str:
        """Extract the original PKG into *output_dir* using shadPKG.exe only.

        Returns:
            A completion message naming *output_dir*.

        Raises:
            ValueError: If shadPKG.exe is unavailable or extraction fails.
        """
        try:
            os.makedirs(output_dir, exist_ok=True)
            if self._extract_with_shadpkg(self.original_file, output_dir):
                Logger.log_information(f"Extraction completed via shadPKG. Output: {output_dir}")
                return f"Extraction completed. Output: {output_dir}"
            raise ValueError("shadPKG.exe unavailable or extraction failed")
        except Exception as e:
            Logger.log_error(f"extract_via_shadpkg failed: {e}")
            raise