Seregon/PkgToolBox

Toolbox for analyzing and editing pkg application files for psp,ps3, ps4 and ps5, includes the most useful functions you might need.

Python/57.3 KB/No license
packages/package_base.py
PkgToolBox / packages / package_base.py
1import os
2import struct
3import io
4import json
5import re
6import unicodedata
7from PIL import Image
8from .enums import DRMType, ContentType, IROTag
9from Utilities import Logger
10 
11class PackageBase:
12 TYPE_MASK = 0x0000FFFF
13 FLAG_RETAIL = 1 << 31
14 FLAG_ENCRYPTED = 0x80000000
15 
16 def __init__(self, file: str):
17 if not os.path.isfile(file):
18 raise FileNotFoundError(f"The PKG file '{file}' does not exist.")
19
20 self.original_file = file
21 self.pkg_info = {}
22 self.files = {}
23 self.content_id = None
24 self.drm_type = None
25 self.content_type = None
26 self.content_flags = None
27 self.iro_tag = None
28 self.version_date = None
29 self.version_hash = None
30 self.digest_table_hash = None
31 self.entry_table_offset = None
32 self.entry_table_size = None
33 
34 def _safe_decode(self, data):
35 if isinstance(data, str):
36 return data.rstrip('\x00')
37 elif isinstance(data, bytes):
38 try:
39 return data.decode('utf-8', errors='ignore').rstrip('\x00')
40 except UnicodeDecodeError:
41 return data.decode('latin-1', errors='ignore').rstrip('\x00')
42 elif isinstance(data, int):
43 return str(data)
44 else:
45 return str(data)
46 
47 def _read_null_terminated_string(self, fp):
48 result = bytearray()
49 while True:
50 try:
51 char = fp.read(1)
52 if char == b'\x00' or len(char) == 0:
53 break
54 result.extend(char)
55 except (OverflowError, OSError) as e:
56 Logger.log_warning(f"Error reading string: {e}")
57 break
58 return bytes(result)
59 
60 def get_info(self):
61 return {
62 "content_id": self.content_id,
63 "drm_type": self.drm_type,
64 "content_type": self.content_type,
65 "content_flags": self.content_flags,
66 "iro_tag": self.iro_tag,
67 "version_date": self.version_date,
68 "version_hash": self.version_hash,
69 "digest_table_hash": self.digest_table_hash,
70 "entry_table_offset": self.entry_table_offset,
71 "entry_table_size": self.entry_table_size,
72 }
73 
74 def read_file(self, file_id):
75 file_info = self.files.get(file_id)
76 if not file_info:
77 raise ValueError(f"File with ID {file_id} not found in the package.")
78
79 with open(self.original_file, 'rb') as f:
80 f.seek(file_info['offset'])
81 data = f.read(file_info['size'])
82
83 return data
84 
85 def extract_all_files(self, output_dir: str):
86 """Extract all files listed in self.files to the specified output directory.
87 
88 This is a generic implementation used by package types. It expects
89 `self.files` to be a mapping of file_id -> dict with at least:
90 - 'offset': byte offset in the source package
91 - 'size': size in bytes
92 - optional 'name': output relative path
93 """
94 try:
95 os.makedirs(output_dir, exist_ok=True)
96 
97 if not isinstance(self.files, dict) or not self.files:
98 Logger.log_warning("No files table available for extraction.")
99 return f"No files to extract. Output: {output_dir}"
100 
101 with open(self.original_file, 'rb') as src:
102 for file_id, info in self.files.items():
103 try:
104 name = info.get('name', f'file_{file_id}')
105 # Normalize any path-like names to avoid traversal
106 safe_name = os.path.normpath(name).lstrip(os.sep).replace('..', '_')
107 out_path = os.path.join(output_dir, safe_name)
108 os.makedirs(os.path.dirname(out_path), exist_ok=True)
109 
110 offset = info.get('offset')
111 size = info.get('size')
112 if offset is None or size is None:
113 Logger.log_warning(f"Skipping file_id {file_id}: missing offset/size")
114 continue
115 
116 src.seek(offset)
117 chunk = src.read(size)
118 with open(out_path, 'wb') as dst:
119 dst.write(chunk)
120 Logger.log_information(f"Extracted: {safe_name}")
121 except Exception as e:
122 Logger.log_error(f"Error extracting file_id {file_id}: {e}")
123 
124 Logger.log_information(f"Extraction completed. Output: {output_dir}")
125 return f"Extraction completed. Output: {output_dir}"
126 except Exception as e:
127 Logger.log_error(f"Extraction failed: {e}")
128 raise
129