You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

386 lines
10 KiB

#!/usr/bin/python3
import subprocess
import argparse
import os
import shutil
import re
import sys
import fcntl
from fnmatch import fnmatch
# Good enough for now :/.
# TODO(ish):
#
# Make assorted detection hacks cleaner.
# Profile and optimize.
# Consider reimplementing in perl or C.
# Produce more useful error messages :P.
# Per-user files used by this script, all kept under $HOME/.config/keyd.
# NOTE(review): os.getenv('HOME') yields None when HOME is unset, in which
# case these concatenations fail with TypeError at startup - confirm that
# HOME is always present where this script is launched.
# Application-specific bindings; read by parse_config().
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf'
# flock()ed by lock() so that only one instance runs at a time.
LOCKFILE = os.getenv('HOME')+'/.config/keyd/app.lock'
# Receives stdout/stderr once daemonize() has run.
LOGFILE = os.getenv('HOME')+'/.config/keyd/app.log'
# Any non-empty KEYD_DEBUG value enables dbg() output.
debug_flag = os.getenv('KEYD_DEBUG')
def dbg(s):
    """Print *s* to stdout, but only when debugging is enabled (KEYD_DEBUG)."""
    if not debug_flag:
        return
    print(s)
def die(msg):
    """Report a fatal error on stderr and terminate the process.

    Writes ``ERROR: <msg>`` followed by a newline to stderr, then exits
    with status 1 so that shells and supervisors can tell a failure apart
    from a normal exit.

    Raises:
        SystemExit: always.
    """
    sys.stderr.write(f'ERROR: {msg}\n')
    # sys.exit() rather than the site-provided exit(), which is only meant
    # for interactive use and is missing when site is not loaded.
    sys.exit(1)
def assert_env(var):
    """Raise Exception if environment variable *var* is unset or empty."""
    value = os.environ.get(var)
    if value:
        return
    raise Exception(f'Missing environment variable {var}')
def run(cmd):
    """Run *cmd* through /bin/sh and return its stdout decoded as UTF-8.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    argv = ['/bin/sh', '-c', cmd]
    raw_output = subprocess.check_output(argv)
    return raw_output.decode('utf8')
def run_or_die(cmd, msg=''):
    """Run *cmd* through /bin/sh with its output discarded.

    If the command exits with a non-zero status, die(msg) is called.
    """
    completed = subprocess.run(
        ['/bin/sh', '-c', cmd],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if completed.returncode == 0:
        return
    die(msg)
def parse_config(path):
    """Parse an app.conf file into a list of ``(class, title, bindings)``.

    The file consists of section headers of the form ``[class]`` or
    ``[class|title]`` followed by binding lines. A header without a title
    gets the title pattern ``'*'``. Blank lines and lines starting with
    ``#`` are skipped; every other line is appended verbatim (stripped) to
    the bindings of the most recent section.

    A binding line that appears before any section header has no section
    to belong to; it is reported on stderr and ignored instead of
    crashing the parser.

    Args:
        path: location of the config file.

    Returns:
        A list of ``(class_pattern, title_pattern, [binding, ...])``
        tuples in file order.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    config = []
    bindings = None  # binding list of the section currently being filled

    with open(path) as fh:
        for lineno, raw in enumerate(fh, 1):
            line = raw.strip()

            if not line or line.startswith('#'):
                continue

            if line.startswith('[') and line.endswith(']'):
                fields = line[1:-1].split('|')
                cls = fields[0]
                # Only the first two '|'-separated fields are meaningful.
                title = fields[1] if len(fields) >= 2 else '*'

                bindings = []
                config.append((cls, title, bindings))
            elif bindings is None:
                sys.stderr.write(
                    f'{path}:{lineno}: ignoring binding outside of any '
                    f'[section]: {line}\n')
            else:
                bindings.append(line)

    return config
