-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathengine.py
More file actions
254 lines (209 loc) · 9.22 KB
/
engine.py
File metadata and controls
254 lines (209 loc) · 9.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import csv
import ctypes
import hashlib
import json
import os
import subprocess
import sys
import threading
import time

import keyboard
import numpy as np
import pyaudio
import pygame
import pyperclip
from comtypes import CLSCTX_ALL, CoInitialize, CoUninitialize
from faster_whisper import WhisperModel
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
# --- CONFIG ---
# Capture rate in Hz, passed to PyAudio when a recording stream is opened.
SAMPLE_RATE = 16000
# Frames fetched per stream.read() call; also the stream's frames_per_buffer.
CHUNK_SIZE = 1024
# Whisper model name loaded at startup (switch_model() can replace it later).
DEFAULT_MODEL_SIZE = "tiny.en"
# Console window title set on Windows so the process can be found by name.
CONSOLE_TITLE = "PaceEngine"
class PaceEngine:
    """Push-to-talk dictation engine.

    Records microphone audio while ``is_recording`` is set, transcribes it
    with a faster-whisper ``WhisperModel`` and types the result through
    ``keyboard.write``. Status, level and transcription events are emitted
    as one JSON object per line on stdout; debug text goes to stderr.
    """

    def __init__(self):
        """Set up audio I/O, optional feedback sounds and the Whisper model."""
        # 1. Name the process for the Electron killer
        if sys.platform == 'win32':
            ctypes.windll.kernel32.SetConsoleTitleW(CONSOLE_TITLE)

        self.lock = threading.Lock()
        self.is_recording = False
        self.pa = pyaudio.PyAudio()
        self.last_audio_hash = None
        self.last_sound_time = 0
        self.last_typed_text = ""
        self.last_typed_time = 0
        self.model_size = DEFAULT_MODEL_SIZE
        self.pre_mute_state = False
        # True while a mute toggle sent by mute_pc(True) has not been undone.
        self._mute_toggled = False

        # Audio feedback is optional: any failure (no output device, missing
        # sound files) simply disables the start/stop sounds.
        self.start_snd = self.stop_snd = None
        try:
            pygame.mixer.init()
            # Use relative paths for portability and privacy
            base_dir = os.path.dirname(os.path.abspath(__file__))
            self.start_snd = pygame.mixer.Sound(os.path.join(base_dir, "start.mp3"))
            self.stop_snd = pygame.mixer.Sound(os.path.join(base_dir, "stop.mp3"))
        except Exception as exc:
            self.start_snd = self.stop_snd = None
            print(f"DEBUG: Audio feedback disabled: {exc}", file=sys.stderr)

        self.model = self._load_model()

    def mute_pc(self, mute=True):
        """Toggle the system mute key, keeping mute/unmute calls balanced.

        The VK_VOLUME_MUTE key is a *toggle*, not a set operation, so this
        method remembers whether it has an outstanding toggle and ignores
        calls that would not change that state. No-op outside Windows.

        NOTE(review): if the system was already muted before ``mute_pc(True)``
        the toggle will unmute it; the real mute state is not queried here.
        """
        if sys.platform != 'win32':
            return
        want_muted = bool(mute)
        if want_muted == self._mute_toggled:
            return  # Already in the requested state; a second toggle would invert it.

        # 0xAD is VK_VOLUME_MUTE. We simulate a physical key press.
        VK_VOLUME_MUTE = 0xAD
        KEYEVENTF_KEYUP = 0x0002
        try:
            ctypes.windll.user32.keybd_event(VK_VOLUME_MUTE, 0, 0, 0)  # Press
            ctypes.windll.user32.keybd_event(VK_VOLUME_MUTE, 0, KEYEVENTF_KEYUP, 0)  # Release
            self._mute_toggled = want_muted
            if want_muted:
                print("DEBUG: Hardware Mute Toggled (ON)", file=sys.stderr)
            else:
                print("DEBUG: Hardware Mute Toggled (OFF)", file=sys.stderr)
        except Exception as e:
            print(f"DEBUG: Hardware Mute failed: {e}", file=sys.stderr)

    def _load_model(self):
        """Load the model named by ``self.model_size`` and warm it up.

        Models are stored under ``%APPDATA%\\Pace\\models`` on Windows and
        ``~/.pace/models`` elsewhere. Returns the ready ``WhisperModel``.
        """
        self.log("status", {"text": f"Pace AI Warming Up ({self.model_size})..."})

        # Use a persistent path for models (appdata on windows)
        if sys.platform == 'win32':
            appdata = os.environ.get('APPDATA', os.path.expanduser('~'))
            path = os.path.join(appdata, "Pace", "models")
        else:
            path = os.path.join(os.path.expanduser('~'), ".pace", "models")
        os.makedirs(path, exist_ok=True)

        model = WhisperModel(self.model_size, device="cpu", compute_type="int8", download_root=path, cpu_threads=4)

        # Warmup: transcribe() returns (segments, info) where segments is a
        # lazy generator, so it must be iterated for any inference to run.
        segments, _ = model.transcribe(np.zeros(SAMPLE_RATE, dtype=np.float32), beam_size=1)
        for _segment in segments:
            pass

        self.log("engine_ready", {"model": self.model_size})
        return model

    def switch_model(self, size):
        """Replace the active model with ``size``; no-op if already active.

        If loading fails, ``model_size`` is restored so it keeps describing
        the model that is actually in ``self.model``; the error is re-raised.
        """
        if size == self.model_size:
            return
        previous_size = self.model_size
        self.model_size = size
        try:
            self.model = self._load_model()
        except Exception:
            self.model_size = previous_size
            raise

    def log(self, msg_type, data):
        """Emit one JSON line ``{"type": msg_type, **data}`` on stdout and flush."""
        print(json.dumps({"type": msg_type, **data}), flush=True)

    def play_snd(self, snd):
        """Play ``snd`` unless it is missing or another sound played <300 ms ago."""
        if not snd:
            return
        now = time.time()
        if now - self.last_sound_time < 0.3:  # 300ms cooldown
            return
        self.last_sound_time = now
        snd.play()

    def run_session(self):
        """Run one record -> transcribe -> type cycle.

        Returns immediately if another session already holds the lock.
        Recording continues until some other thread clears ``is_recording``.
        """
        # ATOMIC LOCK: Only one session can EVER run at a time
        if not self.lock.acquire(blocking=False):
            return

        session_id = int(time.time() * 1000)
        try:
            chunks = self._record_audio(session_id)
            self._transcribe_and_type(chunks, session_id)
        finally:
            self.is_recording = False
            self.lock.release()

    def _record_audio(self, session_id):
        """Capture microphone chunks until ``is_recording`` is cleared.

        Cleanup (closing the stream, undoing the mute, stop sound and the
        ``isRecording: False`` status) runs even when opening or reading the
        stream raises, so a failure cannot leave the system muted or the UI
        stuck in the recording state. Returns the list of raw byte chunks.
        """
        self.is_recording = True
        self.log("status", {"isRecording": True, "sessionId": session_id})
        self.play_snd(self.start_snd)
        self.mute_pc(True)

        chunks = []
        stream = None
        try:
            # Fresh stream for every session
            stream = self.pa.open(format=pyaudio.paInt16, channels=1, rate=SAMPLE_RATE, input=True, frames_per_buffer=CHUNK_SIZE)

            # RECORDING LOOP
            while self.is_recording:
                data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
                chunks.append(data)

                # Level for UI: one message per chunk read
                samples = np.frombuffer(data, dtype=np.int16)
                rms = np.sqrt(np.mean(samples.astype(np.float32)**2))
                self.log("level", {"level": float(rms / 1200.0)})
        finally:
            # STOPPING
            self.is_recording = False
            if stream is not None:
                try:
                    stream.stop_stream()
                    stream.close()
                except Exception as exc:
                    print(f"DEBUG: Stream close failed: {exc}", file=sys.stderr)
            self.mute_pc(False)
            self.play_snd(self.stop_snd)
            self.log("status", {"isRecording": False})
        return chunks

    def _transcribe_and_type(self, chunks, session_id):
        """Transcribe recorded chunks and type the text, skipping duplicates."""
        # Recordings of 15 chunks or fewer are ignored.
        if len(chunks) <= 15:
            return

        raw_data = b"".join(chunks)

        # Binary Hash Check: If audio is identical to last time, it's a driver cache leak
        current_hash = hashlib.md5(raw_data).hexdigest()
        if current_hash == self.last_audio_hash:
            return
        self.last_audio_hash = current_hash

        # Scale int16 PCM to float32 in [-1.0, 1.0).
        audio_np = np.frombuffer(raw_data, dtype=np.int16).astype(np.float32) / 32768.0
        segments, _ = self.model.transcribe(audio_np, beam_size=1, vad_filter=True)
        text = " ".join(s.text for s in segments).strip()
        if not text:
            return

        now = time.time()
        # FINAL TEXT GUARD: Don't type same text twice within 2 seconds
        if text == self.last_typed_text and (now - self.last_typed_time) < 2.0:
            print(f"DEBUG: Blocking duplicate text: '{text}'", file=sys.stderr)
            return

        print(f"DEBUG: Result {session_id}: '{text}'", file=sys.stderr)
        self.last_typed_text = text
        self.last_typed_time = now

        # DIRECT TYPING
        time.sleep(0.05)
        keyboard.write(text, delay=0)
        self.log("transcription", {"text": text, "id": session_id})
