You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

304 lines
8.0 KiB

#!/usr/bin/python3
import subprocess
import argparse
import os
import shutil
import re
import sys
import fcntl
# Good enough for now :/.
# TODO(ish):
#
# Make assorted detection hacks cleaner.
# Profile and optimize.
# Consider reimplmenting in perl or C.
# Produce more useful error messages :P.
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf'
LOCKFILE = os.getenv('HOME')+'/.config/keyd/app.lock'
LOGFILE = os.getenv('HOME')+'/.config/keyd/app.log'
def die(msg):
sys.stderr.write('ERROR: ')
sys.stderr.write(msg)
sys.stderr.write('\n')
exit(0)
def assert_env(var):
if not os.getenv(var):
raise Exception(f'Missing environment variable {var}')
def run_or_die(cmd, msg=''):
rc = subprocess.run(['/bin/sh', '-c', cmd],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL).returncode
if rc != 0:
die(msg)
def parse_config(path):
map = {}
for line in open(path):
line = line.strip()
if line.startswith('[') and line.endswith(']'):
window_class = line[1:-1]
bindings = []
map[window_class] = bindings
elif line == '':
continue
elif line.startswith('#'):
continue
else:
bindings.append(line)
return map
class SwayMonitor():
def __init__(self, on_window_change):
assert_env('SWAYSOCK')
self.on_window_change = on_window_change
def init(self):
pass
def run(self):
import json
import subprocess
swayproc = subprocess.Popen(
['swaymsg',
'--type',
'subscribe',
'--monitor',
'--raw',
'["window"]'], stdout=subprocess.PIPE)
for ev in swayproc.stdout:
data = json.loads(ev)
try:
if data['container']['focused'] == True:
cls = data['container']['window_properties']['class']
self.on_window_change(cls)
except:
cls = data['container']['app_id']
self.on_window_change(cls)
pass
class XMonitor():
def __init__(self, on_window_change):
assert_env('DISPLAY')
self.on_window_change = on_window_change
def init(self):
import Xlib
import Xlib.display
self.dpy = Xlib.display.Display()
self.dpy.screen().root.change_attributes(
event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask)
def run(self):
last_active_class = ""
while True:
self.dpy.next_event()
try:
wm_class = self.dpy.get_input_focus().focus.get_wm_class()
if wm_class == None:
cls = 'root'
else:
cls = wm_class[1]
if cls != last_active_class:
last_active_class = cls
self.on_window_change(cls)
except:
pass
# :(
class GnomeMonitor():
def __init__(self, on_window_change):
assert_env('GNOME_SETUP_DISPLAY')
self.on_window_change = on_window_change
self.version = '1'
self.extension_dir = os.getenv('HOME') + '/.local/share/gnome-shell/extensions/keyd'
self.fifo_path = self.extension_dir + '/keyd.fifo'
def _install(self):
shutil.rmtree(self.extension_dir, ignore_errors=True)
os.makedirs(self.extension_dir, exist_ok=True)
extension = '''
const Shell = imports.gi.Shell;
const GLib = imports.gi.GLib;
// We have to keep an explicit reference around to prevent garbage collection :/.
let file = imports.gi.Gio.File.new_for_path('%s');
let pipe = file.append_to_async(0, 0, null, on_pipe_open);
function send(msg) {
if (!pipe)
return;
try {
pipe.write(msg, null);
} catch {
log('pipe closed, reopening...');
pipe = null;
file.append_to_async(0, 0, null, on_pipe_open);
}
}
function on_pipe_open(file, res) {
log('pipe opened');
pipe = file.append_to_finish(res);
}
function init() {
Shell.WindowTracker.get_default().connect('notify::focus-app', () => {
const win = global.display.focus_window;
const cls = win ? win.get_wm_class() : 'root';
send(`${cls}\n`);
});
return {
enable: ()=>{ GLib.spawn_command_line_async('keyd-application-mapper -d'); },
disable: ()=>{ GLib.spawn_command_line_async('pkill -f keyd-application-mapper'); }
};
}
''' % (self.fifo_path)
metadata = '''
{
"name": "keyd",
"description": "Used by keyd to obtain active window information.",
"uuid": "keyd",
"shell-version": [ "41" ]
}
'''
open(self.extension_dir + '/version', 'w').write(self.version)
open(self.extension_dir + '/metadata.json', 'w').write(metadata)
open(self.extension_dir + '/extension.js', 'w').write(extension)
os.mkfifo(self.fifo_path)
def _is_installed(self):
try:
return open(self.extension_dir + '/version', 'r').read() == self.version
except:
return False
def init(self):
if not self._is_installed():
print('keyd extension not found, installing...')
self._install()
print('Success! Please restart Gnome.')
exit(0)
run_or_die('gsettings set org.gnome.shell disable-user-extensions false');
run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.')
def run(self):
for cls in open(self.fifo_path):
cls = cls.strip()
self.on_window_change(cls)
def get_monitor(on_window_change):
monitors = [
('Sway', SwayMonitor),
('Gnome', GnomeMonitor),
('X', XMonitor),
]
for name, mon in monitors:
try:
m = mon(on_window_change)
print(f'{name} detected')
return m
except:
pass
print('Could not detect app environment :(.')
sys.exit(-1)
def lock():
global lockfh
lockfh = open(LOCKFILE, 'w')
try:
fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
die('only one instance may run at a time')
def ping_keyd():
run_or_die('keyd -e ping',
'could not connect to keyd instance, make sure it is running and you are a member of `keyd`')
def daemonize():
print(f'Daemonizing, log output will be stored in {LOGFILE}...')
fh = open(LOGFILE, 'w')
os.close(1)
os.close(2)
os.dup2(fh.fileno(), 1)
os.dup2(fh.fileno(), 2)
if os.fork(): exit(0)
if os.fork(): exit(0)
opt = argparse.ArgumentParser()
opt.add_argument('-q', '--quiet', default=False, action='store_true', help='suppress logging of the active window')
opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background')
args = opt.parse_args()
if not os.path.exists(CONFIG_PATH):
die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf')
map = parse_config(CONFIG_PATH)
ping_keyd()
lock()
def normalize_class(s):
return re.sub('[^A-Za-z0-9]', '-', s).strip('-').lower()
last_mtime = os.path.getmtime(CONFIG_PATH)
def on_window_change(cls):
global last_mtime
global map
cls = normalize_class(cls)
mtime = os.path.getmtime(CONFIG_PATH)
if mtime != last_mtime:
print(CONFIG_PATH + ': Updated, reloading config...')
map = parse_config(CONFIG_PATH)
last_mtime = mtime
if not args.quiet:
print(f'Active window class: {cls}')
bindings = map.get(cls, [])
subprocess.run(['keyd', '-e', 'reset', *bindings])
mon = get_monitor(on_window_change)
mon.init()
if args.daemonize:
daemonize()
mon.run()