@ -3,17 +3,40 @@
import subprocess
import argparse
import os
import re
import sys
import fcntl
# Good enough for now :/.
# TODO(ish):
# TODO(ish):
#
# Make assorted detection hacks cleaner.
# Make assorted detection hacks cleaner.
# Profile and optimize.
# Consider reimplmenting in perl or C.
# Produce more useful error messages :P.
CONFIG_PATH = os.getenv('HOME') + '/.keyd-mappings'
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf'
LOCKFILE = os.getenv('HOME')+'/.config/keyd/lockfile'
LOGFILE = os.getenv('HOME')+'/.config/keyd/log'
def die(msg):
sys.stderr.write('ERROR: ')
sys.stderr.write(msg)
sys.stderr.write('\n')
exit(0)
def assert_env(var):
if not os.getenv(var):
raise Exception(f'Missing environment variable {var}')
def run_or_die(cmd, msg=''):
rc = subprocess.run(['/bin/sh', '-c', cmd],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL).returncode
if rc != 0:
die(msg)
def parse_config(path):
map = {}
@ -35,15 +58,14 @@ def parse_config(path):
return map
class SwayWindowChangeDetector():
class SwayMonitor():
def __init__(self, on_window_change):
import os
assert_env('SWAYSOCK')
self.on_window_change = on_window_change
if not os.getenv('SWAYSOCK' ):
raise Exception('SWAYSOCK not found, is sway running?')
def init(self ):
pass
def run(self):
import json
@ -51,11 +73,11 @@ class SwayWindowChangeDetector():
swayproc = subprocess.Popen(
['swaymsg',
'--type',
'subscribe',
'--monitor',
'--raw',
'["window"]'], stdout=subprocess.PIPE)
'--type',
'subscribe',
'--monitor',
'--raw',
'["window"]'], stdout=subprocess.PIPE)
for ev in swayproc.stdout:
data = json.loads(ev)
@ -63,141 +85,201 @@ class SwayWindowChangeDetector():
try:
if data['container']['focused'] == True:
cls = data['container']['window_properties']['class']
self.on_window_change([ cls] )
self.on_window_change(cls)
except:
self.on_window_change([data['container']['app_id']])
cls = data['container']['app_id']
self.on_window_change(cls)
pass
class XWindowChangeDetec tor():
class XMoni tor():
def __init__(self, on_window_change):
import os
assert_env('DISPLAY')
self.on_window_change = on_window_change
if not os.getenv('DISPLAY'):
raise Exception('DISPLAY not set, is X running?')
# TODO: make this less kludgy
def run(self):
import time
def init(self):
import Xlib
import Xlib.display
dpy = Xlib.display.Display()
class_cache = {}
def get_class(win):
hsh = str(win)
if hsh not in class_cache:
try:
cls = win.get_wm_class()
if not cls:
return []
class_cache[hsh] = cls
except:
return []
return class_cache[hsh]
self.dpy = Xlib.display.Display()
self.dpy.screen().root.change_attributes(
event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask)
last = []
def run(self):
last_active_class = ""
while True:
win = dpy.get_input_focus().focus
classes = get_class(win)
if classes != last:
self.on_window_change(classes)
last = classes
time.sleep(0.1)
self.dpy.next_event()
class GnomeWindowChangeDetector():
def __init__(self, on_window_change):
import dbus
import dbus.mainloop.glib
try:
cls = self.dpy.get_input_focus().focus.get_wm_class()[1]
if cls != last_active_class:
last_active_class = cls
self.on_window_change(cls)
except:
import traceback
traceback.print_exc()
pass
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self.con = dbus.SessionBus()
# :(
class GnomeMonitor():
def __init__(self, on_window_change):
assert_env('GNOME_SETUP_DISPLAY')
self.introspect = self.get_dbus_object("org.gnome.Shell.Introspect",
"/org/gnome/Shell/Introspect")
self.on_window_change = on_window_change
self.shell = self.get_dbus_object(
"org.gnome.Shell", "/org/gnome/Shell")
self.extension_dir = os.getenv('HOME') + '/.local/share/gnome-shell/extensions/keyd'
self.fifo_path = self.extension_dir + '/keyd.fifo'
def _install(self):
os.makedirs(self.extension_dir, exist_ok=True)
extension = '''
const Shell = imports.gi.Shell;
// We have to keep an explicit reference around to prevent garbage collection :/.
let file = imports.gi.Gio.File.new_for_path('%s');
let pipe = file.append_to_async(0, 0, null, on_pipe_open);
function send(msg) {
if (!pipe)
return;
try {
pipe.write(msg, null);
} catch {
log('pipe closed, reopening...');
pipe = null;
file.append_to_async(0, 0, null, on_pipe_open);
}
}
function on_pipe_open(file, res) {
log('pipe opened');
pipe = file.append_to_finish(res);
}
function init() {
Shell.WindowTracker.get_default().connect('notify::focus-app', () => {
send(`${global.display.focus_window.get_wm_class()}\n`);
});
return { enable: ()=>{}, disable: ()=>{} };
}
''' % (self.fifo_path)
metadata = '''
{
"name": "keyd",
"description": "Used by keyd to obtain active window information.",
"uuid": "keyd",
"shell-version": [ "41" ]
}
'''
open(self.extension_dir + '/metadata.json', 'w').write(metadata)
open(self.extension_dir + '/extension.js', 'w').write(extension)
os.mkfifo(self.fifo_path)
def init(self):
if not os.path.exists(self.extension_dir):
print('keyd extension not found, installing...')
self._install()
print('Success! Please restart Gnome and rerun this script.')
exit(0)
run_or_die('gsettings set org.gnome.shell disable-user-extensions false');
run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.')
self.on_window_change = on_window_change
self.introspect.connect_to_signal(
"RunningApplicationsChanged", lambda: self._on_window_change())
def run(self):
for cls in open(self.fifo_path):
cls = cls.strip()
self.on_window_change(cls)
def get_monitor(on_window_change):
monitors = [
('Sway', SwayMonitor),
('Gnome', GnomeMonitor),
('X', XMonitor),
]
def get_dbus_object(self, interface, path):
import dbus
return dbus.Interface(self.con.get_object(interface, path), interface)
for name, mon in monitors:
try:
m = mon(on_window_change)
print(f'{name} detected')
return m
except:
pass
def get_window_class(self):
return self.shell.Eval('global.display.focus_window.get_wm_class()')[1].strip('"')
print('Could not detect app environment :(.')
sys.exit(-1 )
def _on_window_change(self):
self.on_window_change(self.get_window_class())
def lock():
global lockfh
lockfh = open(LOCKFILE, 'w')
try:
fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
die('only one instance may run at a time')
def run(self):
import dbus
from gi.repository import GLib
def ping_keyd( ):
run_or_die('keyd -e ping',
'could not connect to keyd instance, make sure it is running and you are a member of `keyd`')
loop = GLib.MainLoop()
loop.run()
def daemonize():
print('Daemonizing...' )
fh = open(LOGFILE, 'w')
def get_detector(on_window_change):
detectors = [
('Sway', SwayWindowChangeDetector),
('Gnome', GnomeWindowChangeDetector),
('X', XWindowChangeDetector),
]
os.close(1)
os.close(2)
os.dup2(fh.fileno(), 1)
os.dup2(fh.fileno(), 2)
for name,detector in detectors:
try:
d = detector(on_window_change)
print(f'{name} detected')
return d
except:
pass
if os.fork(): exit(0)
if os.fork(): exit(0)
print('Could not detect app environment :(.')
exit(-1)
opt = argparse.ArgumentParser()
opt.add_argument('-m', '--monitor', default=False, action='store_true', help='print window class names in real time')
opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background')
args = opt.parse_args()
if not os.path.exists(CONFIG_PATH):
die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf')
parser = argparse.ArgumentParser()
parser.add_argument('-m', '--monitor', default=False, action='store_true')
args = parser.parse_args()
bindings = parse_config(CONFIG_PATH )
ping_keyd( )
lock ()
if subprocess.run(['keyd', '-e', 'ping'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode != 0:
print('Could not connect to keyd instance, make sure it is running and you are a member of `keyd`')
exit(-1)
def normalize_class(s):
return re.sub('[^A-Za-z0-9]', '-', s).strip('-').lower()
app_bindings = parse_config(CONFIG_PATH)
last_mtime = os.path.getmtime(CONFIG_PATH)
def on_window_change(classes):
def on_window_change(cls):
global last_mtime
global app_bindings
global bindings
cls = normalize_class(cls)
mtime = os.path.getmtime(CONFIG_PATH)
if mtime != last_mtime:
print(CONFIG_PATH + ': Updated, reloading config...')
app_ bindings = parse_config(CONFIG_PATH)
bindings = parse_config(CONFIG_PATH)
last_mtime = mtime
if args.monitor:
print(', '.join( cla sses) )
print(cls)
return
for cls in classes:
if cls in app_bindings:
# Apply the bindings.
subprocess.run(['keyd', '-e', *app_bindings[cls]])
if cls in bindings:
# Apply the bindings.
subprocess.run(['keyd', '-e', *bindings[cls]])
mon = get_monitor(on_window_change)
mon.init()
if args.daemonize:
daemonize()
get_detector(on_window_change).run()
mon .run()