Seregon/PkgToolBox

A toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5; it includes the most useful functions you might need.

Python/57.3 KB/No license
tools/repack.py
PkgToolBox / tools / repack.py
import contextlib
import json
import logging
import os
import shutil
import struct

from Utilities import Logger
from tools.PS5_Game_Info import PS5GameInfo
8 
class Repack:
    """Rebuild or patch a package file from a directory of extracted entries.

    The instance stores the path of the source package, the position and
    entry count of its file table, and ``files``: a mapping of entry id to a
    dict describing that entry.  The methods read and update the ``offset``
    and ``size`` keys of those dicts in place, and serialise them together
    with ``fn_offset``, ``flags1``, ``flags2`` and the optional ``padding``
    key when the file table is written.
    """

    FLAG_ENCRYPTED = 0x1
    FLAG_DECRYPTED = 0x2

    # One file-table entry: big-endian, six 32-bit unsigned fields followed by
    # one 64-bit unsigned field (32 bytes in total).
    _ENTRY_FORMAT = ">6IQ"

    def __init__(self, original_file, pkg_table_offset, pkg_entry_count, files):
        """
        Store the description of the package to work on.
        :param original_file: Path of the source package file.
        :param pkg_table_offset: Byte offset of the file table in the package.
        :param pkg_entry_count: Number of entries in the file table.
        :param files: Mapping of entry id to a dict describing the entry.
        """
        self.original_file = original_file
        self.pkg_table_offset = pkg_table_offset
        self.pkg_entry_count = pkg_entry_count
        self.files = files

    def repack(self, input_dir, output_pkg_file, log_file_path, progress_callback=None):
        """
        Repack the modified files into a new package.

        The new package is laid out as: the original header (everything
        before the file table), the file table, then the entry data in
        ``self.files`` order.  Each entry is taken from ``input_dir`` when a
        file with the entry's name exists there, otherwise from the original
        package.  On success the original package file is replaced by the
        rebuilt one and the ``offset``/``size`` of every entry in
        ``self.files`` describe the new layout.  On failure the original
        package and ``self.files`` are left as they were.

        :param input_dir: Directory containing the modified files.
        :param output_pkg_file: Only used to derive the temporary file name
            (``output_pkg_file + ".tmp"``); the result replaces
            ``self.original_file``.
        :param log_file_path: Only echoed in the completion message; this
            method does not write a log file itself.
        :param progress_callback: Optional callable receiving a
            ``{"status": ...}`` dict after each processed entry.
        :return: The completion message.
        """
        temp_pkg_file = output_pkg_file + ".tmp"
        table_size = self.pkg_entry_count * struct.calcsize(self._ENTRY_FORMAT)
        new_offset = self.pkg_table_offset + table_size

        # Entries are mutated while the new package is built; keep a copy so a
        # failure does not leave them describing a package that was never
        # installed.
        saved_entries = {file_id: dict(file_info) for file_id, file_info in self.files.items()}

        try:
            with open(self.original_file, 'rb') as pkg_file, open(temp_pkg_file, 'wb') as new_pkg:
                # Copy the package header
                new_pkg.write(pkg_file.read(self.pkg_table_offset))

                # Reserve the file-table region so that entry data really
                # starts at new_offset and is not overwritten when the table
                # is written afterwards.
                new_pkg.seek(self.pkg_table_offset)
                new_pkg.write(bytes(table_size))

                # Repack the modified files
                for file_id, file_info in self.files.items():
                    entry_name = file_info.get("name", f"file_{file_id}")
                    input_file_path = os.path.join(input_dir, entry_name)

                    # Read the content from the file system or the original package
                    if os.path.isfile(input_file_path):
                        with open(input_file_path, 'rb') as input_file:
                            file_content = input_file.read()
                    else:
                        pkg_file.seek(file_info['offset'])
                        file_content = pkg_file.read(file_info['size'])

                    # Update the offset and size in file_info
                    file_info['offset'] = new_offset
                    file_info['size'] = len(file_content)

                    # Write the file content to the new package
                    new_pkg.write(file_content)
                    new_offset += len(file_content)

                    if progress_callback:
                        progress_callback({"status": f"Processed: {entry_name}"})

                # Rewrite the updated file table
                self._write_file_table(new_pkg)

            # Replace the original package in a single step: the original is
            # never deleted before the new file is in place.
            os.replace(temp_pkg_file, self.original_file)
        except BaseException:
            for file_id, original_entry in saved_entries.items():
                file_info = self.files[file_id]
                file_info.clear()
                file_info.update(original_entry)
            with contextlib.suppress(OSError):
                os.remove(temp_pkg_file)
            raise

        message = f"Repack completed. Log saved to: {log_file_path}"
        logging.info(message)
        return message

    def _slot_capacities(self, package_size):
        """Map each entry id to the bytes it may occupy in place (None = unbounded)."""
        boundaries = sorted(
            {file_info['offset'] for file_info in self.files.values()} | {self.pkg_table_offset}
        )
        next_boundary = dict(zip(boundaries, boundaries[1:]))

        capacities = {}
        for file_id, file_info in self.files.items():
            start = file_info['offset']
            size = file_info['size']
            end = next_boundary.get(start)
            if end is not None:
                # Never report less than the current size, so a same-size
                # replacement is always accepted.
                capacities[file_id] = max(end - start, size)
            elif start + size >= package_size:
                # The entry already runs to the end of the package: growing it
                # only extends the file.
                capacities[file_id] = None
            else:
                # Unknown data follows the entry; do not grow into it.
                capacities[file_id] = size
        return capacities

    def reverse_dump(self, input_dir):
        """
        Reinsert the dumped files into a new package file in a new directory.

        The original package is copied to ``modified_pkg/<original name>``
        next to the original file.  Every entry that has a file of the same
        name in ``input_dir`` is then overwritten in place at its existing
        offset, and the file table is rewritten.  A replacement that is
        larger than the space up to the next entry (or the file table) is
        skipped and reported instead of overwriting the data that follows.
        One line per entry is written to ``modified_pkg/reverse_dump_log.txt``.

        :param input_dir: Directory containing the dumped files.
        :return: A completion message, or a failure message when any error
            occurs (errors are logged, not raised).
        """
        try:
            original_dir, original_filename = os.path.split(self.original_file)
            modified_dir = os.path.join(original_dir, "modified_pkg")
            os.makedirs(modified_dir, exist_ok=True)

            new_pkg_file = os.path.join(modified_dir, original_filename)
            log_file_path = os.path.join(modified_dir, "reverse_dump_log.txt")

            # Computed up front, from the layout of the untouched package.
            capacities = self._slot_capacities(os.path.getsize(self.original_file))

            # utf-8 so that entry names outside the locale encoding can be logged.
            with open(log_file_path, 'w', encoding='utf-8') as log_file, \
                    open(self.original_file, 'rb') as pkg_file, \
                    open(new_pkg_file, 'wb') as new_pkg:
                # Copy the entire original PKG file first
                pkg_file.seek(0)
                shutil.copyfileobj(pkg_file, new_pkg)

                # Now, update only the modified files
                for file_id, file_info in self.files.items():
                    file_name = file_info.get("name", f"file_{file_id}")
                    input_file_path = os.path.join(input_dir, file_name)
                    offset = file_info['offset']
                    report = Logger.log_information

                    if not os.path.isfile(input_file_path):
                        log_message = (
                            f"Kept original: {file_name}, offset: {offset}, "
                            f"size: {file_info['size']}\n"
                        )
                    else:
                        with open(input_file_path, 'rb') as input_file:
                            file_content = input_file.read()
                        new_size = len(file_content)
                        capacity = capacities[file_id]

                        if capacity is not None and new_size > capacity:
                            # Writing this would run into the next region.
                            report = Logger.log_warning
                            log_message = (
                                f"Skipped (too large for slot): {file_name}, offset: {offset}, "
                                f"slot capacity: {capacity}, new size: {new_size}\n"
                            )
                        else:
                            # Update the file in the new PKG
                            new_pkg.seek(offset)
                            new_pkg.write(file_content)

                            # Update file info if size has changed
                            if new_size != file_info['size']:
                                file_info['size'] = new_size
                                log_message = (
                                    f"Updated: {file_name}, offset: {offset}, "
                                    f"new size: {new_size}\n"
                                )
                            else:
                                log_message = (
                                    f"Replaced: {file_name}, offset: {offset}, "
                                    f"size: {new_size}\n"
                                )

                    log_file.write(log_message)
                    report(log_message.strip())

                # Update the file table
                self._write_file_table(new_pkg)

            Logger.log_information(f"Reverse dump completed. New package saved as: {new_pkg_file}")
            Logger.log_information(f"Log saved in: {log_file_path}")
            return (
                f"Reverse dump completed. New package saved as: {new_pkg_file}\n"
                f"Log saved in: {log_file_path}"
            )
        except Exception as e:
            # Boundary for callers that expect a status string rather than an
            # exception.
            Logger.log_error(f"Error during reverse dump: {str(e)}")
            return f"Reverse dump failed. Error: {str(e)}"

    def verify_and_adapt_file(self, file_name, file_content, file_info, ps5_info):
        """
        Route a file to the matching verification/adaptation routine.
        :param file_name: Entry name inside the package.
        :param file_content: Raw bytes of the entry.
        :param file_info: Entry description dict (passed through).
        :param ps5_info: Object carrying the package information used by the checks.
        :return: The (possibly adapted) file content.
        """
        if file_name == "eboot.bin":
            return self.adapt_eboot(file_content, file_info, ps5_info)
        if file_name.startswith("sce_sys/"):
            return self.adapt_sce_sys_file(file_name, file_content, file_info, ps5_info)
        return file_content

    def adapt_eboot(self, file_content, file_info, ps5_info):
        """
        Check eboot.bin and return its content.

        Currently this only logs a warning when ``ps5_info.Fcheck`` carries
        the "Fake" marker; the content is returned unchanged.
        """
        Logger.log_information("Verifying and adapting eboot.bin")
        if ps5_info.Fcheck == '(<span style=" color:#55aa00;">Fake</span>)':
            Logger.log_warning("eboot.bin is detected as fake. Proceeding with caution.")
        # Add further checks and adaptations for eboot.bin here
        return file_content

    def adapt_sce_sys_file(self, file_name, file_content, file_info, ps5_info):
        """
        Check a file under sce_sys/ and return its content.

        For ``sce_sys/param.json`` every key of ``ps5_info.main_dict`` that is
        also present in the JSON document is compared as a string, and each
        mismatch is logged as a warning.  The content is returned unchanged.
        """
        Logger.log_information(f"Verifying and adapting {file_name}")
        if file_name == "sce_sys/param.json":
            # Check that the data in param.json matches ps5_info.main_dict
            try:
                param_data = json.loads(file_content.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Invalid UTF-8 is as undecodable as invalid JSON.
                Logger.log_error("Error decoding param.json")
            else:
                if isinstance(param_data, dict):
                    for key, value in ps5_info.main_dict.items():
                        if key in param_data and str(param_data[key]) != str(value):
                            Logger.log_warning(
                                f"Mismatch in param.json for {key}: "
                                f"expected {value}, found {param_data[key]}"
                            )
                else:
                    Logger.log_warning("param.json is not a JSON object; consistency check skipped")
        # Add further checks and adaptations for other sce_sys files here
        return file_content

    def _write_file_table(self, pkg_file):
        """
        Write the updated file table to the package.
        :param pkg_file: Package file opened in write mode.
        """
        entry_struct = struct.Struct(self._ENTRY_FORMAT)
        pkg_file.seek(self.pkg_table_offset)
        for file_id, file_info in self.files.items():
            pkg_file.write(entry_struct.pack(
                file_id,
                file_info['fn_offset'],
                file_info['flags1'],
                file_info['flags2'],
                file_info['offset'],
                file_info['size'],
                file_info.get('padding', 0),
            ))

    def verify_file_integrity(self, file_path, expected_size):
        """
        Check that a file exists and has the expected size.
        :param file_path: Path of the file to check.
        :param expected_size: Size in bytes the file must have.
        :return: True when the file exists with that size, False otherwise
            (the reason is logged as an error).
        """
        if not os.path.exists(file_path):
            Logger.log_error(f"File not found: {file_path}")
            return False

        actual_size = os.path.getsize(file_path)
        if actual_size != expected_size:
            Logger.log_error(f"File integrity check failed for {file_path}. Expected size: {expected_size}, Actual size: {actual_size}")
            return False
        return True