You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

302 lines
8.1 KiB

#!/usr/bin/python3
import subprocess
import argparse
import os
import re
import sys
import fcntl
# Good enough for now :/.
# TODO(ish):
#
# Make assorted detection hacks cleaner.
# Profile and optimize.
# Consider reimplmenting in perl or C.
# Produce more useful error messages :P.
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf'
LOCKFILE = os.getenv('HOME')+'/.config/keyd/lockfile'
LOGFILE = os.getenv('HOME')+'/.config/keyd/log'
def die(msg):
sys.stderr.write('ERROR: ')
sys.stderr.write(msg)
sys.stderr.write('\n')
exit(0)
def assert_env(var):
if not os.getenv(var):
raise Exception(f'Missing environment variable {var}')
def run_or_die(cmd, msg=''):
rc = subprocess.run(['/bin/sh', '-c', cmd],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL).returncode
if rc != 0:
die(msg)
def parse_config(path):
map = {}
for line in open(path):
line = line.strip()
if line.startswith('[') and line.endswith(']'):
window_identifier = line[1:-1]
bindings = []
map[window_identifier] = bindings
elif line == '':
continue
elif line.startswith('#'):
continue
else:
bindings.append(line)
return map
class SwayMonitor():
def __init__(self, on_window_change):
assert_env('SWAYSOCK')
self.on_window_change = on_window_change
def init(self):
pass
def run(self):
import json
import subprocess
swayproc = subprocess.Popen(
['swaymsg',
'--type',
'subscribe',
'--monitor',
'--raw',
'["window"]'], stdout=subprocess.PIPE)
for ev in swayproc.stdout:
data = json.loads(ev)
try:
if data['container']['focused'] == True:
props = data['container']['window_properties']
self.on_window_change(props['class'], props['title'])
except:
cls = data['container']['app_id']
self.on_window_change(cls, "")
pass
class XMonitor():
def __init__(self, on_window_change):
assert_env('DISPLAY')
self.on_window_change = on_window_change
def init(self):
import Xlib
import Xlib.display
self.dpy = Xlib.display.Display()
self.dpy.screen().root.change_attributes(
event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask)
def run(self):
last_active_class = ""
last_active_name = ""
while True:
self.dpy.next_event()
try:
focus = self.dpy.get_input_focus().focus
wm_class = focus.get_wm_class()
if wm_class:
wm_name = focus.get_wm_name()
if isinstance(wm_name, bytes):
wm_name = wm_name.decode('ascii')
cls = wm_class[1]
if cls != last_active_class or wm_name != last_active_name:
last_active_class = cls
last_active_name = wm_name
self.on_window_change(cls, wm_name)
except:
import traceback
traceback.print_exc()
pass
# :(
class GnomeMonitor():
def __init__(self, on_window_change):
assert_env('GNOME_SETUP_DISPLAY')
self.on_window_change = on_window_change
self.extension_dir = os.getenv('HOME') + '/.local/share/gnome-shell/extensions/keyd'
self.fifo_path = self.extension_dir + '/keyd.fifo'
def _install(self):
os.makedirs(self.extension_dir, exist_ok=True)
extension = '''
const Shell = imports.gi.Shell;
// We have to keep an explicit reference around to prevent garbage collection :/.
let file = imports.gi.Gio.File.new_for_path('%s');
let pipe = file.append_to_async(0, 0, null, on_pipe_open);
function send(msg) {
if (!pipe)
return;
try {
pipe.write(msg, null);
} catch {
log('pipe closed, reopening...');
pipe = null;
file.append_to_async(0, 0, null, on_pipe_open);
}
}
function on_pipe_open(file, res) {
log('pipe opened');
pipe = file.append_to_finish(res);
}
function init() {
Shell.WindowTracker.get_default().connect('notify::focus-app', () => {
send(`${global.display.focus_window.get_wm_class()}\t${global.display.focus_window.get_wm_name()}\n`);
});
return { enable: ()=>{}, disable: ()=>{} };
}
''' % (self.fifo_path)
metadata = '''
{
"name": "keyd",
"description": "Used by keyd to obtain active window information.",
"uuid": "keyd",
"shell-version": [ "41" ]
}
'''
open(self.extension_dir + '/metadata.json', 'w').write(metadata)
open(self.extension_dir + '/extension.js', 'w').write(extension)
os.mkfifo(self.fifo_path)
def init(self):
if not os.path.exists(self.extension_dir):
print('keyd extension not found, installing...')
self._install()
print('Success! Please restart Gnome and rerun this script.')
exit(0)
run_or_die('gsettings set org.gnome.shell disable-user-extensions false');
run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.')
def run(self):
for line in open(self.fifo_path):
[cls, name] = line.strip().split('\t')
self.on_window_change(cls, name)
def get_monitor(on_window_change):
monitors = [
('Sway', SwayMonitor),
('Gnome', GnomeMonitor),
('X', XMonitor),
]
for name, mon in monitors:
try:
m = mon(on_window_change)
print(f'{name} detected')
return m
except:
pass
print('Could not detect app environment :(.')
sys.exit(-1)
def lock():
global lockfh
lockfh = open(LOCKFILE, 'w')
try:
fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
die('only one instance may run at a time')
def ping_keyd():
run_or_die('keyd -e ping',
'could not connect to keyd instance, make sure it is running and you are a member of `keyd`')
def daemonize():
print('Daemonizing...')
fh = open(LOGFILE, 'w')
os.close(1)
os.close(2)
os.dup2(fh.fileno(), 1)
os.dup2(fh.fileno(), 2)
if os.fork(): exit(0)
if os.fork(): exit(0)
opt = argparse.ArgumentParser()
opt.add_argument('-v', '--verbose', default=False, action='store_true', help='print window class names in real time (useful for debugging and discovering window classes)')
opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background')
args = opt.parse_args()
if not os.path.exists(CONFIG_PATH):
die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf')
bindings = parse_config(CONFIG_PATH)
ping_keyd()
lock()
def normalize_identifier(s):
return re.sub('[^A-Za-z0-9]', '-', s).strip('-').lower()
last_mtime = os.path.getmtime(CONFIG_PATH)
def on_window_change(cls, name):
global last_mtime
global bindings
cls = normalize_identifier(cls)
name = normalize_identifier(name)
mtime = os.path.getmtime(CONFIG_PATH)
if mtime != last_mtime:
print(CONFIG_PATH + ': Updated, reloading config...')
bindings = parse_config(CONFIG_PATH)
last_mtime = mtime
if args.verbose:
print(f'Active window: {cls}, {name}')
expressions = ['reset']
if cls in bindings:
expressions.extend(bindings[cls])
pair = cls + '|' + name
if (pair) in bindings:
expressions.extend(bindings[pair])
# Apply the bindings.
subprocess.run(['keyd', '-e', *expressions])
mon = get_monitor(on_window_change)
mon.init()
if args.daemonize:
daemonize()
mon.run()