Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4, and PS5; it includes the most useful functions you might need.
| 1 | # This module is part of PKGToolBox, developed by seregonwar. It is a translated and adapted version from C++ to Python, based on the original implementation made by HoppersPS4. |
| 2 | # Credit to HoppersPS4 |
| 3 | # repository link: https://github.com/HoppersPS4/Waste_Ur_Time |
| 4 | |
| 5 | import os |
| 6 | import sys |
| 7 | import time |
| 8 | import random |
| 9 | import string |
| 10 | import shutil |
| 11 | import subprocess |
| 12 | from pathlib import Path |
| 13 | from packages import PackagePS4, PackagePS5, PackagePS3 |
| 14 | import struct |
| 15 | import logging |
| 16 | import json |
| 17 | import pickle |
| 18 | import base64 |
| 19 | import threading |
| 20 | import queue |
| 21 | import hashlib |
| 22 | |
| 23 | class PS4PasscodeBruteforcer: |
| 24 | def __init__(self): |
| 25 | self.passcode_found = False |
| 26 | self.found_passcode = "" |
| 27 | self.last_used_passcode = "" |
| 28 | self.package_name = "" |
| 29 | self.package_cid = "" |
| 30 | self.debug_mode = False |
| 31 | self.silence_mode = False |
| 32 | self.package = None |
| 33 | # Resumable state |
| 34 | self._rng = random.Random() |
| 35 | self._attempts_done = 0 |
| 36 | self._stop = False |
| 37 | self._state_path = None |
| 38 | self._checkpoint_every_attempts = 1000 |
| 39 | self._checkpoint_every_seconds = 5 |
| 40 | self._last_checkpoint_ts = 0.0 |
| 41 | |
| 42 | def generate_random_passcode(self, length=32): |
| 43 | """Generate random passcode (legacy random).""" |
| 44 | if self.debug_mode: |
| 45 | return "00000000000000000000000000000000" |
| 46 | |
| 47 | # Usa lettere, numeri, - e _ |
| 48 | characters = string.ascii_letters + string.digits + "-_" |
| 49 | return ''.join(self._rng.choice(characters) for _ in range(length)) |
| 50 | |
| 51 | # ---------------------- |
| 52 | # Generatore deterministico no-overlap |
| 53 | # ---------------------- |
| 54 | def _code_from_counter(self, counter: int, seed_bytes: bytes) -> str: |
| 55 | """Deriva un passcode di 32 caratteri URL-safe dal contatore e seed. |
| 56 | Usa SHA-256 -> base64 urlsafe e tronca a 32 senza padding. Deterministico. |
| 57 | """ |
| 58 | # 16 byte per grande spazio (2^128) + seed per sessione |
| 59 | ctr_bytes = counter.to_bytes(16, 'big', signed=False) |
| 60 | digest = hashlib.sha256(seed_bytes + ctr_bytes).digest() |
| 61 | b64 = base64.urlsafe_b64encode(digest).decode('ascii').rstrip('=') |
| 62 | # Garantisce almeno 32 caratteri |
| 63 | if len(b64) < 32: |
| 64 | b64 = (b64 * ((32 // len(b64)) + 1))[:32] |
| 65 | return b64[:32] |
| 66 | |
| 67 | def validate_passcode(self, passcode): |
| 68 | """Validate passcode format""" |
| 69 | # Verifica solo la lunghezza |
| 70 | if len(passcode) != 32: |
| 71 | raise ValueError("Passcode must be 32 characters long") |
| 72 | |
| 73 | return True |
| 74 | |
| 75 | # ---------------------- |
| 76 | # Integrazione orbis-pub-cmd.exe (PS4) |
| 77 | # ---------------------- |
| 78 | def _find_orbis_pub_cmd(self) -> str | None: |
| 79 | """Restituisce il percorso di orbis-pub-cmd.exe se presente in packages/ps3lib, altrimenti None.""" |
| 80 | try: |
| 81 | base_dir = os.path.dirname(os.path.abspath(__file__)) |
| 82 | exe = os.path.normpath(os.path.join(base_dir, '..', 'packages', 'ps3lib', 'orbis-pub-cmd.exe')) |
| 83 | if os.path.isfile(exe): |
| 84 | return exe |
| 85 | except Exception: |
| 86 | pass |
| 87 | return None |
| 88 | |
| 89 | def _orbis_validate(self, exe: str, pkg_path: str, passcode: str) -> tuple[bool, str]: |
| 90 | """Esegue img_file_list per validare la passcode. Ritorna (ok, output).""" |
| 91 | try: |
| 92 | cmd = [exe, 'img_file_list', '--passcode', passcode, pkg_path] |
| 93 | cwd = os.path.dirname(exe) |
| 94 | proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=300) |
| 95 | out = (proc.stdout or '') + ("\n" + proc.stderr if proc.stderr else '') |
| 96 | return (proc.returncode == 0, out.strip()) |
| 97 | except Exception as e: |
| 98 | return (False, f"orbis validate error: {e}") |
| 99 | |
| 100 | def _orbis_extract(self, exe: str, pkg_path: str, out_dir: str, passcode: str) -> tuple[bool, str]: |
| 101 | """Esegue img_extract per estrarre i file. Ritorna (ok, output).""" |
| 102 | try: |
| 103 | os.makedirs(out_dir, exist_ok=True) |
| 104 | cmd = [exe, 'img_extract', '--passcode', passcode, pkg_path, out_dir] |
| 105 | cwd = os.path.dirname(exe) |
| 106 | proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=1800) |
| 107 | out = (proc.stdout or '') + ("\n" + proc.stderr if proc.stderr else '') |
| 108 | return (proc.returncode == 0, out.strip()) |
| 109 | except Exception as e: |
| 110 | return (False, f"orbis extract error: {e}") |
| 111 | |
| 112 | def try_passcode(self, input_file, output_directory, passcode): |
| 113 | """Try to decrypt with a specific passcode""" |
| 114 | try: |
| 115 | # Prima: determina il formato e, se PS4, prova orbis-pub-cmd per un check/estrazione veloce |
| 116 | with open(input_file, "rb") as fp: |
| 117 | magic = struct.unpack(">I", fp.read(4))[0] |
| 118 | |
| 119 | if magic == PackagePS4.MAGIC_PS4: |
| 120 | # Prova orbis-pub-cmd se disponibile (solo per tentativi manuali/singoli) |
| 121 | exe = self._find_orbis_pub_cmd() |
| 122 | if exe: |
| 123 | if len(passcode) != 32: |
| 124 | return f"[-] Invalid passcode length: {len(passcode)}" |
| 125 | ok, msg = self._orbis_validate(exe, input_file, passcode) |
| 126 | if ok: |
| 127 | ok2, msg2 = self._orbis_extract(exe, input_file, output_directory, passcode) |
| 128 | if ok2: |
| 129 | self.passcode_found = True |
| 130 | self.found_passcode = passcode |
| 131 | return f"[+] Successfully decrypted (orbis) with passcode: {passcode}\n{msg2}" |
| 132 | # Se validata ma estrazione fallita, continua con fallback interno |
| 133 | # Se non ok o exe non presente, continua con fallback interno |
| 134 | |
| 135 | # Load package only if not already loaded or for a different file |
| 136 | if not self.package or getattr(self.package, 'original_file', None) != input_file: |
| 137 | if magic == PackagePS4.MAGIC_PS4: |
| 138 | self.package = PackagePS4(input_file) |
| 139 | elif magic == PackagePS5.MAGIC_PS5: |
| 140 | self.package = PackagePS5(input_file) |
| 141 | elif magic == PackagePS3.MAGIC_PS3: |
| 142 | self.package = PackagePS3(input_file) |
| 143 | else: |
| 144 | return f"[-] Unknown PKG format: {magic:08X}" |
| 145 | |
| 146 | if not self.package.is_encrypted(): |
| 147 | self.package.extract_all_files(output_directory) |
| 148 | return "[+] Package is not encrypted. Files extracted." |
| 149 | |
| 150 | try: |
| 151 | # Validate only length to avoid extra overhead |
| 152 | if len(passcode) != 32: |
| 153 | return f"[-] Invalid passcode length: {len(passcode)}" |
| 154 | |
| 155 | self.package.extract_with_passcode(passcode, output_directory) |
| 156 | self.passcode_found = True |
| 157 | self.found_passcode = passcode |
| 158 | return f"[+] Successfully decrypted with passcode: {passcode}" |
| 159 | except ValueError as e: |
| 160 | return f"[-] Failed to decrypt with passcode: {str(e)}" |
| 161 | |
| 162 | except Exception as e: |
| 163 | logging.error(f"Error trying passcode: {str(e)}") |
| 164 | return f"[-] Error: {str(e)}" |
| 165 | |
| 166 | def brute_force_passcode(self, input_file, output_directory, progress_callback=None, manual_passcode=None, num_workers: int = 1, tested_callback=None, seed: int | None = None): |
| 167 | """Brute force or try specific passcode""" |
| 168 | self.ensure_output_directory(output_directory) |
| 169 | self._state_path = self._get_state_path(input_file) |
| 170 | self._stop = False |
| 171 | num_workers = max(1, int(num_workers or 1)) |
| 172 | # Seed deterministico (persistente se riprende lo stato): |
| 173 | # se non fornito, deriviamo da tempo + path |
| 174 | if seed is None: |
| 175 | seed_material = f"{input_file}|{time.time()}".encode('utf-8') |
| 176 | seed_hash = hashlib.sha256(seed_material).digest() |
| 177 | else: |
| 178 | seed_hash = hashlib.sha256(str(int(seed)).encode('utf-8')).digest() |
| 179 | |
| 180 | try: |
| 181 | # Determine package type and create appropriate instance |
| 182 | with open(input_file, "rb") as fp: |
| 183 | magic = struct.unpack(">I", fp.read(4))[0] |
| 184 | if magic == PackagePS4.MAGIC_PS4: |
| 185 | self.package = PackagePS4(input_file) |
| 186 | elif magic == PackagePS5.MAGIC_PS5: |
| 187 | self.package = PackagePS5(input_file) |
| 188 | elif magic == PackagePS3.MAGIC_PS3: |
| 189 | self.package = PackagePS3(input_file) |
| 190 | else: |
| 191 | return f"[-] Unknown PKG format: {magic:08X}" |
| 192 | |
| 193 | if not self.package.is_encrypted(): |
| 194 | self.package.extract_all_files(output_directory) |
| 195 | return "[+] Package is not encrypted. Files extracted." |
| 196 | |
| 197 | if progress_callback: |
| 198 | progress_callback("[+] Package is encrypted. Starting decryption...") |
| 199 | |
| 200 | # Se è fornito un passcode manuale, prova solo quello |
| 201 | if manual_passcode: |
| 202 | try: |
| 203 | self.validate_passcode(manual_passcode) |
| 204 | # Per il tentativo manuale, try_passcode proverà prima orbis (se PS4) poi il fallback interno |
| 205 | result = self.try_passcode(input_file, output_directory, manual_passcode) |
| 206 | if progress_callback: |
| 207 | progress_callback(result) |
| 208 | return result |
| 209 | except ValueError as e: |
| 210 | return f"[-] Invalid passcode format: {str(e)}" |
| 211 | |
| 212 | # Prova a ripristinare lo stato precedente |
| 213 | self._maybe_load_state(input_file, progress_callback) |
| 214 | self._last_checkpoint_ts = time.time() |
| 215 | |
| 216 | # Progress tracking |
| 217 | start_ts = time.time() |
| 218 | last_report_ts = start_ts |
| 219 | last_report_attempts = self._attempts_done |
| 220 | last_test_emit_ts = start_ts |
| 221 | |
| 222 | if num_workers == 1: |
| 223 | # Single-threaded fast path: reuse loaded package |
| 224 | counter = 0 |
| 225 | while not self.passcode_found and not self._stop: |
| 226 | passcode = self._code_from_counter(counter, seed_hash) |
| 227 | self.last_used_passcode = passcode |
| 228 | self._attempts_done += 1 |
| 229 | |
| 230 | try: |
| 231 | # Directly attempt using the loaded package to avoid reload overhead |
| 232 | if len(passcode) == 32: |
| 233 | self.package.extract_with_passcode(passcode, output_directory) |
| 234 | self.passcode_found = True |
| 235 | self.found_passcode = passcode |
| 236 | if progress_callback: |
| 237 | progress_callback(f"[+] Successfully decrypted with passcode: {passcode}") |
| 238 | break |
| 239 | else: |
| 240 | if progress_callback and (self._attempts_done % 5000 == 0): |
| 241 | progress_callback(f"[-] Invalid passcode length: {len(passcode)}") |
| 242 | except ValueError as e: |
| 243 | # Only occasionally report failures to reduce UI overhead |
| 244 | if progress_callback and (self._attempts_done % 200 == 0): |
| 245 | progress_callback(f"[-] Failed attempt #{self._attempts_done}: {str(e)}") |
| 246 | |
| 247 | # Emit tested passcode rate-limited |
| 248 | now = time.time() |
| 249 | if tested_callback and (self._attempts_done % 50 == 0 or (now - last_test_emit_ts) >= 0.1): |
| 250 | tested_callback(passcode) |
| 251 | last_test_emit_ts = now |
| 252 | |
| 253 | # Periodic rate reporting and checkpoint |
| 254 | if now - last_report_ts >= 1.0: |
| 255 | delta_attempts = self._attempts_done - last_report_attempts |
| 256 | rate = delta_attempts / max(1e-9, (now - last_report_ts)) |
| 257 | if progress_callback: |
| 258 | progress_callback(f"[~] Attempts: {self._attempts_done} | Rate: {rate:.0f}/s") |
| 259 | last_report_ts = now |
| 260 | last_report_attempts = self._attempts_done |
| 261 | self._maybe_checkpoint(progress_callback) |
| 262 | counter += 1 |
| 263 | else: |
| 264 | # Multi-threaded deterministic: ogni worker ha (counter = worker_id; step = num_workers) |
| 265 | stop_flag = threading.Event() |
| 266 | |
| 267 | def worker(worker_id: int): |
| 268 | # Each worker has its own package instance |
| 269 | try: |
| 270 | with open(input_file, "rb") as fp2: |
| 271 | magic2 = struct.unpack(">I", fp2.read(4))[0] |
| 272 | if magic2 == PackagePS4.MAGIC_PS4: |
| 273 | pkg = PackagePS4(input_file) |
| 274 | elif magic2 == PackagePS5.MAGIC_PS5: |
| 275 | pkg = PackagePS5(input_file) |
| 276 | elif magic2 == PackagePS3.MAGIC_PS3: |
| 277 | pkg = PackagePS3(input_file) |
| 278 | else: |
| 279 | return |
| 280 | except Exception: |
| 281 | return |
| 282 | |
| 283 | counter_local = worker_id # offset diverso per worker |
| 284 | step = num_workers |
| 285 | last_emit_local = time.time() |
| 286 | while not stop_flag.is_set() and not self._stop and not self.passcode_found: |
| 287 | code = self._code_from_counter(counter_local, seed_hash) |
| 288 | try: |
| 289 | if len(code) == 32: |
| 290 | pkg.extract_with_passcode(code, output_directory) |
| 291 | # Found! |
| 292 | self.passcode_found = True |
| 293 | self.found_passcode = code |
| 294 | stop_flag.set() |
| 295 | if progress_callback: |
| 296 | progress_callback(f"[+] Successfully decrypted with passcode: {code}") |
| 297 | break |
| 298 | except ValueError: |
| 299 | pass |
| 300 | finally: |
| 301 | # Count attempts regardless of success |
| 302 | self._attempts_done += 1 |
| 303 | # Emit tested code rate-limited per worker |
| 304 | now_l = time.time() |
| 305 | if tested_callback and (self._attempts_done % 100 == 0 or (now_l - last_emit_local) >= 0.15): |
| 306 | tested_callback(code) |
| 307 | last_emit_local = now_l |
| 308 | counter_local += step |
| 309 | |
| 310 | # Start producer and workers |
| 311 | workers = [threading.Thread(target=worker, args=(i,), daemon=True) for i in range(num_workers)] |
| 312 | for t in workers: |
| 313 | t.start() |
| 314 | |
| 315 | # Monitor loop for progress and exit conditions |
| 316 | while not self.passcode_found and not self._stop: |
| 317 | now = time.time() |
| 318 | if now - last_report_ts >= 1.0: |
| 319 | delta_attempts = self._attempts_done - last_report_attempts |
| 320 | rate = delta_attempts / max(1e-9, (now - last_report_ts)) |
| 321 | if progress_callback: |
| 322 | progress_callback(f"[~] Attempts: {self._attempts_done} | Threads: {num_workers} | Rate: {rate:.0f}/s") |
| 323 | last_report_ts = now |
| 324 | last_report_attempts = self._attempts_done |
| 325 | self._maybe_checkpoint(progress_callback) |
| 326 | time.sleep(0.05) |
| 327 | |
| 328 | stop_flag.set() |
| 329 | |
| 330 | if self.passcode_found: |
| 331 | success_file_name = f"{input_file}.success" |
| 332 | try: |
| 333 | with open(success_file_name, "w") as success_file: |
| 334 | success_file.write(self.found_passcode) |
| 335 | # Rimuovi lo stato salvato poiché abbiamo finito |
| 336 | try: |
| 337 | if self._state_path and os.path.exists(self._state_path): |
| 338 | os.remove(self._state_path) |
| 339 | except Exception: |
| 340 | pass |
| 341 | return f"[+] Passcode found: {self.found_passcode}\n[+] Passcode has been saved to: {success_file_name}" |
| 342 | except Exception as e: |
| 343 | return f"[+] Passcode found: {self.found_passcode}\n[-] Failed to create/save the success file: {e}" |
| 344 | else: |
| 345 | # Salva lo stato anche in caso di stop o chiusura |
| 346 | self._save_state(input_file) |
| 347 | return "[-] Passcode not found or process stopped. Progress saved." |
| 348 | |
| 349 | except FileNotFoundError: |
| 350 | return f"[-] Package file not found: {input_file}" |
| 351 | except Exception as e: |
| 352 | logging.error(f"Error during brute force: {str(e)}") |
| 353 | return f"[-] Error: {str(e)}" |
| 354 | |
| 355 | def ensure_output_directory(self, output_directory): |
| 356 | """Assicura che la directory di output esista""" |
| 357 | os.makedirs(output_directory, exist_ok=True) |
| 358 | |
| 359 | def get_package(self): |
| 360 | """Restituisce l'oggetto package corrente""" |
| 361 | return self.package |
| 362 | |
| 363 | def set_debug_mode(self, enabled): |
| 364 | """Imposta la modalità debug""" |
| 365 | self.debug_mode = enabled |
| 366 | |
| 367 | def set_silence_mode(self, enabled): |
| 368 | """Imposta la modalità silenziosa""" |
| 369 | self.silence_mode = enabled |
| 370 | |
| 371 | def stop(self): |
| 372 | """Richiedi l'arresto del processo di brute force""" |
| 373 | self._stop = True |
| 374 | |
| 375 | # ---------------------- |
| 376 | # Stato e checkpointing |
| 377 | # ---------------------- |
| 378 | def _get_state_path(self, input_file): |
| 379 | """Restituisce il percorso del file di stato per l'input specificato""" |
| 380 | return f"{input_file}.brutestate.json" |
| 381 | |
| 382 | def _maybe_load_state(self, input_file, progress_callback=None): |
| 383 | try: |
| 384 | if not self._state_path or not os.path.exists(self._state_path): |
| 385 | return |
| 386 | with open(self._state_path, "r", encoding="utf-8") as f: |
| 387 | data = json.load(f) |
| 388 | if data.get("input_file") != os.path.abspath(input_file): |
| 389 | return |
| 390 | state_b64 = data.get("rng_state_b64") |
| 391 | if state_b64: |
| 392 | rng_state = pickle.loads(base64.b64decode(state_b64.encode("utf-8"))) |
| 393 | self._rng.setstate(rng_state) |
| 394 | self._attempts_done = int(data.get("attempts_done", 0)) |
| 395 | self.last_used_passcode = data.get("last_passcode", "") |
| 396 | if progress_callback: |
| 397 | progress_callback(f"[+] Resumed previous session: attempts={self._attempts_done}") |
| 398 | except Exception as e: |
| 399 | logging.error(f"Failed to load bruteforce state: {e}") |
| 400 | |
| 401 | def _maybe_checkpoint(self, progress_callback=None): |
| 402 | try: |
| 403 | now = time.time() |
| 404 | if (self._attempts_done % self._checkpoint_every_attempts == 0) or (now - self._last_checkpoint_ts >= self._checkpoint_every_seconds): |
| 405 | self._save_state(self.package.file_path if hasattr(self.package, 'file_path') else "") |
| 406 | self._last_checkpoint_ts = now |
| 407 | if progress_callback: |
| 408 | progress_callback(f"[+] Checkpoint saved: attempts={self._attempts_done}") |
| 409 | except Exception as e: |
| 410 | logging.error(f"Failed to checkpoint bruteforce state: {e}") |
| 411 | |
| 412 | def _save_state(self, input_file): |
| 413 | try: |
| 414 | if not self._state_path: |
| 415 | self._state_path = self._get_state_path(input_file) |
| 416 | data = { |
| 417 | "version": 1, |
| 418 | "input_file": os.path.abspath(input_file), |
| 419 | "attempts_done": self._attempts_done, |
| 420 | "last_passcode": self.last_used_passcode, |
| 421 | "rng_state_b64": base64.b64encode(pickle.dumps(self._rng.getstate())).decode("utf-8"), |
| 422 | "timestamp": time.time(), |
| 423 | } |
| 424 | with open(self._state_path, "w", encoding="utf-8") as f: |
| 425 | json.dump(data, f, indent=2) |
| 426 | except Exception as e: |
| 427 | logging.error(f"Failed to save bruteforce state: {e}") |
| 428 |