class SwayMonitor():
    """Active-window monitor for Sway, fed by ``swaymsg`` window events."""

    def __init__(self, on_window_change):
        """Store the callback; raises if SWAYSOCK is not set.

        Args:
            on_window_change: callable invoked as ``f(cls, title)`` each
                time the focused window's class or title changes.
        """
        assert_env('SWAYSOCK')

        self.on_window_change = on_window_change

    def init(self):
        """Nothing to set up for Sway."""
        pass

    @staticmethod
    def _window_info(data):
        """Extract ``(class, title)`` from one decoded window event.

        Returns ``('', '')`` when the event should be ignored: it carries
        no container, or the container is not focused. A focused container
        with complete ``window_properties`` yields its class and title;
        otherwise the container's ``app_id`` is used as the class with an
        empty title. Missing or null values become ``''`` so that callers
        always receive strings.
        """
        container = data.get('container') if isinstance(data, dict) else None
        if not isinstance(container, dict):
            return '', ''

        try:
            if container['focused'] is not True:
                return '', ''

            props = container['window_properties']
            return props['class'] or '', props['title'] or ''
        except (KeyError, TypeError):
            # No (usable) window_properties: fall back to app_id.
            return container.get('app_id') or '', ''

    def run(self):
        """Follow Sway window events forever, reporting focus changes.

        Spawns ``swaymsg`` subscribed to window events and calls
        ``on_window_change(cls, title)`` whenever the extracted pair
        differs from the previously reported one. Returns when swaymsg's
        output ends.
        """
        import json

        last_cls = ''
        last_title = ''

        swayproc = subprocess.Popen(
            ['swaymsg',
             '--type',
             'subscribe',
             '--monitor',
             '--raw',
             '["window"]'], stdout=subprocess.PIPE)

        # With --monitor and --raw each event arrives as one JSON line.
        for ev in swayproc.stdout:
            cls, title = self._window_info(json.loads(ev))

            if title == '' and cls == '':
                continue

            if last_cls != cls or last_title != title:
                last_cls = cls
                last_title = title

                self.on_window_change(cls, title)
class XMonitor():
    """Active-window monitor for X11, built on python-xlib."""

    def __init__(self, on_window_change):
        """Store the callback; raises if DISPLAY is not set.

        Args:
            on_window_change: callable invoked as ``f(cls, title)`` each
                time the focused window's class or title changes.
        """
        assert_env('DISPLAY')

        self.on_window_change = on_window_change

    def init(self):
        """Open the display, select root-window events and intern atoms."""
        import Xlib
        import Xlib.display

        self.dpy = Xlib.display.Display()
        self.dpy.screen().root.change_attributes(
            event_mask=Xlib.X.SubstructureNotifyMask | Xlib.X.PropertyChangeMask)

        self._NET_WM_NAME = self.dpy.intern_atom('_NET_WM_NAME')
        self.WM_NAME = self.dpy.intern_atom('WM_NAME')

    def get_window_info(self, win):
        """Return ``(class, title)`` for *win* or its nearest classed ancestor.

        Walks up the window tree from *win* until a window reporting a
        WM class is found and returns the second element of that class
        together with the window's title. Returns ``("root", "")`` when
        no window in the chain has a class.
        """
        def get_title(win):
            # Prefer _NET_WM_NAME (UTF-8); fall back to WM_NAME (latin1).
            attempts = (
                (self._NET_WM_NAME, 'utf8', 'strict'),
                (self.WM_NAME, 'latin1', 'replace'),
            )
            for atom, encoding, errors in attempts:
                try:
                    return win.get_full_property(atom, 0).value.decode(encoding, errors)
                except Exception:
                    # Property missing, undecodable, or the window is
                    # gone: try the next source.
                    continue

            return ''

        while win:
            cls = win.get_wm_class()
            if cls:
                return (cls[1], get_title(win))

            win = win.query_tree().parent

        return ("root", "")

    def run(self):
        """Process X events forever, reporting focus changes.

        After every event the currently focused window is inspected and
        ``on_window_change(cls, title)`` is called when its class or
        title differs from the last reported pair. Failures while
        handling an event are reported on stderr and the loop continues.
        """
        import Xlib

        event_mask = Xlib.X.SubstructureNotifyMask | Xlib.X.PropertyChangeMask

        last_active_class = ""
        last_active_title = ""

        while True:
            self.dpy.next_event()

            try:
                win = self.dpy.get_input_focus().focus

                # An integer here means there is no window object to query.
                if isinstance(win, int):
                    continue

                # Select events on the focused window as well, so that
                # later changes to it also wake this loop.
                win.change_attributes(event_mask=event_mask)

                cls, title = self.get_window_info(win)

                if cls != last_active_class or title != last_active_title:
                    last_active_class = cls
                    last_active_title = title

                    self.on_window_change(cls, title)
            except Exception as err:
                # Keep the monitor alive, but do not hide the failure
                # (a bare `except: pass` here used to swallow everything).
                sys.stderr.write(f'X monitor: {type(err).__name__}: {err}\n')
