Seregon/PkgToolBox

Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4, and PS5; it includes the most useful functions you might need.

Python/57.3 KB/No license
tools/PS4_Passcode_Bruteforcer.py
PkgToolBox / tools / PS4_Passcode_Bruteforcer.py
1# This module is part of PKGToolBox, developed by seregonwar. It is a translated and adapted version from C++ to Python, based on the original implementation made by HoppersPS4.
2# Credit to HoppersPS4
3# repository link: https://github.com/HoppersPS4/Waste_Ur_Time
4 
5import os
6import sys
7import time
8import random
9import string
10import shutil
11import subprocess
12from pathlib import Path
13from packages import PackagePS4, PackagePS5, PackagePS3
14import struct
15import logging
16import json
17import pickle
18import base64
19import threading
20import queue
21import hashlib
22 
23class PS4PasscodeBruteforcer:
24 def __init__(self):
25 self.passcode_found = False
26 self.found_passcode = ""
27 self.last_used_passcode = ""
28 self.package_name = ""
29 self.package_cid = ""
30 self.debug_mode = False
31 self.silence_mode = False
32 self.package = None
33 # Resumable state
34 self._rng = random.Random()
35 self._attempts_done = 0
36 self._stop = False
37 self._state_path = None
38 self._checkpoint_every_attempts = 1000
39 self._checkpoint_every_seconds = 5
40 self._last_checkpoint_ts = 0.0
41 
42 def generate_random_passcode(self, length=32):
43 """Generate random passcode (legacy random)."""
44 if self.debug_mode:
45 return "00000000000000000000000000000000"
46 
47 # Usa lettere, numeri, - e _
48 characters = string.ascii_letters + string.digits + "-_"
49 return ''.join(self._rng.choice(characters) for _ in range(length))
50 
51 # ----------------------
52 # Generatore deterministico no-overlap
53 # ----------------------
54 def _code_from_counter(self, counter: int, seed_bytes: bytes) -> str:
55 """Deriva un passcode di 32 caratteri URL-safe dal contatore e seed.
56 Usa SHA-256 -> base64 urlsafe e tronca a 32 senza padding. Deterministico.
57 """
58 # 16 byte per grande spazio (2^128) + seed per sessione
59 ctr_bytes = counter.to_bytes(16, 'big', signed=False)
60 digest = hashlib.sha256(seed_bytes + ctr_bytes).digest()
61 b64 = base64.urlsafe_b64encode(digest).decode('ascii').rstrip('=')
62 # Garantisce almeno 32 caratteri
63 if len(b64) < 32:
64 b64 = (b64 * ((32 // len(b64)) + 1))[:32]
65 return b64[:32]
66 
67 def validate_passcode(self, passcode):
68 """Validate passcode format"""
69 # Verifica solo la lunghezza
70 if len(passcode) != 32:
71 raise ValueError("Passcode must be 32 characters long")
72
73 return True
74 
75 # ----------------------
76 # Integrazione orbis-pub-cmd.exe (PS4)
77 # ----------------------
78 def _find_orbis_pub_cmd(self) -> str | None:
79 """Restituisce il percorso di orbis-pub-cmd.exe se presente in packages/ps3lib, altrimenti None."""
80 try:
81 base_dir = os.path.dirname(os.path.abspath(__file__))
82 exe = os.path.normpath(os.path.join(base_dir, '..', 'packages', 'ps3lib', 'orbis-pub-cmd.exe'))
83 if os.path.isfile(exe):
84 return exe
85 except Exception:
86 pass
87 return None
88 
89 def _orbis_validate(self, exe: str, pkg_path: str, passcode: str) -> tuple[bool, str]:
90 """Esegue img_file_list per validare la passcode. Ritorna (ok, output)."""
91 try:
92 cmd = [exe, 'img_file_list', '--passcode', passcode, pkg_path]
93 cwd = os.path.dirname(exe)
94 proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=300)
95 out = (proc.stdout or '') + ("\n" + proc.stderr if proc.stderr else '')
96 return (proc.returncode == 0, out.strip())
97 except Exception as e:
98 return (False, f"orbis validate error: {e}")
99 
100 def _orbis_extract(self, exe: str, pkg_path: str, out_dir: str, passcode: str) -> tuple[bool, str]:
101 """Esegue img_extract per estrarre i file. Ritorna (ok, output)."""
102 try:
103 os.makedirs(out_dir, exist_ok=True)
104 cmd = [exe, 'img_extract', '--passcode', passcode, pkg_path, out_dir]
105 cwd = os.path.dirname(exe)
106 proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, timeout=1800)
107 out = (proc.stdout or '') + ("\n" + proc.stderr if proc.stderr else '')
108 return (proc.returncode == 0, out.strip())
109 except Exception as e:
110 return (False, f"orbis extract error: {e}")
111 
112 def try_passcode(self, input_file, output_directory, passcode):
113 """Try to decrypt with a specific passcode"""
114 try:
115 # Prima: determina il formato e, se PS4, prova orbis-pub-cmd per un check/estrazione veloce
116 with open(input_file, "rb") as fp:
117 magic = struct.unpack(">I", fp.read(4))[0]
118 
119 if magic == PackagePS4.MAGIC_PS4:
120 # Prova orbis-pub-cmd se disponibile (solo per tentativi manuali/singoli)
121 exe = self._find_orbis_pub_cmd()
122 if exe:
123 if len(passcode) != 32:
124 return f"[-] Invalid passcode length: {len(passcode)}"
125 ok, msg = self._orbis_validate(exe, input_file, passcode)
126 if ok:
127 ok2, msg2 = self._orbis_extract(exe, input_file, output_directory, passcode)
128 if ok2:
129 self.passcode_found = True
130 self.found_passcode = passcode
131 return f"[+] Successfully decrypted (orbis) with passcode: {passcode}\n{msg2}"
132 # Se validata ma estrazione fallita, continua con fallback interno
133 # Se non ok o exe non presente, continua con fallback interno
134 
135 # Load package only if not already loaded or for a different file
136 if not self.package or getattr(self.package, 'original_file', None) != input_file:
137 if magic == PackagePS4.MAGIC_PS4:
138 self.package = PackagePS4(input_file)
139 elif magic == PackagePS5.MAGIC_PS5:
140 self.package = PackagePS5(input_file)
141 elif magic == PackagePS3.MAGIC_PS3:
142 self.package = PackagePS3(input_file)
143 else:
144 return f"[-] Unknown PKG format: {magic:08X}"
145 
146 if not self.package.is_encrypted():
147 self.package.extract_all_files(output_directory)
148 return "[+] Package is not encrypted. Files extracted."
149 
150 try:
151 # Validate only length to avoid extra overhead
152 if len(passcode) != 32:
153 return f"[-] Invalid passcode length: {len(passcode)}"
154 
155 self.package.extract_with_passcode(passcode, output_directory)
156 self.passcode_found = True
157 self.found_passcode = passcode
158 return f"[+] Successfully decrypted with passcode: {passcode}"
159 except ValueError as e:
160 return f"[-] Failed to decrypt with passcode: {str(e)}"
161 
162 except Exception as e:
163 logging.error(f"Error trying passcode: {str(e)}")
164 return f"[-] Error: {str(e)}"
165 
166 def brute_force_passcode(self, input_file, output_directory, progress_callback=None, manual_passcode=None, num_workers: int = 1, tested_callback=None, seed: int | None = None):
167 """Brute force or try specific passcode"""
168 self.ensure_output_directory(output_directory)
169 self._state_path = self._get_state_path(input_file)
170 self._stop = False
171 num_workers = max(1, int(num_workers or 1))
172 # Seed deterministico (persistente se riprende lo stato):
173 # se non fornito, deriviamo da tempo + path
174 if seed is None:
175 seed_material = f"{input_file}|{time.time()}".encode('utf-8')
176 seed_hash = hashlib.sha256(seed_material).digest()
177 else:
178 seed_hash = hashlib.sha256(str(int(seed)).encode('utf-8')).digest()
179 
180 try:
181 # Determine package type and create appropriate instance
182 with open(input_file, "rb") as fp:
183 magic = struct.unpack(">I", fp.read(4))[0]
184 if magic == PackagePS4.MAGIC_PS4:
185 self.package = PackagePS4(input_file)
186 elif magic == PackagePS5.MAGIC_PS5:
187 self.package = PackagePS5(input_file)
188 elif magic == PackagePS3.MAGIC_PS3:
189 self.package = PackagePS3(input_file)
190 else:
191 return f"[-] Unknown PKG format: {magic:08X}"
192 
193 if not self.package.is_encrypted():
194 self.package.extract_all_files(output_directory)
195 return "[+] Package is not encrypted. Files extracted."
196 
197 if progress_callback:
198 progress_callback("[+] Package is encrypted. Starting decryption...")
199 
200 # Se è fornito un passcode manuale, prova solo quello
201 if manual_passcode:
202 try:
203 self.validate_passcode(manual_passcode)
204 # Per il tentativo manuale, try_passcode proverà prima orbis (se PS4) poi il fallback interno
205 result = self.try_passcode(input_file, output_directory, manual_passcode)
206 if progress_callback:
207 progress_callback(result)
208 return result
209 except ValueError as e:
210 return f"[-] Invalid passcode format: {str(e)}"
211 
212 # Prova a ripristinare lo stato precedente
213 self._maybe_load_state(input_file, progress_callback)
214 self._last_checkpoint_ts = time.time()
215 
216 # Progress tracking
217 start_ts = time.time()
218 last_report_ts = start_ts
219 last_report_attempts = self._attempts_done
220 last_test_emit_ts = start_ts
221 
222 if num_workers == 1:
223 # Single-threaded fast path: reuse loaded package
224 counter = 0
225 while not self.passcode_found and not self._stop:
226 passcode = self._code_from_counter(counter, seed_hash)
227 self.last_used_passcode = passcode
228 self._attempts_done += 1
229 
230 try:
231 # Directly attempt using the loaded package to avoid reload overhead
232 if len(passcode) == 32:
233 self.package.extract_with_passcode(passcode, output_directory)
234 self.passcode_found = True
235 self.found_passcode = passcode
236 if progress_callback:
237 progress_callback(f"[+] Successfully decrypted with passcode: {passcode}")
238 break
239 else:
240 if progress_callback and (self._attempts_done % 5000 == 0):
241 progress_callback(f"[-] Invalid passcode length: {len(passcode)}")
242 except ValueError as e:
243 # Only occasionally report failures to reduce UI overhead
244 if progress_callback and (self._attempts_done % 200 == 0):
245 progress_callback(f"[-] Failed attempt #{self._attempts_done}: {str(e)}")
246 
247 # Emit tested passcode rate-limited
248 now = time.time()
249 if tested_callback and (self._attempts_done % 50 == 0 or (now - last_test_emit_ts) >= 0.1):
250 tested_callback(passcode)
251 last_test_emit_ts = now
252 
253 # Periodic rate reporting and checkpoint
254 if now - last_report_ts >= 1.0:
255 delta_attempts = self._attempts_done - last_report_attempts
256 rate = delta_attempts / max(1e-9, (now - last_report_ts))
257 if progress_callback:
258 progress_callback(f"[~] Attempts: {self._attempts_done} | Rate: {rate:.0f}/s")
259 last_report_ts = now
260 last_report_attempts = self._attempts_done
261 self._maybe_checkpoint(progress_callback)
262 counter += 1
263 else:
264 # Multi-threaded deterministic: ogni worker ha (counter = worker_id; step = num_workers)
265 stop_flag = threading.Event()
266 
267 def worker(worker_id: int):
268 # Each worker has its own package instance
269 try:
270 with open(input_file, "rb") as fp2:
271 magic2 = struct.unpack(">I", fp2.read(4))[0]
272 if magic2 == PackagePS4.MAGIC_PS4:
273 pkg = PackagePS4(input_file)
274 elif magic2 == PackagePS5.MAGIC_PS5:
275 pkg = PackagePS5(input_file)
276 elif magic2 == PackagePS3.MAGIC_PS3:
277 pkg = PackagePS3(input_file)
278 else:
279 return
280 except Exception:
281 return
282 
283 counter_local = worker_id # offset diverso per worker
284 step = num_workers
285 last_emit_local = time.time()
286 while not stop_flag.is_set() and not self._stop and not self.passcode_found:
287 code = self._code_from_counter(counter_local, seed_hash)
288 try:
289 if len(code) == 32:
290 pkg.extract_with_passcode(code, output_directory)
291 # Found!
292 self.passcode_found = True
293 self.found_passcode = code
294 stop_flag.set()
295 if progress_callback:
296 progress_callback(f"[+] Successfully decrypted with passcode: {code}")
297 break
298 except ValueError:
299 pass
300 finally:
301 # Count attempts regardless of success
302 self._attempts_done += 1
303 # Emit tested code rate-limited per worker
304 now_l = time.time()
305 if tested_callback and (self._attempts_done % 100 == 0 or (now_l - last_emit_local) >= 0.15):
306 tested_callback(code)
307 last_emit_local = now_l
308 counter_local += step
309 
310 # Start producer and workers
311 workers = [threading.Thread(target=worker, args=(i,), daemon=True) for i in range(num_workers)]
312 for t in workers:
313 t.start()
314 
315 # Monitor loop for progress and exit conditions
316 while not self.passcode_found and not self._stop:
317 now = time.time()
318 if now - last_report_ts >= 1.0:
319 delta_attempts = self._attempts_done - last_report_attempts
320 rate = delta_attempts / max(1e-9, (now - last_report_ts))
321 if progress_callback:
322 progress_callback(f"[~] Attempts: {self._attempts_done} | Threads: {num_workers} | Rate: {rate:.0f}/s")
323 last_report_ts = now
324 last_report_attempts = self._attempts_done
325 self._maybe_checkpoint(progress_callback)
326 time.sleep(0.05)
327 
328 stop_flag.set()
329 
330 if self.passcode_found:
331 success_file_name = f"{input_file}.success"
332 try:
333 with open(success_file_name, "w") as success_file:
334 success_file.write(self.found_passcode)
335 # Rimuovi lo stato salvato poiché abbiamo finito
336 try:
337 if self._state_path and os.path.exists(self._state_path):
338 os.remove(self._state_path)
339 except Exception:
340 pass
341 return f"[+] Passcode found: {self.found_passcode}\n[+] Passcode has been saved to: {success_file_name}"
342 except Exception as e:
343 return f"[+] Passcode found: {self.found_passcode}\n[-] Failed to create/save the success file: {e}"
344 else:
345 # Salva lo stato anche in caso di stop o chiusura
346 self._save_state(input_file)
347 return "[-] Passcode not found or process stopped. Progress saved."
348 
349 except FileNotFoundError:
350 return f"[-] Package file not found: {input_file}"
351 except Exception as e:
352 logging.error(f"Error during brute force: {str(e)}")
353 return f"[-] Error: {str(e)}"
354 
355 def ensure_output_directory(self, output_directory):
356 """Assicura che la directory di output esista"""
357 os.makedirs(output_directory, exist_ok=True)
358 
359 def get_package(self):
360 """Restituisce l'oggetto package corrente"""
361 return self.package
362 
363 def set_debug_mode(self, enabled):
364 """Imposta la modalità debug"""
365 self.debug_mode = enabled
366 
367 def set_silence_mode(self, enabled):
368 """Imposta la modalità silenziosa"""
369 self.silence_mode = enabled
370 
371 def stop(self):
372 """Richiedi l'arresto del processo di brute force"""
373 self._stop = True
374 
375 # ----------------------
376 # Stato e checkpointing
377 # ----------------------
378 def _get_state_path(self, input_file):
379 """Restituisce il percorso del file di stato per l'input specificato"""
380 return f"{input_file}.brutestate.json"
381 
382 def _maybe_load_state(self, input_file, progress_callback=None):
383 try:
384 if not self._state_path or not os.path.exists(self._state_path):
385 return
386 with open(self._state_path, "r", encoding="utf-8") as f:
387 data = json.load(f)
388 if data.get("input_file") != os.path.abspath(input_file):
389 return
390 state_b64 = data.get("rng_state_b64")
391 if state_b64:
392 rng_state = pickle.loads(base64.b64decode(state_b64.encode("utf-8")))
393 self._rng.setstate(rng_state)
394 self._attempts_done = int(data.get("attempts_done", 0))
395 self.last_used_passcode = data.get("last_passcode", "")
396 if progress_callback:
397 progress_callback(f"[+] Resumed previous session: attempts={self._attempts_done}")
398 except Exception as e:
399 logging.error(f"Failed to load bruteforce state: {e}")
400 
401 def _maybe_checkpoint(self, progress_callback=None):
402 try:
403 now = time.time()
404 if (self._attempts_done % self._checkpoint_every_attempts == 0) or (now - self._last_checkpoint_ts >= self._checkpoint_every_seconds):
405 self._save_state(self.package.file_path if hasattr(self.package, 'file_path') else "")
406 self._last_checkpoint_ts = now
407 if progress_callback:
408 progress_callback(f"[+] Checkpoint saved: attempts={self._attempts_done}")
409 except Exception as e:
410 logging.error(f"Failed to checkpoint bruteforce state: {e}")
411 
412 def _save_state(self, input_file):
413 try:
414 if not self._state_path:
415 self._state_path = self._get_state_path(input_file)
416 data = {
417 "version": 1,
418 "input_file": os.path.abspath(input_file),
419 "attempts_done": self._attempts_done,
420 "last_passcode": self.last_used_passcode,
421 "rng_state_b64": base64.b64encode(pickle.dumps(self._rng.getstate())).decode("utf-8"),
422 "timestamp": time.time(),
423 }
424 with open(self._state_path, "w", encoding="utf-8") as f:
425 json.dump(data, f, indent=2)
426 except Exception as e:
427 logging.error(f"Failed to save bruteforce state: {e}")
428