#!/usr/bin/python3
"""Apply per-application keyd bindings based on the focused window.

A monitor class (Sway, Gnome or X) reports the class and title of the
active window. Each time they change, the sections of ``app.conf`` whose
glob patterns match are collected and handed to ``keyd bind reset``.
"""

import subprocess
import argparse
import json
import os
import shutil
import re
import sys
import fcntl
from fnmatch import fnmatch

# Good enough for now :/.

# TODO(ish):
#
# Make assorted detection hacks cleaner.
# Profile and optimize.
# Consider reimplementing in perl or C.
# Produce more useful error messages :P.

# expanduser() honours $HOME when it is set and otherwise falls back to the
# password database, so a missing HOME no longer raises TypeError here.
_HOME = os.path.expanduser('~')

CONFIG_PATH = _HOME + '/.config/keyd/app.conf'
LOCKFILE = _HOME + '/.config/keyd/app.lock'
LOGFILE = _HOME + '/.config/keyd/app.log'

debug_flag = os.getenv('KEYD_DEBUG')

# Runtime state shared with on_window_change(); populated by main().
args = argparse.Namespace(quiet=False, daemonize=False)
config = []
last_mtime = None

# Runs of these characters are collapsed into a single '-' when normalizing.
_CLASS_SEPARATORS = re.compile(r'[^A-Za-z0-9]+')
_TITLE_SEPARATORS = re.compile(r'[\W_]+')


def dbg(s):
    """Print `s` only when the KEYD_DEBUG environment variable is set."""
    if debug_flag:
        print(s)


def die(msg):
    """Write ``ERROR: <msg>`` to stderr and exit with a failure status."""
    sys.stderr.write(f'ERROR: {msg}\n')
    # An error exit must not report success to the caller.
    sys.exit(1)


def assert_env(var):
    """Raise an Exception if environment variable `var` is unset or empty."""
    if not os.getenv(var):
        raise Exception(f'Missing environment variable {var}')


def run(cmd):
    """Run `cmd` through /bin/sh and return its stdout decoded as UTF-8.

    Raises subprocess.CalledProcessError if the command exits non-zero.
    """
    return subprocess.check_output(['/bin/sh', '-c', cmd]).decode('utf8')


def run_or_die(cmd, msg=''):
    """Run `cmd` through /bin/sh silently; die(msg) if it exits non-zero."""
    rc = subprocess.run(['/bin/sh', '-c', cmd],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL).returncode

    if rc != 0:
        die(msg)


def parse_config(path):
    """Parse the config file at `path`.

    A ``[class]`` or ``[class|title]`` line opens a section (the title
    pattern defaults to ``*``). Blank lines and lines starting with ``#``
    are skipped. Every other line is stored verbatim as a binding of the
    most recently opened section.

    Returns a list of ``(class_pattern, title_pattern, bindings)`` tuples in
    file order.
    """
    sections = []
    bindings = None  # No section has been opened yet.

    with open(path) as fh:
        for line in fh:
            line = line.strip()

            if line == '' or line.startswith('#'):
                continue

            if line.startswith('[') and line.endswith(']'):
                parts = line[1:-1].split('|')
                cls = parts[0]
                title = parts[1] if len(parts) >= 2 else '*'

                bindings = []
                sections.append((cls, title, bindings))
            elif bindings is None:
                # A binding with no section to attach to: skip it instead of
                # failing on an unbound variable.
                sys.stderr.write(
                    f'{path}: ignoring line outside of any section: {line}\n')
            else:
                bindings.append(line)

    return sections


