You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
285 lines
7.4 KiB
285 lines
7.4 KiB
#!/usr/bin/python3 |
|
|
|
import subprocess |
|
import argparse |
|
import os |
|
import re |
|
import sys |
|
import fcntl |
|
|
|
# Good enough for now :/. |
|
|
|
# TODO(ish): |
|
# |
|
# Make assorted detection hacks cleaner. |
|
# Profile and optimize. |
|
# Consider reimplmenting in perl or C. |
|
# Produce more useful error messages :P. |
|
|
|
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf' |
|
LOCKFILE = os.getenv('HOME')+'/.config/keyd/lockfile' |
|
LOGFILE = os.getenv('HOME')+'/.config/keyd/log' |
|
|
|
def die(msg): |
|
sys.stderr.write('ERROR: ') |
|
sys.stderr.write(msg) |
|
sys.stderr.write('\n') |
|
exit(0) |
|
|
|
def assert_env(var): |
|
if not os.getenv(var): |
|
raise Exception(f'Missing environment variable {var}') |
|
|
|
def run_or_die(cmd, msg=''): |
|
rc = subprocess.run(['/bin/sh', '-c', cmd], |
|
stdout=subprocess.DEVNULL, |
|
stderr=subprocess.DEVNULL).returncode |
|
|
|
if rc != 0: |
|
die(msg) |
|
|
|
def parse_config(path): |
|
map = {} |
|
|
|
for line in open(path): |
|
line = line.strip() |
|
|
|
if line.startswith('[') and line.endswith(']'): |
|
window_class = line[1:-1] |
|
|
|
bindings = [] |
|
map[window_class] = bindings |
|
elif line == '': |
|
continue |
|
elif line.startswith('#'): |
|
continue |
|
else: |
|
bindings.append(line) |
|
|
|
return map |
|
|
|
class SwayMonitor(): |
|
def __init__(self, on_window_change): |
|
assert_env('SWAYSOCK') |
|
|
|
self.on_window_change = on_window_change |
|
|
|
def init(self): |
|
pass |
|
|
|
def run(self): |
|
import json |
|
import subprocess |
|
|
|
swayproc = subprocess.Popen( |
|
['swaymsg', |
|
'--type', |
|
'subscribe', |
|
'--monitor', |
|
'--raw', |
|
'["window"]'], stdout=subprocess.PIPE) |
|
|
|
for ev in swayproc.stdout: |
|
data = json.loads(ev) |
|
|
|
try: |
|
if data['container']['focused'] == True: |
|
cls = data['container']['window_properties']['class'] |
|
self.on_window_change(cls) |
|
except: |
|
cls = data['container']['app_id'] |
|
self.on_window_change(cls) |
|
pass |
|
|
|
|
|
class XMonitor(): |
|
def __init__(self, on_window_change): |
|
assert_env('DISPLAY') |
|
|
|
self.on_window_change = on_window_change |
|
|
|
def init(self): |
|
import Xlib |
|
import Xlib.display |
|
|
|
self.dpy = Xlib.display.Display() |
|
self.dpy.screen().root.change_attributes( |
|
event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask) |
|
|
|
def run(self): |
|
last_active_class = "" |
|
while True: |
|
self.dpy.next_event() |
|
|
|
try: |
|
cls = self.dpy.get_input_focus().focus.get_wm_class()[1] |
|
if cls != last_active_class: |
|
last_active_class = cls |
|
self.on_window_change(cls) |
|
except: |
|
import traceback |
|
traceback.print_exc() |
|
pass |
|
|
|
# :( |
|
class GnomeMonitor(): |
|
def __init__(self, on_window_change): |
|
assert_env('GNOME_SETUP_DISPLAY') |
|
|
|
self.on_window_change = on_window_change |
|
|
|
self.extension_dir = os.getenv('HOME') + '/.local/share/gnome-shell/extensions/keyd' |
|
self.fifo_path = self.extension_dir + '/keyd.fifo' |
|
|
|
def _install(self): |
|
os.makedirs(self.extension_dir, exist_ok=True) |
|
|
|
extension = ''' |
|
const Shell = imports.gi.Shell; |
|
|
|
// We have to keep an explicit reference around to prevent garbage collection :/. |
|
let file = imports.gi.Gio.File.new_for_path('%s'); |
|
let pipe = file.append_to_async(0, 0, null, on_pipe_open); |
|
|
|
function send(msg) { |
|
if (!pipe) |
|
return; |
|
|
|
try { |
|
pipe.write(msg, null); |
|
} catch { |
|
log('pipe closed, reopening...'); |
|
pipe = null; |
|
file.append_to_async(0, 0, null, on_pipe_open); |
|
} |
|
} |
|
|
|
function on_pipe_open(file, res) { |
|
log('pipe opened'); |
|
pipe = file.append_to_finish(res); |
|
} |
|
|
|
function init() { |
|
Shell.WindowTracker.get_default().connect('notify::focus-app', () => { |
|
send(`${global.display.focus_window.get_wm_class()}\n`); |
|
}); |
|
|
|
return { enable: ()=>{}, disable: ()=>{} }; |
|
} |
|
''' % (self.fifo_path) |
|
|
|
metadata = ''' |
|
{ |
|
"name": "keyd", |
|
"description": "Used by keyd to obtain active window information.", |
|
"uuid": "keyd", |
|
"shell-version": [ "41" ] |
|
} |
|
''' |
|
|
|
open(self.extension_dir + '/metadata.json', 'w').write(metadata) |
|
open(self.extension_dir + '/extension.js', 'w').write(extension) |
|
os.mkfifo(self.fifo_path) |
|
|
|
def init(self): |
|
if not os.path.exists(self.extension_dir): |
|
print('keyd extension not found, installing...') |
|
self._install() |
|
print('Success! Please restart Gnome and rerun this script.') |
|
exit(0) |
|
|
|
run_or_die('gsettings set org.gnome.shell disable-user-extensions false'); |
|
run_or_die('gnome-extensions enable keyd', 'Failed to enable keyd extension.') |
|
|
|
def run(self): |
|
for cls in open(self.fifo_path): |
|
cls = cls.strip() |
|
self.on_window_change(cls) |
|
|
|
def get_monitor(on_window_change): |
|
monitors = [ |
|
('Sway', SwayMonitor), |
|
('Gnome', GnomeMonitor), |
|
('X', XMonitor), |
|
] |
|
|
|
for name, mon in monitors: |
|
try: |
|
m = mon(on_window_change) |
|
print(f'{name} detected') |
|
return m |
|
except: |
|
pass |
|
|
|
print('Could not detect app environment :(.') |
|
sys.exit(-1) |
|
|
|
def lock(): |
|
global lockfh |
|
lockfh = open(LOCKFILE, 'w') |
|
try: |
|
fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB) |
|
except: |
|
die('only one instance may run at a time') |
|
|
|
def ping_keyd(): |
|
run_or_die('keyd -e ping', |
|
'could not connect to keyd instance, make sure it is running and you are a member of `keyd`') |
|
|
|
def daemonize(): |
|
print('Daemonizing...') |
|
|
|
fh = open(LOGFILE, 'w') |
|
|
|
os.close(1) |
|
os.close(2) |
|
os.dup2(fh.fileno(), 1) |
|
os.dup2(fh.fileno(), 2) |
|
|
|
if os.fork(): exit(0) |
|
if os.fork(): exit(0) |
|
|
|
opt = argparse.ArgumentParser() |
|
opt.add_argument('-m', '--monitor', default=False, action='store_true', help='print window class names in real time') |
|
opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background') |
|
args = opt.parse_args() |
|
|
|
if not os.path.exists(CONFIG_PATH): |
|
die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf') |
|
|
|
bindings = parse_config(CONFIG_PATH) |
|
ping_keyd() |
|
lock() |
|
|
|
def normalize_class(s): |
|
return re.sub('[^A-Za-z0-9]', '-', s).strip('-').lower() |
|
|
|
last_mtime = os.path.getmtime(CONFIG_PATH) |
|
def on_window_change(cls): |
|
global last_mtime |
|
global bindings |
|
|
|
cls = normalize_class(cls) |
|
mtime = os.path.getmtime(CONFIG_PATH) |
|
|
|
if mtime != last_mtime: |
|
print(CONFIG_PATH + ': Updated, reloading config...') |
|
bindings = parse_config(CONFIG_PATH) |
|
last_mtime = mtime |
|
|
|
if args.monitor: |
|
print(cls) |
|
return |
|
|
|
if cls in bindings: |
|
# Apply the bindings. |
|
subprocess.run(['keyd', '-e', *bindings[cls]]) |
|
|
|
|
|
mon = get_monitor(on_window_change) |
|
mon.init() |
|
|
|
if args.daemonize: |
|
daemonize() |
|
|
|
mon.run()
|
|
|