diff --git a/scripts/keyd-application-mapper b/scripts/keyd-application-mapper index afc1439..2aa6924 100755 --- a/scripts/keyd-application-mapper +++ b/scripts/keyd-application-mapper @@ -3,6 +3,8 @@ import subprocess import argparse import select +import socket +import struct import os import shutil import re @@ -101,99 +103,87 @@ def new_interruptible_generator(monfd, genfn): if monfd in r: yield genfn() -class SwayMonitor(): - def __init__(self, on_window_change): - assert_env('SWAYSOCK') - self.on_window_change = on_window_change +# Just enough wayland wire protocol to listen for interesting events. +# +# Sadly most of the useful protocols haven't been standardized, +# so this only works for wlroots based compositors :(. - def init(self): - pass +class Wayland(): + def __init__(self, interface_name): + path = os.getenv("WAYLAND_DISPLAY") + if path == None: + raise Exception("WAYLAND_DISPLAY not set (is wayland running?)") - def run(self): - import json - import subprocess + if path[0] != '/': + path = os.getenv("XDG_RUNTIME_DIR") + "/" + path - last_cls = '' - last_title = '' + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(path) + self._bind_interface(interface_name) - swayproc = subprocess.Popen( - ['swaymsg', - '--type', - 'subscribe', - '--monitor', - '--raw', - '["window"]'], stdout=subprocess.PIPE) + def send_msg(self, object_id, opcode, payload): + opcode |= (len(payload)+8) << 16 - for line in new_interruptible_generator(swayproc.stdout.fileno(), swayproc.stdout.readline): - if line == None: - self.on_window_change(last_cls, last_title) - continue + self.sock.sendall(struct.pack(b'II', object_id, opcode)) + self.sock.sendall(payload) - data = json.loads(line) + def recv_msg(self): + (object_id, evcode) = struct.unpack('II', self.sock.recv(8)) - title = '' - cls = '' + size = evcode >> 16 + evcode = evcode & 0xFFFF - try: - if data['container']['focused'] == True: - props = data['container']['window_properties'] + data = self.sock.recv(size-8) - cls = props['class'] - title = props['title'] - except: - try: - title = '' - cls = data['container']['app_id'] - except: - pass + return (object_id, evcode, data) - if title == '' and cls == '': - continue + def read_string(self, b): + return b[4:4+struct.unpack('I', b[:4])[0]-1].decode('utf8') + + # Will bind the requested interface to object id 4 + def _bind_interface(self, name): + # bind the registry object to id 2 + self.send_msg(1, 1, b'\x02\x00\x00\x00') + # solicit a sync message to bookend the registry events + self.send_msg(1, 0, b'\x03\x00\x00\x00') + while True: + (obj, event, payload) = self.recv_msg() + if obj == 2 and event == 0: # registry.global event + wl_name = struct.unpack('I', payload[0:4])[0] - if last_cls != cls or last_title != title: - last_cls = cls - last_title = title + wl_interface = self.read_string(payload[4:]) - self.on_window_change(cls, title) + if wl_interface == name: + self.send_msg(2, 0, payload+b'\x04\x00\x00\x00') + return -class HyprMonitor(): - def __init__(self, on_window_change): - assert_env('HYPRLAND_INSTANCE_SIGNATURE') + if obj == 3: # sync message + raise Exception(f"Could not find interface {name}") + +class Wlroots(): + def __init__(self, on_window_change): + self.wl = Wayland('zwlr_foreign_toplevel_manager_v1') self.on_window_change = on_window_change def init(self): pass def run(self): - import socket - - last_cls = '' - last_title = '' - - s = "/tmp/hypr/" + str(os.getenv('HYPRLAND_INSTANCE_SIGNATURE')) + "/.socket2.sock" - - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client.connect(s) - + windows = {} while True: - data = client.recv(4096) - msg = data.decode('utf-8') - tmp = msg.split('>>') - if tmp[0] == 'activewindow' and len(tmp) > 1: - cls, title = tmp[1].split(',') - - if title == '' and cls == '': - continue - - if last_cls != cls or last_title != title: - last_cls = cls - last_title = title - - self.on_window_change(cls, title) - - client.close() + (obj, event, payload) = self.wl.recv_msg() + if obj == 4 and event == 0: + windows[struct.unpack('I', payload)[0]] = {} + + if obj in windows: + if event == 0: + windows[obj]['title'] = self.wl.read_string(payload) + if event == 1: + windows[obj]['appid'] = self.wl.read_string(payload) + if event == 4 and payload[0] > 0 and payload[4] == 2: + self.on_window_change(windows[obj]['appid'], windows[obj]['title']) class XMonitor(): def __init__(self, on_window_change): @@ -411,8 +401,7 @@ class GnomeMonitor(): def get_monitor(on_window_change): monitors = [ - ('Sway', SwayMonitor), - ('Hyprland', HyprMonitor), + ('wlroots', Wlroots), ('Gnome', GnomeMonitor), ('X', XMonitor), ]