class SwayMonitor():
    """Window monitor that follows sway window events via ``swaymsg``."""

    def __init__(self, on_window_change):
        """Store the callback; raises if SWAYSOCK is not set."""
        assert_env('SWAYSOCK')

        self.on_window_change = on_window_change

    def init(self):
        """No setup is required for sway."""
        pass

    def run(self):
        """Read window events and report class/title changes to the callback."""
        last_cls = ''
        last_title = ''

        swayproc = subprocess.Popen(
            ['swaymsg',
             '--type',
             'subscribe',
             '--monitor',
             '--raw',
             '["window"]'], stdout=subprocess.PIPE)

        for ev in swayproc.stdout:
            data = json.loads(ev)

            title = ''
            cls = ''

            try:
                container = data['container']

                if container['focused'] == True:
                    props = container['window_properties']

                    # JSON null would otherwise reach the normalizers as None.
                    cls = props['class'] or ''
                    title = props['title'] or ''
            except (KeyError, TypeError):
                # No usable window_properties: fall back to the app_id.
                try:
                    title = ''
                    cls = data['container']['app_id'] or ''
                except (KeyError, TypeError):
                    pass

            if title == '' and cls == '':
                continue

            if last_cls != cls or last_title != title:
                last_cls = cls
                last_title = title

                self.on_window_change(cls, title)


class XMonitor():
    """Window monitor for X11 sessions, implemented with python-xlib."""

    def __init__(self, on_window_change):
        """Store the callback; raises if DISPLAY is not set."""
        assert_env('DISPLAY')

        self.on_window_change = on_window_change

    def init(self):
        """Open the display and subscribe to root window events."""
        import Xlib
        import Xlib.display

        self.dpy = Xlib.display.Display()
        self.dpy.screen().root.change_attributes(
            event_mask=Xlib.X.SubstructureNotifyMask | Xlib.X.PropertyChangeMask)

        self._NET_WM_NAME = self.dpy.intern_atom('_NET_WM_NAME')
        self.WM_NAME = self.dpy.intern_atom('WM_NAME')

    def get_window_info(self, win):
        """Return ``(class, title)`` for `win` or its nearest classed ancestor.

        Walks up the window tree until a window with a WM class is found.
        Returns ``("root", "")`` when none is.
        """
        def get_title(win):
            # Prefer _NET_WM_NAME (UTF-8), then WM_NAME, then an empty title.
            title = ''
            try:
                title = win.get_full_property(self._NET_WM_NAME, 0).value.decode('utf8')
            except Exception:
                try:
                    title = win.get_full_property(self.WM_NAME, 0).value.decode('latin1', 'replace')
                except Exception:
                    pass

            return title

        while win:
            cls = win.get_wm_class()
            if cls:
                return (cls[1], get_title(win))

            win = win.query_tree().parent

        return ("root", "")

    def run(self):
        """Block on X events and report class/title changes to the callback."""
        import Xlib

        last_active_class = ""
        last_active_title = ""

        _NET_WM_STATE = self.dpy.intern_atom('_NET_WM_STATE', False)
        _NET_WM_STATE_ABOVE = self.dpy.intern_atom('_NET_WM_STATE_ABOVE', False)

        def get_floating_window():
            # First child of the root whose leading _NET_WM_STATE atom is
            # _NET_WM_STATE_ABOVE, if any.
            for w in self.dpy.screen().root.query_tree().children:
                v = w.get_full_property(_NET_WM_STATE, Xlib.Xatom.ATOM)

                # An empty state list used to raise IndexError, which the
                # event loop swallowed, dropping the whole event.
                if v is not None and len(v.value) > 0 and v.value[0] == _NET_WM_STATE_ABOVE:
                    return w

            return None

        def get_active_window():
            win = get_floating_window()
            if win is not None:
                return win

            return self.dpy.get_input_focus().focus

        while True:
            # The event itself is not inspected; it only signals that the
            # active window may have changed.
            self.dpy.next_event()

            try:
                win = get_active_window()

                if isinstance(win, int) or win is None:
                    continue

                win.change_attributes(
                    event_mask=Xlib.X.SubstructureNotifyMask | Xlib.X.PropertyChangeMask)

                cls, title = self.get_window_info(win)

                if cls != last_active_class or title != last_active_title:
                    last_active_class = cls
                    last_active_title = title

                    self.on_window_change(cls, title)
            except Exception as e:
                # Best effort :(. Keep the loop alive, but leave a trace when
                # debugging and let KeyboardInterrupt/SystemExit through.
                dbg(f'X event ignored: {e!r}')


