@ -3,6 +3,8 @@
import subprocess
import argparse
import select
import socket
import struct
import os
import shutil
import re
@ -101,99 +103,87 @@ def new_interruptible_generator(monfd, genfn):
if monfd in r:
yield genfn()
class SwayMonitor():
def __init__(self, on_window_change):
assert_env('SWAYSOCK')
self.on_window_change = on_window_change
# Just enough wayland wire protocol to listen for interesting events.
#
# Sadly most of the useful protocols haven't been standardized,
# so this only works for wlroots based compositors :(.
def init(self):
pass
class Wayland():
def __init__(self, interface_name):
path = os.getenv("WAYLAND_DISPLAY")
if path == None:
raise Exception("WAYLAND_DISPLAY not set (is wayland running?)")
def run(self):
import json
import subprocess
if path[0] != '/':
path = os.getenv("XDG_RUNTIME_DIR") + "/" + path
last_cls = ''
last_title = ''
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(path)
self._bind_interface(interface_name)
swayproc = subprocess.Popen(
['swaymsg',
'--type',
'subscribe',
'--monitor',
'--raw',
'["window"]'], stdout=subprocess.PIPE)
def send_msg(self, object_id, opcode, payload):
opcode |= (len(payload)+8) << 16
for line in new_interruptible_generator(swayproc.stdout.fileno(), swayproc.stdout.readline):
if line == None:
self.on_window_change(last_cls, last_title)
continue
self.sock.sendall(struct.pack(b'II', object_id, opcode))
self.sock.sendall(payload)
data = json.loads(line)
def recv_msg(self):
(object_id, evcode) = struct.unpack('II', self.sock.recv(8))
title = ''
cls = ''
size = evcode >> 16
evcode = evcode & 0xFFFF
try:
if data['container']['focused'] == True:
props = data['container']['window_properties']
data = self.sock.recv(size-8)
cls = props['class']
title = props['title']
except:
try:
title = ''
cls = data['container']['app_id']
except:
pass
return (object_id, evcode, data)
if title == '' and cls == '':
continue
def read_string(self, b):
return b[4:4+struct.unpack('I', b[:4])[0]-1].decode('utf8')
# Will bind the requested interface to object id 4
def _bind_interface(self, name):
# bind the registry object to id 2
self.send_msg(1, 1, b'\x02\x00\x00\x00')
# solicit a sync message to bookend the registry events
self.send_msg(1, 0, b'\x03\x00\x00\x00')
while True:
(obj, event, payload) = self.recv_msg()
if obj == 2 and event == 0: # registry.global event
wl_name = struct.unpack('I', payload[0:4])[0]
if last_cls != cls or last_title != title:
last_cls = cls
last_title = title
wl_interface = self.read_string(payload[4:])
self.on_window_change(cls, title)
if wl_interface == name:
self.send_msg(2, 0, payload+b'\x04\x00\x00\x00')
return
class HyprMonitor():
def __init__(self, on_window_change):
assert_env('HYPRLAND_INSTANCE_SIGNATURE')
if obj == 3: # sync message
raise Exception(f"Could not find interface {name}")
class Wlroots():
def __init__(self, on_window_change):
self.wl = Wayland('zwlr_foreign_toplevel_manager_v1')
self.on_window_change = on_window_change
def init(self):
pass
def run(self):
import socket
last_cls = ''
last_title = ''
s = "/tmp/hypr/" + str(os.getenv('HYPRLAND_INSTANCE_SIGNATURE')) + "/.socket2.sock"
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(s)
windows = {}
while True:
data = client.recv(4096)
msg = data.decode('utf-8')
tmp = msg.split('>>')
if tmp[0] == 'activewindow' and len(tmp) > 1:
cls, title = tmp[1].split(',')
if title == '' and cls == '':
continue
if last_cls != cls or last_title != title:
last_cls = cls
last_title = title
self.on_window_change(cls, title)
client.close()
(obj, event, payload) = self.wl.recv_msg()
if obj == 4 and event == 0:
windows[struct.unpack('I', payload)[0]] = {}
if obj in windows:
if event == 0:
windows[obj]['title'] = self.wl.read_string(payload)
if event == 1:
windows[obj]['appid'] = self.wl.read_string(payload)
if event == 4 and payload[0] > 0 and payload[4] == 2:
self.on_window_change(windows[obj]['appid'], windows[obj]['title'])
class XMonitor():
def __init__(self, on_window_change):
@ -411,8 +401,7 @@ class GnomeMonitor():
def get_monitor(on_window_change):
monitors = [
('Sway', SwayMonitor),
('Hyprland', HyprMonitor),
('wlroots', Wlroots),
('Gnome', GnomeMonitor),
('X', XMonitor),
]