You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
529 lines
15 KiB
529 lines
15 KiB
#!/usr/bin/python3 |
|
|
|
import subprocess |
|
import argparse |
|
import select |
|
import socket |
|
import struct |
|
import os |
|
import errno |
|
import shutil |
|
import re |
|
import sys |
|
import fcntl |
|
import signal |
|
from fnmatch import fnmatch |
|
|
|
# Good enough for now :/. |
|
|
|
# TODO(ish): |
|
# |
|
# Make assorted detection hacks cleaner. |
|
# Profile and optimize. |
|
# Consider reimplmenting in perl or C. |
|
# Produce more useful error messages :P. |
|
|
|
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf' |
|
LOCKFILE = os.getenv('HOME')+'/.config/keyd/app.lock' |
|
LOGFILE = os.getenv('HOME')+'/.config/keyd/app.log' |
|
|
|
debug_flag = os.getenv('KEYD_DEBUG') |
|
|
|
def dbg(s): |
|
if debug_flag: |
|
print(s) |
|
|
|
def die(msg): |
|
sys.stderr.write('ERROR: ') |
|
sys.stderr.write(msg) |
|
sys.stderr.write('\n') |
|
exit(0) |
|
|
|
def assert_env(var): |
|
if not os.getenv(var): |
|
raise Exception(f'Missing environment variable {var}') |
|
|
|
def assert_gnome(): |
|
if 'gnome' not in os.getenv('XDG_CURRENT_DESKTOP', '').lower() and \ |
|
not os.getenv('GNOME_SETUP_DISPLAY'): |
|
raise Exception(f'Gnome desktop environment not detected by inspecting' |
|
'XDG_CURRENT_DESKTOP and GNOME_SETUP_DISPLAY environment variables') |
|
|
|
def run(cmd): |
|
return subprocess.check_output(['/bin/sh', '-c', cmd]).decode('utf8') |
|
|
|
def run_or_die(cmd, msg=''): |
|
rc = subprocess.run(['/bin/sh', '-c', cmd], |
|
stdout=subprocess.DEVNULL, |
|
stderr=subprocess.DEVNULL).returncode |
|
|
|
if rc != 0: |
|
die(msg) |
|
|
|
def parse_config(path): |
|
config = [] |
|
|
|
for line in open(path): |
|
line = line.strip() |
|
|
|
if line.startswith('[') and line.endswith(']'): |
|
a = line[1:-1].split('|') |
|
|
|
if len(a) < 2: |
|
cls = a[0] |
|
title = '*' |
|
else: |
|
cls = a[0] |
|
title = a[1] |
|
|
|
bindings = [] |
|
config.append((cls, title, bindings)) |
|
elif line == '': |
|
continue |
|
elif line.startswith('#'): |
|
continue |
|
else: |
|
bindings.append(line) |
|
|
|
return config |
|
|
|
def new_interruptible_generator(fd, event_fn, flushed_fn = None): |
|
intr, intw = os.pipe() |
|
|
|
def handler(s, _): |
|
os.write(intw, b'i') |
|
|
|
signal.signal(signal.SIGUSR1, handler) |
|
|
|
while True: |
|
r,_,_ = select.select([fd, intr], [], []) |
|
|
|
if intr in r: |
|
os.read(intr, 1) |
|
yield None |
|
if fd in r: |
|
if flushed_fn: |
|
while not flushed_fn(): |
|
yield event_fn() |
|
else: |
|
yield event_fn() |
|
|
|
class KDE(): |
|
def __init__(self, on_window_change): |
|
import os |
|
import dbus |
|
import dbus.mainloop.glib |
|
|
|
assert_env("KDE_SESSION_VERSION") |
|
|
|
self.on_window_change = on_window_change |
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
|
|
|
# Inject the kwin script |
|
def init(self): |
|
import dbus |
|
|
|
if os.getenv('KDE_SESSION_VERSION') == '6': |
|
api = 'windowActivated' |
|
else: |
|
api = 'clientActivated' |
|
|
|
kwin_script = '''workspace.%s.connect(client => { |
|
if (!client) return; |
|
callDBus("org.rvaiya.keyd", "/listener", "", "updateWindow", client.caption, client.resourceClass, client.resourceName); |
|
}); |
|
''' % api |
|
|
|
f = open(f'/tmp/keyd-kwin-{os.getuid()}.js', 'w') |
|
f.write(kwin_script) |
|
f.close() |
|
|
|
bus = dbus.SessionBus() |
|
|
|
kwin = bus.get_object('org.kde.KWin', '/Scripting') |
|
|
|
kwin.unloadScript(f.name) |
|
num = kwin.loadScript(f.name) |
|
|
|
if os.getenv('KDE_SESSION_VERSION') == '6': |
|
script_object = f'/Scripting/Script{num}' |
|
else: |
|
script_object = f'/{num}' |
|
|
|
script = bus.get_object('org.kde.KWin', script_object) |
|
script.run() |
|
|
|
def run(self): |
|
import dbus.service |
|
import gi.repository.GLib |
|
|
|
on_window_change = self.on_window_change |
|
class Listener(dbus.service.Object): |
|
def __init__(self): |
|
super().__init__(dbus.service.BusName('org.rvaiya.keyd', dbus.SessionBus()), '/listener') |
|
|
|
@dbus.service.method('org.rvaiya.keyd') |
|
def updateWindow(self, title, klass, id): |
|
on_window_change(klass, title) |
|
|
|
Listener() |
|
|
|
gi.repository.GLib.MainLoop().run() |
|
|
|
# Just enough wayland wire protocol to listen for interesting events. |
|
# |
|
# Sadly most of the useful protocols haven't been standardized, |
|
# so this only works for wlroots based compositors :(. |
|
|
|
class Wayland(): |
|
def __init__(self, interface_name): |
|
path = os.getenv("WAYLAND_DISPLAY") |
|
if path == None: |
|
raise Exception("WAYLAND_DISPLAY not set (is wayland running?)") |
|
|
|
if path[0] != '/': |
|
path = os.getenv("XDG_RUNTIME_DIR") + "/" + path |
|
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
|
self.sock.connect(path) |
|
self._bind_interface(interface_name) |
|
|
|
def send_msg(self, object_id, opcode, payload): |
|
opcode |= (len(payload)+8) << 16 |
|
|
|
self.sock.sendall(struct.pack(b'II', object_id, opcode)) |
|
self.sock.sendall(payload) |
|
|
|
def recv_msg(self): |
|
(object_id, evcode) = struct.unpack('II', self.sock.recv(8)) |
|
|
|
size = evcode >> 16 |
|
evcode = evcode & 0xFFFF |
|
|
|
data = self.sock.recv(size-8) |
|
|
|
return (object_id, evcode, data) |
|
|
|
def read_string(self, b): |
|
return b[4:4+struct.unpack('I', b[:4])[0]-1].decode('utf8') |
|
|
|
# Will bind the requested interface to object id 4 |
|
def _bind_interface(self, name): |
|
# bind the registry object to id 2 |
|
self.send_msg(1, 1, b'\x02\x00\x00\x00') |
|
# solicit a sync message to bookend the registry events |
|
self.send_msg(1, 0, b'\x03\x00\x00\x00') |
|
while True: |
|
(obj, event, payload) = self.recv_msg() |
|
if obj == 2 and event == 0: # registry.global event |
|
wl_name = struct.unpack('I', payload[0:4])[0] |
|
|
|
wl_interface = self.read_string(payload[4:]) |
|
|
|
if wl_interface == name: |
|
self.send_msg(2, 0, payload+b'\x04\x00\x00\x00') |
|
return |
|
|
|
if obj == 3: # sync message |
|
raise Exception(f"Could not find interface {name}") |
|
|
|
|
|
class Wlroots(): |
|
def __init__(self, on_window_change): |
|
self.wl = Wayland('zwlr_foreign_toplevel_manager_v1') |
|
self.on_window_change = on_window_change |
|
|
|
def init(self): |
|
pass |
|
|
|
def run(self): |
|
windows = {} |
|
while True: |
|
(obj, event, payload) = self.wl.recv_msg() |
|
if obj == 4 and event == 0: |
|
windows[struct.unpack('I', payload)[0]] = {} |
|
|
|
if obj in windows: |
|
if event == 0: |
|
windows[obj]['title'] = self.wl.read_string(payload) |
|
if event == 1: |
|
windows[obj]['appid'] = self.wl.read_string(payload) |
|
if event == 4 and payload[0] > 0 and payload[4] == 2: |
|
self.on_window_change(windows[obj].get('appid', ''), windows[obj].get('title', '')) |
|
|
|
class Cosmic(): |
|
def __init__(self, on_window_change): |
|
self.wl = Wayland('zcosmic_toplevel_info_v1') |
|
self.on_window_change = on_window_change |
|
|
|
def init(self): |
|
pass |
|
|
|
def run(self): |
|
windows = {} |
|
while True: |
|
(obj, event, payload) = self.wl.recv_msg() |
|
if obj not in windows: |
|
windows[obj]={} |
|
|
|
if event == 2: |
|
windows[obj]['title'] = self.wl.read_string(payload) |
|
if event == 3: |
|
windows[obj]['appid'] = self.wl.read_string(payload) |
|
if event == 8 and payload[0] > 0 and payload[4] == 2: |
|
self.on_window_change(windows[obj].get('appid', ''), windows[obj].get('title', '')) |
|
|
|
class XMonitor(): |
|
def __init__(self, on_window_change): |
|
assert_env('DISPLAY') |
|
|
|
self.on_window_change = on_window_change |
|
|
|
def init(self): |
|
import Xlib |
|
import Xlib.display |
|
|
|
self.dpy = Xlib.display.Display() |
|
self.dpy.screen().root.change_attributes( |
|
event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask) |
|
|
|
self._NET_WM_NAME = self.dpy.intern_atom('_NET_WM_NAME') |
|
self.WM_NAME = self.dpy.intern_atom('WM_NAME') |
|
|
|
|
|
def get_window_info(self, win): |
|
def get_title(win): |
|
title = '' |
|
try: |
|
title = win.get_full_property(self._NET_WM_NAME, 0).value.decode('utf8') |
|
except: |
|
try: |
|
title = win.get_full_property(self.WM_NAME, 0).value.decode('latin1', 'replace') |
|
except: |
|
pass |
|
|
|
return title |
|
|
|
while win: |
|
cls = win.get_wm_class() |
|
if cls: |
|
return (cls[1], get_title(win)) |
|
|
|
win = win.query_tree().parent |
|
|
|
return ("root", "") |
|
|
|
def run(self): |
|
import Xlib |
|
|
|
last_active_class = "" |
|
last_active_title = "" |
|
|
|
_NET_WM_STATE = self.dpy.intern_atom('_NET_WM_STATE', False) |
|
_NET_WM_STATE_ABOVE = self.dpy.intern_atom('_NET_WM_STATE_ABOVE', False) |
|
_NET_WM_WINDOW_TYPE_NOTIFICATION = self.dpy.intern_atom('_NET_WM_WINDOW_TYPE_NOTIFICATION', False) |
|
_NET_WM_WINDOW_TYPE = self.dpy.intern_atom('_NET_WM_WINDOW_TYPE', False) |
|
|
|
def get_floating_window(): |
|
q = [self.dpy.screen().root] |
|
while q: |
|
w = q.pop() |
|
q.extend(w.query_tree().children) |
|
|
|
v = w.get_full_property(_NET_WM_STATE, Xlib.Xatom.ATOM) |
|
|
|
if v and v.value and v.value[0] == _NET_WM_STATE_ABOVE: |
|
types = w.get_full_property(_NET_WM_WINDOW_TYPE, Xlib.Xatom.ATOM) |
|
|
|
# Ignore persistent notification windows like dunst |
|
if not types or _NET_WM_WINDOW_TYPE_NOTIFICATION not in types.value: |
|
return w |
|
|
|
return None |
|
|
|
def get_active_window(): |
|
win = get_floating_window() |
|
if win != None: |
|
return win |
|
|
|
return self.dpy.get_input_focus().focus |
|
|
|
for ev in new_interruptible_generator(self.dpy.fileno(), self.dpy.next_event, lambda: not self.dpy.pending_events()): |
|
if ev == None: |
|
self.on_window_change(last_active_class, last_active_title) |
|
else: |
|
try: |
|
win = get_active_window() |
|
|
|
if isinstance(win, int) or win == None: |
|
continue |
|
|
|
win.change_attributes(event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask) |
|
|
|
cls, title = self.get_window_info(win) |
|
|
|
if cls != last_active_class or title != last_active_title: |
|
last_active_class = cls |
|
last_active_title = title |
|
|
|
self.on_window_change(cls, title) |
|
except: |
|
pass |
|
|
|
|
|
# :( |
|
class GnomeMonitor(): |
|
def __init__(self, on_window_change): |
|
assert_gnome() |
|
|
|
self.on_window_change = on_window_change |
|
|
|
self.fifo_path = (os.getenv('XDG_RUNTIME_DIR') or '/run/user/'+str(os.getuid())) + '/keyd.fifo' |
|
|
|
def init(self): |
|
if not os.path.exists(self.fifo_path): |
|
print("""Gnome extension doesn't appear to be running: |
|
|
|
You will need to install the keyd gnome extension in order for the |
|
application mapper to work correctly. |
|
|
|
This can usually be achieved by running: |
|
|
|
rm -r ~/.local/share/gnome-shell/extensions/keyd # Remove any older versions of the extension |
|
mkdir -p ~/.local/share/gnome-shell/extensions |
|
|
|
Followed by: |
|
|
|
Gnome 42-44: |
|
ln -s /usr/local/share/keyd/gnome-extension ~/.local/share/gnome-shell/extensions/keyd |
|
|
|
Gnome 45/46: |
|
ln -s /usr/local/share/keyd/gnome-extension-45 ~/.local/share/gnome-shell/extensions/keyd |
|
|
|
Finally restart Gnome and run: |
|
|
|
gnome-extensions enable keyd |
|
gnome-extensions show keyd (verify the extension is enabled) |
|
|
|
NOTE: |
|
You may need to adjust the above paths (e.g /usr/share/keyd/gnome-extension) |
|
depending on your distro. |
|
""") |
|
exit(0) |
|
|
|
def run(self): |
|
fh = open(self.fifo_path) |
|
last_cls = '' |
|
last_title = '' |
|
|
|
for line in new_interruptible_generator(fh.fileno(), fh.readline, None): |
|
if line == None: |
|
self.on_window_change(last_cls, last_title) |
|
continue |
|
|
|
try: |
|
(cls, title) = line.strip('\n').split('\t') |
|
last_cls = cls |
|
last_title = title |
|
except: |
|
cls = '' |
|
title = '' |
|
|
|
self.on_window_change(cls, title) |
|
|
|
def get_monitor(on_window_change): |
|
monitors = [ |
|
('kde', KDE), |
|
('wlroots', Wlroots), |
|
('cosmic', Cosmic), |
|
('Gnome', GnomeMonitor), |
|
('X', XMonitor), |
|
] |
|
|
|
for name, mon in monitors: |
|
try: |
|
m = mon(on_window_change) |
|
print(f'{name} detected') |
|
return m |
|
except: |
|
pass |
|
|
|
print('Could not detect app environment :(.') |
|
sys.exit(-1) |
|
|
|
def lock(): |
|
global lockfh |
|
lockfh = open(LOCKFILE, 'w') |
|
try: |
|
fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB) |
|
except: |
|
die('only one instance may run at a time') |
|
|
|
def daemonize(): |
|
print(f'Daemonizing, log output will be stored in {LOGFILE}...') |
|
|
|
fh = open(LOGFILE, 'w') |
|
|
|
os.close(1) |
|
os.close(2) |
|
os.dup2(fh.fileno(), 1) |
|
os.dup2(fh.fileno(), 2) |
|
|
|
if os.fork(): exit(0) |
|
if os.fork(): exit(0) |
|
|
|
opt = argparse.ArgumentParser() |
|
opt.add_argument('-v', '--verbose', default=False, action='store_true', help='Log the active window (useful for discovering window and class names)') |
|
opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background') |
|
args = opt.parse_args() |
|
|
|
if not os.path.exists(CONFIG_PATH): |
|
die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf') |
|
|
|
config = parse_config(CONFIG_PATH) |
|
lock() |
|
|
|
def lookup_bindings(cls, title): |
|
bindings = [] |
|
for cexp, texp, b in config: |
|
if fnmatch(cls, cexp) and fnmatch(title, texp): |
|
dbg(f'\tMatched {cexp}|{texp}') |
|
bindings.extend(b) |
|
|
|
return bindings |
|
|
|
def normalize_class(s): |
|
return re.sub('[^A-Za-z0-9]+', '-', s).strip('-').lower() |
|
|
|
def normalize_title(s): |
|
return re.sub('[\W_]+', '-', s).strip('-').lower() |
|
|
|
last_mtime = os.path.getmtime(CONFIG_PATH) |
|
def on_window_change(cls, title): |
|
global last_mtime |
|
global config |
|
|
|
cls = normalize_class(cls) |
|
title = normalize_title(title) |
|
|
|
mtime = os.path.getmtime(CONFIG_PATH) |
|
|
|
if mtime != last_mtime: |
|
print(CONFIG_PATH + ': Updated, reloading config...') |
|
config = parse_config(CONFIG_PATH) |
|
last_mtime = mtime |
|
|
|
if args.verbose: |
|
print(f'Active window: {cls}|{title}') |
|
|
|
bindings = lookup_bindings(cls, title) |
|
subprocess.run(['keyd', 'bind', 'reset', *bindings], stdout=subprocess.DEVNULL) |
|
|
|
|
|
mon = get_monitor(on_window_change) |
|
mon.init() |
|
|
|
if args.daemonize: |
|
daemonize() |
|
|
|
mon.run()
|
|
|