def hotkey_monitor(engine):
    """Poll Ctrl+Alt forever and drive push-to-talk sessions on ``engine``.

    Holding Ctrl+Alt starts ``engine.run_session`` in a daemon thread;
    releasing the combo clears ``engine.is_recording`` to stop it. Only a
    session started by this monitor is stopped on release, so sessions
    started elsewhere (e.g. the stdin "toggle" command) are left alone.
    Never returns.
    """
    # Windows Virtual Key Codes
    VK_CONTROL = 0x11
    VK_MENU = 0x12  # ALT key
    KEY_DOWN_MASK = 0x8000  # GetAsyncKeyState bit for "currently held down"

    def is_pressed(vk):
        # Direct Windows API call for physical key state
        return (ctypes.windll.user32.GetAsyncKeyState(vk) & KEY_DOWN_MASK) != 0

    hotkey_ready = True
    # Session thread started by the hotkey that has not been told to stop yet.
    hotkey_thread = None

    print("Hotkey loop started (Low-Level Windows API).", file=sys.stderr)

    while True:
        try:
            is_held = is_pressed(VK_CONTROL) and is_pressed(VK_MENU)

            if is_held:
                # START logic
                if hotkey_ready and not engine.is_recording:
                    hotkey_ready = False
                    print("DEBUG: Hotkey Triggered START", file=sys.stderr)
                    hotkey_thread = threading.Thread(target=engine.run_session, daemon=True)
                    hotkey_thread.start()
            else:
                # STOP logic: only for the session this monitor started.
                if hotkey_thread is not None:
                    if not hotkey_thread.is_alive():
                        # Session already ended (or never got the lock).
                        hotkey_thread = None
                    elif engine.is_recording:
                        print("DEBUG: Hotkey Triggered STOP", file=sys.stderr)
                        engine.is_recording = False  # Signal session to stop
                        hotkey_thread = None
                    # else: thread is alive but has not set is_recording yet;
                    # keep it pending so a very short tap is still stopped.

                # RESET logic: Ready to trigger again as soon as the COMBO is broken
                hotkey_ready = True
        except Exception as exc:
            # Back off instead of spinning when the key-state call keeps failing.
            print(f"DEBUG: Hotkey poll failed: {exc}", file=sys.stderr)
            time.sleep(0.5)

        time.sleep(0.01)  # Ultra-fast polling
