You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
365 lines
9.7 KiB
365 lines
9.7 KiB
#!/usr/bin/python3 |
|
|
|
# Copyright © 2019 Raheman Vaiya. |
|
# |
|
# Permission is hereby granted, free of charge, to any person obtaining a |
|
# copy of this software and associated documentation files (the "Software"), |
|
# to deal in the Software without restriction, including without limitation |
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense, |
|
# and/or sell copies of the Software, and to permit persons to whom the |
|
# Software is furnished to do so, subject to the following conditions: |
|
# |
|
# The above copyright notice and this permission notice (including the next |
|
# paragraph) shall be included in all copies or substantial portions of the |
|
# Software. |
|
# |
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL |
|
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
|
# DEALINGS IN THE SOFTWARE. |
|
|
|
import selectors |
|
import fcntl |
|
import glob |
|
import time |
|
import struct |
|
import os |
|
from ctypes import * |
|
import keys |
|
import sys |
|
import signal |
|
import random |
|
import re |
|
|
|
|
|
class VirtualKeyboard(): |
|
def __init__(self, name, product_id=0x9234, vendor_id=0x567a): |
|
EV_SYN = 0x00 |
|
EV_KEY = 0x01 |
|
UI_SET_EVBIT = 0x40045564 |
|
UI_SET_KEYBIT = 0x40045565 |
|
UI_DEV_SETUP = 0x405c5503 |
|
UI_DEV_CREATE = 0x5501 |
|
|
|
BUS_USB = 0x03 |
|
version = 0 |
|
|
|
self.uinp = os.open("/dev/uinput", os.O_WRONLY | os.O_NONBLOCK) |
|
fcntl.ioctl(self.uinp, UI_SET_EVBIT, EV_KEY) |
|
fcntl.ioctl(self.uinp, UI_SET_EVBIT, EV_SYN) |
|
|
|
for _, key in keys.names.items(): |
|
if not keys.is_mouse_button(key): |
|
fcntl.ioctl(self.uinp, UI_SET_KEYBIT, key.code) |
|
|
|
setup_struct = struct.pack('HHHH80bI', |
|
BUS_USB, |
|
vendor_id, |
|
product_id, |
|
version, |
|
*([ord(c) for c in name] + |
|
([0] * (80 - len(name)))), |
|
0) |
|
|
|
fcntl.ioctl(self.uinp, UI_DEV_SETUP, setup_struct) |
|
fcntl.ioctl(self.uinp, UI_DEV_CREATE) |
|
|
|
# Kludge to give the new device some time to propagate up the |
|
# input stack |
|
time.sleep(.3) |
|
|
|
def write_code(self, code, pressed): |
|
EV_KEY = 0x01 |
|
EV_SYN = 0x00 |
|
|
|
b = struct.pack("llHHi", 0, 0, EV_KEY, code, pressed) |
|
os.write(self.uinp, b) |
|
b = struct.pack("llHHi", 0, 0, EV_SYN, 0, 0) |
|
|
|
os.write(self.uinp, b) |
|
|
|
def send_key(self, name, pressed): |
|
code = 0 |
|
|
|
if name in keys.names: |
|
code = keys.names[name].code |
|
elif name in keys.alt_names: |
|
code = keys.alt_names[name].code |
|
else: |
|
raise Exception(f'Could not find corresponding key for \"{name}\"') |
|
|
|
self.write_code(code, pressed) |
|
|
|
def send_string(self, s): |
|
for c in s: |
|
shifted = False |
|
code = 0 |
|
|
|
if c in keys.names: |
|
code = keys.names[c].code |
|
elif c in keys.alt_names: |
|
code = keys.alt_names[c].code |
|
else: |
|
code = keys.shifted_names[c].code |
|
shifted = True |
|
|
|
if shifted: |
|
self.write_code(keys.names["shift"].code, 1) |
|
|
|
self.write_code(code, 1) |
|
self.write_code(code, 0) |
|
|
|
if shifted: |
|
self.write_code(keys.names["shift"].code, 0) |
|
|
|
|
|
class KeyStream(): |
|
def grab(self): |
|
EVIOCGRAB = 0x40044590 |
|
fcntl.ioctl(self.fh, EVIOCGRAB, 1) |
|
|
|
def ungrab(self): |
|
EVIOCGRAB = 0x40044590 |
|
fcntl.ioctl(self.fh, EVIOCGRAB, 0) |
|
|
|
def get_name(self, fh): |
|
EVIOCGNAME = 0x81004506 |
|
buf = bytes(256) |
|
|
|
name = fcntl.ioctl(fh, EVIOCGNAME, buf) |
|
return c_char_p(name).value.decode('utf8') |
|
|
|
def get_ids(self, fh): |
|
EVIOCGID = 0x80084502 |
|
buf = bytes(8) |
|
|
|
resp = fcntl.ioctl(fh, EVIOCGID, buf) |
|
|
|
(_, vendor, product, _) = struct.unpack("HHHH", resp) |
|
return (product, vendor) |
|
|
|
def __init__(self, product=0x00, vendor=0x09, name=""): |
|
self.fh = None |
|
for f in glob.glob("/dev/input/event*"): |
|
fh = open(f, 'rb') |
|
fh.devname = self.get_name(fh) |
|
p, v = self.get_ids(fh) |
|
|
|
if (p == product and v == vendor) or (name != "" and fh.devname == name): |
|
self.fh = fh |
|
|
|
if not self.fh: |
|
raise Exception( |
|
'Could not find keyboard with id %04x:%04x' % (vendor, product)) |
|
|
|
# Collect all events currently sitting on the input stream. |
|
def collect(self): |
|
EV_KEY = 0x01 |
|
|
|
events = [] |
|
|
|
flags = fcntl.fcntl(self.fh, fcntl.F_GETFL) |
|
fcntl.fcntl(self.fh, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
|
|
|
while True: |
|
ev = self.fh.read(24) |
|
if not ev: |
|
fcntl.fcntl(self.fh, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) |
|
return events |
|
|
|
_, _, type, code, value = struct.unpack("llhhi", ev) |
|
|
|
if type == EV_KEY: |
|
key = keys.codes[code] |
|
|
|
events.append((key, value)) |
|
|
|
# Block until the next event |
|
def next(self): |
|
EV_KEY = 0x01 |
|
|
|
while True: |
|
ev = self.fh.read(24) |
|
_, _, type, code, value = struct.unpack("llhhi", ev) |
|
|
|
if type == EV_KEY: |
|
key = keys.codes[code] |
|
|
|
return key, value |
|
|
|
|
|
output = '' |
|
|
|
|
|
def on_timeout(a, b): |
|
print('ERROR: test timed out') |
|
exit(-1) |
|
|
|
|
|
# If we don't terminate within 5 seconds something has gone |
|
# horribly wrong... |
|
signal.signal(signal.SIGALRM, on_timeout) |
|
signal.alarm(20) |
|
|
|
vkbd = VirtualKeyboard('test keyboard', vendor_id=0x2fac, product_id=0x2ade) |
|
stream = KeyStream(name="keyd virtual keyboard") |
|
|
|
# Grab the virtual keyboard so we don't |
|
# cause pandemonium. |
|
|
|
stream.grab() |
|
|
|
exit_on_fail = False |
|
|
|
|
|
def diff(output, expected): |
|
n = max(len(expected), len(output)) |
|
|
|
s = '' |
|
for i in range(n): |
|
e = expected[i] if i < len(expected) else "" |
|
o = output[i] if i < len(output) else "" |
|
|
|
if e != o: |
|
s += '\x1b[33m%-20s\x1b[0m \x1b[31m%s\x1b[0m\n' % (e, o) |
|
else: |
|
s += '%-20s %s\n' % (e, o) |
|
|
|
return s |
|
|
|
|
|
class TestElement: |
|
def __init__(self, type, code, val): |
|
self.type = type |
|
self.code = code |
|
self.value = val |
|
|
|
|
|
# Busy wait to minimize imprecision |
|
# (sleep() is inaccurate). |
|
def sleep(ms): |
|
us = ms * 1000 |
|
start = time.time() |
|
|
|
while True: |
|
if ((time.time() - start) * 1E6) >= us: |
|
return |
|
|
|
|
|
def run_test(name, input, output, verbose): |
|
def printerr(s): |
|
print(f'{name}: \x1b[31mERROR\x1b[0m: {s}') |
|
|
|
if verbose: |
|
sys.stdout.write('Input:\n%s\n\n%-20s %s\n%s' % |
|
(input, "Expected Output:", "Output:", diff(result, expected))) |
|
|
|
elements = [] |
|
|
|
for line in input.strip().split('\n'): |
|
line = line.strip() |
|
try: |
|
timeout = int(re.match('^([0-9]+)ms$', line).group(1)) |
|
elements.append(TestElement('timeout', 0, timeout)) |
|
continue |
|
except: |
|
pass |
|
|
|
key, state = line.split(' ') |
|
depress = 0 |
|
|
|
if state == "down": |
|
depress = 1 |
|
|
|
code = 0 |
|
if key in keys.names: |
|
code = keys.names[key].code |
|
else: |
|
code = keys.alt_names[key].code |
|
|
|
elements.append(TestElement('code', code, depress)) |
|
|
|
# Actually run the test, keep this separate from parsing to minimize |
|
# latency. The system may still preempt the thread causing spurious time |
|
# dependent test failures. There isn't much that can be done to mitigate |
|
# this :/. |
|
|
|
for e in elements: |
|
if e.type == 'timeout': |
|
sleep(e.value) |
|
continue |
|
else: |
|
vkbd.write_code(e.code, e.value) |
|
|
|
expected = output.strip().split('\n') |
|
result = [] |
|
|
|
time.sleep(0.00003) |
|
results = stream.collect() |
|
|
|
# Try again, timeout may have been insufficient. |
|
if len(results) != len(expected): |
|
print('WARNING: Insufficient output, timing out one more time...') |
|
time.sleep(.05) |
|
results += stream.collect() |
|
|
|
for k, v in results: |
|
result.append(f'{k} {"up" if v == 0 else "down"}') |
|
|
|
if len(result) > len(expected): |
|
printerr('Extraneous keys.') |
|
return False |
|
|
|
if len(result) < len(expected): |
|
printerr('Missing keys.') |
|
return False |
|
|
|
for i in range(len(expected)): |
|
if result[i] != expected[i]: |
|
printerr( |
|
f'mismatch: expected \033[33m{expected[i]}\033[0m got \033[31m{result[i]}\033[0m.') |
|
return False |
|
|
|
print(f'{name}: \x1b[33mPASSED\x1b[0m') |
|
return True |
|
|
|
|
|
import argparse |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument('-v', '--verbose', default=False, action='store_true') |
|
parser.add_argument('-e', '--exit-on-fail', default=False, action='store_true') |
|
parser.add_argument('files', nargs=argparse.REMAINDER) |
|
args = parser.parse_args() |
|
|
|
|
|
# Prevent gc from interfering with |
|
# timeout precision. |
|
import gc |
|
import os |
|
|
|
gc.collect() |
|
gc.disable() |
|
os.nice(-20) |
|
|
|
tests = [] |
|
failed = False |
|
|
|
for file in args.files: |
|
name = file |
|
input, output = open(file, 'r').read().split('\n\n') |
|
|
|
tests.append((name, input, output)) |
|
|
|
if not run_test(name, input, output, args.verbose): |
|
if args.exit_on_fail: |
|
exit(-1) |
|
|
|
failed = True |
|
|
|
if failed: |
|
exit(-1) |
|
|
|
#tests = tests * 1000
|
|
|