class GnomeMonitor():
    """Window monitor for Gnome, fed by a shell extension through a FIFO."""

    def __init__(self, on_window_change):
        """Store the callback; raises if GNOME_SETUP_DISPLAY is not set."""
        assert_env('GNOME_SETUP_DISPLAY')

        self.on_window_change = on_window_change

        self.version = '1.2'
        self.extension_dir = os.path.expanduser('~') + '/.local/share/gnome-shell/extensions/keyd'
        self.fifo_path = self.extension_dir + '/keyd.fifo'

    def _install(self):
        """(Re)create the extension directory, its files and the FIFO."""
        shutil.rmtree(self.extension_dir, ignore_errors=True)
        os.makedirs(self.extension_dir, exist_ok=True)

        extension = '''
        const Shell = imports.gi.Shell;
        const GLib = imports.gi.GLib;

        // We have to keep an explicit reference around to prevent garbage collection :/.
        let file = imports.gi.Gio.File.new_for_path('%s');
        let pipe = file.append_to_async(0, 0, null, on_pipe_open);

        function send(msg) {
            if (!pipe)
                return;

            try {
                pipe.write(msg, null);
            } catch {
                log('pipe closed, reopening...');
                pipe = null;
                file.append_to_async(0, 0, null, on_pipe_open);
            }
        }

        function on_pipe_open(file, res) {
            log('pipe opened');
            pipe = file.append_to_finish(res);
        }

        function init() {
            Shell.WindowTracker.get_default().connect('notify::focus-app', () => {
                const win = global.display.focus_window;
                const cls = win ? win.get_wm_class() : 'root';
                const title = win ? win.get_title() : '';

                send(`${cls}\\t${title}\\n`);
            });

            return {
                enable: ()=>{ GLib.spawn_command_line_async('keyd-application-mapper -d'); },
                disable: ()=>{ GLib.spawn_command_line_async('pkill -f keyd-application-mapper'); }
            };
        }
        ''' % (self.fifo_path)

        metadata = '''
        {
            "name": "keyd",
            "description": "Used by keyd to obtain active window information.",
            "uuid": "keyd",
            "shell-version": [ "41", "42" ]
        }
        '''

        payloads = (
            ('/version', self.version),
            ('/metadata.json', metadata),
            ('/extension.js', extension),
        )
        for name, content in payloads:
            with open(self.extension_dir + name, 'w') as fh:
                fh.write(content)

        os.mkfifo(self.fifo_path)

    def _is_installed(self):
        """Return True if the installed version file matches self.version."""
        try:
            with open(self.extension_dir + '/version', 'r') as fh:
                return fh.read() == self.version
        except OSError:
            return False

    def init(self):
        """Install and/or enable the extension; exits when a restart or rerun is needed."""
        if not self._is_installed():
            print('keyd extension not found, installing...')
            self._install()
            run_or_die('gsettings set org.gnome.shell disable-user-extensions false',
                       'Failed to enable user extensions.')

            print('Success! Please restart Gnome and run this script one more time.')
            sys.exit(0)

        if 'DISABLED' in run('gnome-extensions show keyd'):
            run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.')
            print(f'Successfully enabled keyd extension :). Output will be stored in {LOGFILE}')
            sys.exit(0)

    def run(self):
        """Read ``class<TAB>title`` lines from the FIFO until it reaches EOF."""
        with open(self.fifo_path) as fifo:
            for line in fifo:
                # Split on the first tab only so that a title which itself
                # contains a tab does not break the unpacking.
                cls, sep, title = line.strip('\n').partition('\t')
                if not sep:
                    continue  # Not a class/title record.

                self.on_window_change(cls, title)