def command_monitor(engine):
    """Read newline-delimited commands from stdin and apply them to ``engine``.

    Supported commands:
      * ``toggle``       - stop the current recording, or start a new session
                           in a daemon thread when none is recording.
      * ``model:<size>`` - switch models in a daemon thread. Everything after
                           the first colon is the model argument, so values
                           that themselves contain ``:`` are passed intact.

    Returns when stdin reaches end-of-file.
    """
    for line in sys.stdin:
        cmd = line.strip()
        if cmd == "toggle":
            if engine.is_recording:
                engine.is_recording = False
            else:
                threading.Thread(target=engine.run_session, daemon=True).start()
        elif cmd.startswith("model:"):
            size = cmd.partition(":")[2].strip()
            if not size:
                print("DEBUG: Ignoring model command with no size", file=sys.stderr)
                continue
            threading.Thread(target=engine.switch_model, args=(size,), daemon=True).start()
def _parent_alive_windows(pid):
    """Ask ``tasklist`` whether ``pid`` exists.

    Returns True/False when tasklist gave a definite answer, or None when
    the command itself failed (so the caller can avoid acting on no data).
    """
    result = subprocess.run(
        ["tasklist", "/FI", f"PID eq {pid}", "/FO", "CSV", "/NH"],
        capture_output=True,
        text=True,
        check=False,
        timeout=5,
    )
    if result.returncode != 0:
        return None
    wanted = str(pid)
    # CSV rows are: image name, PID, session name, session#, memory usage.
    # Compare the PID column exactly rather than searching the whole output,
    # which would also match longer PIDs or digits in the memory column.
    for row in csv.reader(result.stdout.splitlines()):
        if len(row) > 1 and row[1].strip() == wanted:
            return True
    return False


def suicide_watch():
    """Exit the process once the parent that launched it is gone.

    Checks every 2 seconds. On Windows the parent PID captured at start is
    looked up with ``tasklist``; elsewhere a change in ``os.getppid()`` is
    treated as the parent having died. Never returns normally.
    """
    # Kill ourselves if Electron (our parent) dies
    parent_pid = os.getppid()
    while True:
        try:
            if sys.platform == 'win32':
                # Exit only on a definite "not running"; a failed tasklist
                # call (None) is retried on the next pass.
                if _parent_alive_windows(parent_pid) is False:
                    os._exit(0)
            elif os.getppid() != parent_pid:
                os._exit(0)
        except Exception as exc:
            print(f"DEBUG: Parent check failed: {exc}", file=sys.stderr)
        time.sleep(2)
if __name__ == "__main__":
    # Announce our PID on stderr for whoever launched us.
    print(f"PaceEngine PID: {os.getpid()}", file=sys.stderr)

    engine = PaceEngine()

    # Background daemon workers: the stdin command reader first, then the parent watchdog.
    for worker, worker_args in ((command_monitor, (engine,)), (suicide_watch, ())):
        threading.Thread(target=worker, args=worker_args, daemon=True).start()

    # The hotkey polling loop owns the main thread and never returns.
    hotkey_monitor(engine)