Last active 5 hours ago

python script to export razzmatazz compatible wav files

Revision 1352d7e4676c3f532957156bb9c26d4b8dda3eb1

razzmatazz Raw
1#!/usr/bin/env python3
2"""Fix audio files to meet Razzmatazz WAV requirements."""
3
4import json
5import os
6import re
7import shutil
8import subprocess
9import sys
10import time
11import wave
12from collections import defaultdict
13from shutil import get_terminal_size
14
15FORBIDDEN_CHARS = re.compile(r'["/\\?*<>:|]')
16ALLOWED_BIT_DEPTHS = {16, 24, 32}
17ALLOWED_CHANNELS = {1, 2}
18NATIVE_SAMPLE_RATE = 48000
19MAX_PATH_LENGTH = 255
20AUDIO_EXTENSIONS = {'.wav', '.mp3', '.flac', '.ogg', '.aiff', '.aif', '.wma', '.m4a', '.aac', '.ape', '.opus'}
21
22SAMPLE_FMT_TO_BITS = {
23 's16': 16, 's16le': 16, 's16be': 16, 'u16': 16,
24 's32': 32, 's32le': 32, 's32be': 32,
25 's24': 24, 's24le': 24, 's24be': 24,
26 'flt': 32, 'fltle': 32, 'fltbe': 32,
27 'dbl': 64,
28 'u8': 8, 's8': 8,
29}
30
31
32def ffprobe(path):
33 try:
34 r = subprocess.run(
35 ['ffprobe', '-v', 'quiet', '-print_format', 'json',
36 '-show_streams', path],
37 capture_output=True, text=True, timeout=30)
38 if r.returncode != 0:
39 return None
40 data = json.loads(r.stdout)
41 for s in data.get('streams', []):
42 if s.get('codec_type') == 'audio':
43 return s
44 except Exception:
45 return None
46 return None
47
48
49def check_file(path):
50 issues = []
51
52 if len(path) > MAX_PATH_LENGTH:
53 issues.append(f"path length {len(path)} > {MAX_PATH_LENGTH}")
54
55 basename = os.path.basename(path)
56 if FORBIDDEN_CHARS.search(basename):
57 issues.append("filename has forbidden characters")
58
59 ext = os.path.splitext(path)[1].lower()
60
61 if ext == '.wav':
62 try:
63 with wave.open(path, 'rb') as wf:
64 params = wf.getparams()
65 ch = params.nchannels
66 bd = params.sampwidth * 8
67 sr = params.framerate
68 if ch not in ALLOWED_CHANNELS:
69 issues.append(f"{ch} channels (need 1 or 2)")
70 if bd not in ALLOWED_BIT_DEPTHS:
71 issues.append(f"{bd}-bit (need 16, 24, or 32)")
72 if sr != NATIVE_SAMPLE_RATE:
73 issues.append(f"{sr} Hz sample rate (native is {NATIVE_SAMPLE_RATE})")
74 except Exception:
75 issues.append("not a valid WAV or unreadable")
76 else:
77 issues.append(f"not WAV ({ext})")
78 info = ffprobe(path)
79 if info is None:
80 issues.append("could not read audio info")
81 else:
82 ch = info.get('channels', 0)
83 sr = info.get('sample_rate', '?')
84 fmt = info.get('sample_fmt', '?')
85 bd = SAMPLE_FMT_TO_BITS.get(fmt, '?')
86 if ch not in ALLOWED_CHANNELS:
87 issues.append(f"{ch} channels (need 1 or 2)")
88 if isinstance(bd, int) and bd not in ALLOWED_BIT_DEPTHS:
89 issues.append(f"{bd}-bit (need 16, 24, or 32)")
90 if sr not in (None, '?') and int(sr) != NATIVE_SAMPLE_RATE:
91 issues.append(f"{sr} Hz sample rate (native is {NATIVE_SAMPLE_RATE})")
92
93 return issues
94
95
96def fix_filename(name):
97 root, ext = os.path.splitext(name)
98 root = FORBIDDEN_CHARS.sub('_', root)
99 root = re.sub(r'_+', '_', root).strip('_')
100 root = root or 'fixed'
101 return root + '.wav'
102
103
104def process_file(path, outpath, quiet=False):
105 rel = os.path.relpath(path)
106
107 issues = check_file(path)
108
109 ext = os.path.splitext(path)[1].lower()
110 is_wav = (ext == '.wav')
111
112 os.makedirs(os.path.dirname(outpath) or '.', exist_ok=True)
113
114 clean_name = os.path.basename(outpath)
115 name_changed = (os.path.basename(path) != clean_name)
116
117 if is_wav and not issues and not name_changed:
118 shutil.copy2(path, outpath)
119 if not quiet:
120 print(f"OK {rel}")
121 return True
122
123 if not quiet:
124 parts = issues[:]
125 if name_changed:
126 parts.append(f"filename → {clean_name}")
127 print(f"FIX {rel}: {', '.join(parts)}")
128
129 if is_wav:
130 try:
131 with wave.open(path, 'rb') as wf:
132 params = wf.getparams()
133 ch = params.nchannels
134 bd = params.sampwidth * 8
135 sr = params.framerate
136 except Exception:
137 if not quiet:
138 print(f"FAIL {rel}: cannot parse WAV header, copying as-is")
139 shutil.copy2(path, outpath)
140 return False
141 else:
142 info = ffprobe(path)
143 if info is None:
144 if not quiet:
145 print(f"FAIL {rel}: cannot read audio info, copying as-is")
146 shutil.copy2(path, outpath)
147 return False
148 ch = info.get('channels', 2)
149 sr = int(info.get('sample_rate', NATIVE_SAMPLE_RATE))
150 fmt = info.get('sample_fmt', 's16')
151 bd = SAMPLE_FMT_TO_BITS.get(fmt, 16)
152
153 target_ch = ch if ch in ALLOWED_CHANNELS else 2
154 target_bd = bd if bd in ALLOWED_BIT_DEPTHS else 16
155 acodec = {16: 'pcm_s16le', 24: 'pcm_s24le', 32: 'pcm_s32le'}[target_bd]
156
157 cmd = ['ffmpeg', '-y', '-i', path]
158
159 needs_recode = (target_bd != bd) or (sr != NATIVE_SAMPLE_RATE) or (target_ch != ch) or (not is_wav)
160
161 if needs_recode:
162 if target_ch != ch:
163 cmd += ['-ac', str(target_ch)]
164 cmd += ['-acodec', acodec]
165 if sr != NATIVE_SAMPLE_RATE:
166 cmd += ['-ar', str(NATIVE_SAMPLE_RATE)]
167 cmd += ['-af', 'aresample=resampler=soxr']
168 else:
169 cmd += ['-c', 'copy']
170 cmd.append(outpath)
171
172 result = subprocess.run(cmd, capture_output=True, text=True)
173 if result.returncode != 0:
174 if not quiet:
175 print(f"FAIL {rel}: ffmpeg error — {result.stderr.strip()}")
176 return False
177 return True
178
179
180class ProgressBar:
181 def __init__(self, total):
182 self.total = total
183 self.current = 0
184 self.success = 0
185 self.failed = 0
186 self.start_time = time.time()
187 self.last_draw = 0
188
189 def update(self, n=1, success=True):
190 self.current += n
191 if success:
192 self.success += 1
193 else:
194 self.failed += 1
195 self._draw()
196
197 def _draw(self):
198 now = time.time()
199 if now - self.last_draw < 0.1:
200 return
201 self.last_draw = now
202
203 cols, _ = get_terminal_size((80, 24))
204 elapsed = now - self.start_time
205
206 pct = self.current / self.total if self.total else 0
207 bar_w = cols - 30
208 if bar_w < 10:
209 bar_w = 10
210 filled = int(bar_w * pct)
211 bar = '[' + '=' * filled + '>' * min(1, bar_w - filled) + '.' * (bar_w - filled - min(1, bar_w - filled)) + ']'
212
213 eta = (elapsed / self.current * (self.total - self.current)) if self.current > 0 else 0
214
215 line = f"\r{bar} {pct * 100:5.1f}% {self.current}/{self.total} ETA {eta:.0f}s"
216 sys.stdout.write(line[:cols])
217 sys.stdout.flush()
218
219 def done(self):
220 elapsed = time.time() - self.start_time
221 line = f"\r{' ' * get_terminal_size((80, 24)).columns}\r"
222 sys.stdout.write(line)
223 sys.stdout.flush()
224 print(f"Done — {self.success} ok, {self.failed} failed ({elapsed:.1f}s)")
225
226
227def main():
228 import argparse
229 parser = argparse.ArgumentParser(description='Fix audio files to valid Razzmatazz WAVs.')
230 parser.add_argument('paths', nargs='+', help='Audio files or directories')
231 parser.add_argument('--out', '-o', default='fixed_wavs', help='Output directory (default: fixed_wavs)')
232 parser.add_argument('--dry-run', '-n', action='store_true', help='Preview without converting')
233 parser.add_argument('--no-progress', '-P', action='store_true', help='Disable progress bar (one line per file)')
234 args = parser.parse_args()
235
236 files = []
237 for p in args.paths:
238 if os.path.isdir(p):
239 for root, _, filenames in os.walk(p):
240 for f in filenames:
241 if os.path.splitext(f)[1].lower() in AUDIO_EXTENSIONS:
242 files.append(os.path.join(root, f))
243 else:
244 files.append(p)
245
246 if not files:
247 print("No supported audio files found.")
248 print(f"Extensions: {', '.join(sorted(AUDIO_EXTENSIONS))}")
249 sys.exit(0)
250
251 files.sort()
252
253 # Resolve output paths
254 out_paths = []
255 for path in files:
256 cwd = os.getcwd()
257 try:
258 rel_to_cwd = os.path.relpath(path, cwd)
259 subdir = '' if rel_to_cwd.startswith('..') else os.path.dirname(rel_to_cwd)
260 except ValueError:
261 subdir = ''
262 clean = fix_filename(os.path.basename(path))
263 d = os.path.join(args.out, subdir) if subdir else args.out
264 out_paths.append(os.path.join(d, clean))
265
266 used = defaultdict(int)
267 resolved = {}
268 for p, path in zip(out_paths, files):
269 if used[p] > 0:
270 root, ext = os.path.splitext(p)
271 p = f"{root}_{used[p]}{ext}"
272 used[p] += 1
273 resolved[path] = p
274 while True:
275 np = os.path.normpath(p)
276 cn = os.path.basename(np)
277 if len(cn) > 255:
278 root, ext = os.path.splitext(cn)
279 cn = root[:255 - len(ext)] + ext
280 if len(np) > 255:
281 d = os.path.dirname(np)
282 available = 255 - len(d) - 1
283 root, ext = os.path.splitext(cn)
284 cn = root[:available - len(ext)] + ext
285 resolved[path] = os.path.join(os.path.dirname(np), cn)
286 break
287
288 # Scan phase
289 use_progress = not args.dry_run and not args.no_progress
290 if not use_progress:
291 # Original mode: one line per file
292 ok = failed = 0
293 for path in files:
294 if process_file(path, resolved[path], quiet=False):
295 ok += 1
296 else:
297 failed += 1
298 print(f"\n{ok} processed, {failed} failed")
299 sys.exit(1 if failed else 0)
300
301 # --- Progress bar mode ---
302 n_total = len(files)
303
304 # Scan: check all files quickly
305 print(f"Scanning {n_total} files...")
306 ok_count = fix_count = skip_count = 0
307 for path in files:
308 issues = check_file(path)
309 clean_name = os.path.basename(resolved[path])
310 name_changed = (os.path.basename(path) != clean_name)
311 ext = os.path.splitext(path)[1].lower()
312 if issues or name_changed or ext != '.wav':
313 fix_count += 1
314 else:
315 ok_count += 1
316
317 print(f" {ok_count} already valid, {fix_count} need processing")
318 if fix_count == 0:
319 # Just copy everything
320 bar = ProgressBar(n_total)
321 for path in files:
322 ok = process_file(path, resolved[path], quiet=True)
323 bar.update(success=ok)
324 bar.done()
325 sys.exit(0)
326
327 # Process with progress bar
328 bar = ProgressBar(n_total)
329 for path in files:
330 ok = process_file(path, resolved[path], quiet=True)
331 bar.update(success=ok)
332 bar.done()
333 sys.exit(1 if bar.failed else 0)
334
335
336if __name__ == '__main__':
337 main()
338