# :(
class GnomeMonitor():
    """Active-window monitor for GNOME Shell.

    Installs a small shell extension that writes ``<class>\\t<title>``
    lines to a FIFO each time the focused application changes, and reads
    those lines back in run().
    """

    def __init__(self, on_window_change):
        """Store the callback; raises if GNOME_SETUP_DISPLAY is not set.

        Args:
            on_window_change: callable invoked as ``f(cls, title)`` for
                every line received from the extension.
        """
        assert_env('GNOME_SETUP_DISPLAY')

        self.on_window_change = on_window_change

        # Compared against the installed 'version' file to decide whether
        # the extension has to be (re)installed.
        self.version = '1.2'
        self.extension_dir = os.getenv('HOME') + '/.local/share/gnome-shell/extensions/keyd'
        self.fifo_path = self.extension_dir + '/keyd.fifo'

    def _install(self):
        """(Re)create the extension directory, its files and the FIFO."""
        shutil.rmtree(self.extension_dir, ignore_errors=True)
        os.makedirs(self.extension_dir, exist_ok=True)

        extension = '''
const Shell = imports.gi.Shell;
const GLib = imports.gi.GLib;
// We have to keep an explicit reference around to prevent garbage collection :/.
let file = imports.gi.Gio.File.new_for_path('%s');
let pipe = file.append_to_async(0, 0, null, on_pipe_open);
function send(msg) {
if (!pipe)
return;
try {
pipe.write(msg, null);
} catch {
log('pipe closed, reopening...');
pipe = null;
file.append_to_async(0, 0, null, on_pipe_open);
}
}
function on_pipe_open(file, res) {
log('pipe opened');
pipe = file.append_to_finish(res);
}
function init() {
Shell.WindowTracker.get_default().connect('notify::focus-app', () => {
const win = global.display.focus_window;
const cls = win ? win.get_wm_class() : 'root';
const title = win ? win.get_title() : '';
send(`${cls}\\t${title}\\n`);
});
return {
enable: ()=>{ GLib.spawn_command_line_async('keyd-application-mapper -d'); },
disable: ()=>{ GLib.spawn_command_line_async('pkill -f keyd-application-mapper'); }
};
}
''' % (self.fifo_path)

        metadata = '''
{
"name": "keyd",
"description": "Used by keyd to obtain active window information.",
"uuid": "keyd",
"shell-version": [ "41", "42" ]
}
'''

        files = (
            ('version', self.version),
            ('metadata.json', metadata),
            ('extension.js', extension),
        )
        for name, content in files:
            # Context manager so each file is flushed and closed at once.
            with open(self.extension_dir + '/' + name, 'w') as fh:
                fh.write(content)

        os.mkfifo(self.fifo_path)

    def _is_installed(self):
        """Return True if the installed 'version' file matches self.version."""
        try:
            with open(self.extension_dir + '/version', 'r') as fh:
                return fh.read() == self.version
        except (OSError, ValueError):
            # Missing or unreadable/undecodable file: treat as not installed.
            return False

    def init(self):
        """Make sure the extension is installed and enabled.

        When the extension has to be installed or enabled, this prints
        instructions and exits the process with status 0; it only returns
        normally when nothing had to be done.
        """
        if not self._is_installed():
            print('keyd extension not found, installing...')
            self._install()
            run_or_die('gsettings set org.gnome.shell disable-user-extensions false')

            print('Success! Please restart Gnome and run this script one more time.')
            sys.exit(0)

        if 'DISABLED' in run('gnome-extensions show keyd'):
            run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.')
            print(f'Successfully enabled keyd extension :). Output will be stored in {LOGFILE}')
            sys.exit(0)

    def run(self):
        """Read ``class<TAB>title`` lines from the FIFO and report each one.

        Only the first tab separates class from title, so titles that
        themselves contain tabs are passed through intact. Lines without
        any tab (including blank lines) are skipped. Returns when the
        FIFO reaches end-of-file.
        """
        with open(self.fifo_path) as fifo:
            for line in fifo:
                cls, sep, title = line.rstrip('\n').partition('\t')
                if not sep:
                    continue

                self.on_window_change(cls, title)
def get_monitor(on_window_change):
    """Instantiate the first monitor whose environment is detected.

    Candidates are tried in the order Sway, Gnome, X. A candidate is
    considered unavailable when its constructor raises. If none can be
    constructed a message is printed and the process exits with -1.

    Args:
        on_window_change: callback handed to the monitor's constructor.

    Returns:
        The constructed monitor instance.
    """
    monitors = [
        ('Sway', SwayMonitor),
        ('Gnome', GnomeMonitor),
        ('X', XMonitor),
    ]

    for name, mon in monitors:
        try:
            m = mon(on_window_change)
        except Exception:
            # Constructor refused this environment; try the next one.
            # (Not a bare except: SystemExit/KeyboardInterrupt propagate.)
            continue

        print(f'{name} detected')
        return m

    print('Could not detect app environment :(.')
    sys.exit(-1)
