#!/usr/bin/python3 import subprocess import argparse import os import re import sys import fcntl # Good enough for now :/. # TODO(ish): # # Make assorted detection hacks cleaner. # Profile and optimize. # Consider reimplmenting in perl or C. # Produce more useful error messages :P. CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf' LOCKFILE = os.getenv('HOME')+'/.config/keyd/lockfile' LOGFILE = os.getenv('HOME')+'/.config/keyd/log' def die(msg): sys.stderr.write('ERROR: ') sys.stderr.write(msg) sys.stderr.write('\n') exit(0) def assert_env(var): if not os.getenv(var): raise Exception(f'Missing environment variable {var}') def run_or_die(cmd, msg=''): rc = subprocess.run(['/bin/sh', '-c', cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode if rc != 0: die(msg) def parse_config(path): map = {} for line in open(path): line = line.strip() if line.startswith('[') and line.endswith(']'): window_class = line[1:-1] bindings = [] map[window_class] = bindings elif line == '': continue elif line.startswith('#'): continue else: bindings.append(line) return map class SwayMonitor(): def __init__(self, on_window_change): assert_env('SWAYSOCK') self.on_window_change = on_window_change def init(self): pass def run(self): import json import subprocess swayproc = subprocess.Popen( ['swaymsg', '--type', 'subscribe', '--monitor', '--raw', '["window"]'], stdout=subprocess.PIPE) for ev in swayproc.stdout: data = json.loads(ev) try: if data['container']['focused'] == True: cls = data['container']['window_properties']['class'] self.on_window_change(cls) except: cls = data['container']['app_id'] self.on_window_change(cls) pass class XMonitor(): def __init__(self, on_window_change): assert_env('DISPLAY') self.on_window_change = on_window_change def init(self): import Xlib import Xlib.display self.dpy = Xlib.display.Display() self.dpy.screen().root.change_attributes( event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask) def run(self): last_active_class = "" while True: self.dpy.next_event() try: wm_class = self.dpy.get_input_focus().focus.get_wm_class() if wm_class: cls = wm_class[1] if cls != last_active_class: last_active_class = cls self.on_window_change(cls) except: import traceback traceback.print_exc() pass # :( class GnomeMonitor(): def __init__(self, on_window_change): assert_env('GNOME_SETUP_DISPLAY') self.on_window_change = on_window_change self.extension_dir = os.getenv('HOME') + '/.local/share/gnome-shell/extensions/keyd' self.fifo_path = self.extension_dir + '/keyd.fifo' def _install(self): os.makedirs(self.extension_dir, exist_ok=True) extension = ''' const Shell = imports.gi.Shell; // We have to keep an explicit reference around to prevent garbage collection :/. let file = imports.gi.Gio.File.new_for_path('%s'); let pipe = file.append_to_async(0, 0, null, on_pipe_open); function send(msg) { if (!pipe) return; try { pipe.write(msg, null); } catch { log('pipe closed, reopening...'); pipe = null; file.append_to_async(0, 0, null, on_pipe_open); } } function on_pipe_open(file, res) { log('pipe opened'); pipe = file.append_to_finish(res); } function init() { Shell.WindowTracker.get_default().connect('notify::focus-app', () => { send(`${global.display.focus_window.get_wm_class()}\n`); }); return { enable: ()=>{}, disable: ()=>{} }; } ''' % (self.fifo_path) metadata = ''' { "name": "keyd", "description": "Used by keyd to obtain active window information.", "uuid": "keyd", "shell-version": [ "41" ] } ''' open(self.extension_dir + '/metadata.json', 'w').write(metadata) open(self.extension_dir + '/extension.js', 'w').write(extension) os.mkfifo(self.fifo_path) def init(self): if not os.path.exists(self.extension_dir): print('keyd extension not found, installing...') self._install() print('Success! Please restart Gnome and rerun this script.') exit(0) run_or_die('gsettings set org.gnome.shell disable-user-extensions false'); run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.') def run(self): for cls in open(self.fifo_path): cls = cls.strip() self.on_window_change(cls) def get_monitor(on_window_change): monitors = [ ('Sway', SwayMonitor), ('Gnome', GnomeMonitor), ('X', XMonitor), ] for name, mon in monitors: try: m = mon(on_window_change) print(f'{name} detected') return m except: pass print('Could not detect app environment :(.') sys.exit(-1) def lock(): global lockfh lockfh = open(LOCKFILE, 'w') try: fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB) except: die('only one instance may run at a time') def ping_keyd(): run_or_die('keyd -e ping', 'could not connect to keyd instance, make sure it is running and you are a member of `keyd`') def daemonize(): print('Daemonizing...') fh = open(LOGFILE, 'w') os.close(1) os.close(2) os.dup2(fh.fileno(), 1) os.dup2(fh.fileno(), 2) if os.fork(): exit(0) if os.fork(): exit(0) opt = argparse.ArgumentParser() opt.add_argument('-v', '--verbose', default=False, action='store_true', help='print window class names in real time (useful for debugging and discovering window classes)') opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background') args = opt.parse_args() if not os.path.exists(CONFIG_PATH): die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf') bindings = parse_config(CONFIG_PATH) ping_keyd() lock() def normalize_class(s): return re.sub('[^A-Za-z0-9]', '-', s).strip('-').lower() last_mtime = os.path.getmtime(CONFIG_PATH) def on_window_change(cls): global last_mtime global bindings cls = normalize_class(cls) mtime = os.path.getmtime(CONFIG_PATH) if mtime != last_mtime: print(CONFIG_PATH + ': Updated, reloading config...') bindings = parse_config(CONFIG_PATH) last_mtime = mtime if args.verbose: print(f'Active window class: {cls}') if cls in bindings: # Apply the bindings. subprocess.run(['keyd', '-e', *bindings[cls]]) mon = get_monitor(on_window_change) mon.init() if args.daemonize: daemonize() mon.run()