def get_monitor(on_window_change):
    """Return the first monitor whose constructor succeeds (Sway, Gnome, X).

    Exits the process if none of them can be constructed.
    """
    monitors = [
        ('Sway', SwayMonitor),
        ('Gnome', GnomeMonitor),
        ('X', XMonitor),
    ]

    for name, mon in monitors:
        try:
            m = mon(on_window_change)
            print(f'{name} detected')
            return m
        except Exception:
            # This environment is not available; try the next one.
            pass

    print('Could not detect app environment :(.')
    sys.exit(-1)


def lock():
    """Take an exclusive, non-blocking lock on LOCKFILE or die."""
    # The handle is kept in a global so that it stays open.
    global lockfh

    lockfh = open(LOCKFILE, 'w')
    try:
        fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        die('only one instance may run at a time')


def daemonize():
    """Redirect stdout/stderr to LOGFILE and fork twice; the parents exit."""
    print(f'Daemonizing, log output will be stored in {LOGFILE}...')

    # Flush before redirecting and forking: otherwise buffered output ends up
    # in the log file, once per process that inherited the buffer.
    sys.stdout.flush()
    sys.stderr.flush()

    fh = open(LOGFILE, 'w')

    # dup2() closes the target descriptor itself before reusing it.
    os.dup2(fh.fileno(), 1)
    os.dup2(fh.fileno(), 2)

    if os.fork():
        sys.exit(0)
    if os.fork():
        sys.exit(0)


def lookup_bindings(cls, title):
    """Return the bindings of every config section matching `cls` and `title`.

    Both patterns of a section are matched with fnmatch(); bindings are
    concatenated in config order.
    """
    bindings = []
    for cexp, texp, b in config:
        if fnmatch(cls, cexp) and fnmatch(title, texp):
            dbg(f'\tMatched {cexp}|{texp}')
            bindings.extend(b)

    return bindings


def normalize_class(s):
    """Lowercase `s`, replacing runs of non-ASCII-alphanumerics with '-'."""
    return _CLASS_SEPARATORS.sub('-', s).strip('-').lower()


def normalize_title(s):
    """Lowercase `s`, replacing runs of non-word characters or '_' with '-'."""
    return _TITLE_SEPARATORS.sub('-', s).strip('-').lower()


def on_window_change(cls, title):
    """Apply the bindings for the newly active window.

    Reloads the config first if its modification time changed, logs the
    normalized class and title unless --quiet was given, then runs
    ``keyd bind reset`` with the matching bindings.
    """
    global last_mtime
    global config

    cls = normalize_class(cls)
    title = normalize_title(title)

    try:
        mtime = os.path.getmtime(CONFIG_PATH)

        if mtime != last_mtime:
            print(CONFIG_PATH + ': Updated, reloading config...', flush=True)
            config = parse_config(CONFIG_PATH)
            last_mtime = mtime
    except OSError as e:
        # The file can be unreadable for a moment (e.g. while it is being
        # replaced); keep using the config that is already loaded.
        dbg(f'{CONFIG_PATH}: not reloaded: {e}')

    if not args.quiet:
        # Flushed so the line shows up promptly when stdout is the log file.
        print(f'Active window: {cls}|{title}', flush=True)

    bindings = lookup_bindings(cls, title)
    subprocess.run(['keyd', 'bind', 'reset', *bindings])


def main():
    """Parse arguments, load the config, take the lock and run a monitor."""
    global args
    global config
    global last_mtime

    opt = argparse.ArgumentParser()
    opt.add_argument('-q', '--quiet', default=False, action='store_true',
                     help='suppress logging of the active window')
    opt.add_argument('-d', '--daemonize', default=False, action='store_true',
                     help='fork and run in the background')
    args = opt.parse_args()

    if not os.path.exists(CONFIG_PATH):
        die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf')

    config = parse_config(CONFIG_PATH)
    lock()

    last_mtime = os.path.getmtime(CONFIG_PATH)

    mon = get_monitor(on_window_change)
    mon.init()

    if args.daemonize:
        daemonize()

    mon.run()


if __name__ == '__main__':
    main()