def lock():
    """Take an exclusive, non-blocking lock on LOCKFILE or die.

    The file handle is kept in the module-level ``lockfh`` so that it
    stays open - and the lock stays held - after this function returns.
    """
    global lockfh

    lockfh = open(LOCKFILE, 'w')
    try:
        fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # With LOCK_NB, flock raises OSError when the lock is already held.
        die('only one instance may run at a time')
def daemonize():
    """Redirect stdout/stderr to LOGFILE and fork into the background.

    The announcement is printed to the original stdout. Both standard
    streams are then flushed *before* the descriptors are replaced and
    the process forks; otherwise buffered output would end up in the log
    file and be written again by each forked process on exit. The
    process forks twice, each parent exiting with status 0, so only the
    grandchild returns from this function.
    """
    print(f'Daemonizing, log output will be stored in {LOGFILE}...')

    sys.stdout.flush()
    sys.stderr.flush()

    with open(LOGFILE, 'w') as fh:
        # dup2 closes the target descriptor itself; no explicit close needed.
        os.dup2(fh.fileno(), 1)
        os.dup2(fh.fileno(), 2)

    for _ in range(2):
        if os.fork():
            sys.exit(0)
# Command-line interface.
opt = argparse.ArgumentParser()
opt.add_argument(
    '-q', '--quiet',
    action='store_true',
    help='suppress logging of the active window',
)
opt.add_argument(
    '-d', '--daemonize',
    action='store_true',
    help='fork and run in the background',
)
args = opt.parse_args()

# Nothing useful can be done without a config file.
if not os.path.exists(CONFIG_PATH):
    die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf')

# Load the bindings first, then make sure we are the only running instance.
config = parse_config(CONFIG_PATH)
lock()
def lookup_bindings(cls, title):
    """Collect the bindings of every config section matching the window.

    A section matches when its class pattern matches *cls* and its title
    pattern matches *title* (fnmatch globbing). Bindings are returned in
    config order.
    """
    matched = []

    for class_glob, title_glob, section_bindings in config:
        if not fnmatch(cls, class_glob):
            continue
        if not fnmatch(title, title_glob):
            continue

        dbg(f'\tMatched {class_glob}|{title_glob}')
        matched += section_bindings

    return matched
def normalize_class(s):
    """Lower-case *s* with each run of non-ASCII-alphanumerics turned into '-'.

    Leading and trailing dashes are removed.
    """
    dashed = re.sub('[^A-Za-z0-9]+', '-', s)
    trimmed = dashed.strip('-')
    return trimmed.lower()
def normalize_title(s):
    """Lower-case *s* with each run of non-word characters or '_' turned into '-'.

    Leading and trailing dashes are removed. Unlike normalize_class(),
    non-ASCII letters and digits are kept, because ``\\W`` is
    Unicode-aware for str patterns.
    """
    # Raw string: '\W' in a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    return re.sub(r'[\W_]+', '-', s).strip('-').lower()
# Modification time of the config as of the last successful (re)load.
last_mtime = os.path.getmtime(CONFIG_PATH)


def on_window_change(cls, title):
    """Apply the bindings that match the newly focused window.

    The class and title are normalized, the config is reloaded if its
    modification time changed since the last load, the active window is
    logged (unless --quiet) and keyd is asked to reset and apply the
    matching bindings.

    If the config file cannot be stat'ed or read at this moment (for
    example while an editor is replacing it), the problem is reported on
    stderr and the previously loaded config stays in effect.

    Args:
        cls: raw window class as reported by the monitor.
        title: raw window title as reported by the monitor.
    """
    global last_mtime
    global config

    cls = normalize_class(cls)
    title = normalize_title(title)

    try:
        mtime = os.path.getmtime(CONFIG_PATH)

        if mtime != last_mtime:
            print(CONFIG_PATH + ': Updated, reloading config...')
            config = parse_config(CONFIG_PATH)
            # Only remember the new mtime once the reload succeeded.
            last_mtime = mtime
    except OSError as err:
        sys.stderr.write(f'{CONFIG_PATH}: could not reload config ({err})\n')

    if not args.quiet:
        print(f'Active window: {cls}|{title}')

    bindings = lookup_bindings(cls, title)
    subprocess.run(['keyd', '-e', 'reset', *bindings])
# Pick the monitor for the current environment and let it perform any
# one-time setup while we are still attached to the terminal.
mon = get_monitor(on_window_change)
mon.init()

# Go to the background only after init(), which may print and exit.
if args.daemonize:
    daemonize()

# Run the monitor's event loop.
mon.run()