Skip to content
Snippets Groups Projects
Commit 76dba182 authored by James Vasile's avatar James Vasile
Browse files

Move launch to floatilla, adjust switch-sway-display

parent dae1110b
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
"""
Run-or-raise launcher
~/.config/launch.toml has the mapping for various applications.
"""
import argparse
from collections import UserList
import i3ipc # would that we could make this optional
import os
from pathlib import Path
import pprint
import shlex
import subprocess
import sys
import tomllib
from typing import Callable, Optional, Union
pp = pprint.PrettyPrinter(indent=4, width=120).pprint
pf = pprint.PrettyPrinter(indent=4, width=120).pformat
CONFIG_FNAME = Path("~/.config/launch.toml").expanduser()
VERBOSE = False
WAYLAND = os.environ['XDG_SESSION_TYPE'] == "wayland"
SWAY = os.environ['XDG_CURRENT_DESKTOP'] == "sway"
if SWAY:
i3 = i3ipc.Connection()
workspaces = i3.get_workspaces()
def err(msg:str, name:str=""):
if name:
stderr=open(f'/tmp/launch/{name}.stderr.log', 'a')
stderr.write(msg)
stderr.write("\n")
sys.stderr.write(msg)
sys.stderr.write("\n")
sys.exit(-1)
def out(msg, verbose=VERBOSE):
if verbose:
Log().write(out)
def print_wins(msg, wins):
out(f"{msg}:")
for win in wins:
out(win)
class wmctrl:
"wmctrl and xdotool runner"
def __init__(self):
if WAYLAND:
Exception("ERROR: Trying to use wmctrl but not in an X session\n")
# Get name of window manager
self.wm = self.run('-m')[0].split(": ")[1]
def run(self, cmd):
output = subprocess.check_output(f"wmctrl {cmd}", shell=True)
return output.decode("utf-8").strip().split("\n")
def list(self):
return self.run("-lxp")
def get_curr_desktop(self):
for line in self.run("-d"):
parts = line.split()
if parts[1] == "*":
return int(parts[0])
def switch_to(self, id):
Log().write(f"Switching to {id}")
self.run(f"-i -a {id}")
def get_curr_window_id(self):
"Returns id (in decimal, not hex) of current window"
output = subprocess.check_output("xdotool getwindowfocus", shell=True)
return int(output.decode("utf-8").strip())
class XProp(dict):
"""Model xprop results
This parses xprop output quite poorly. See, e.g. the _NET_WM_ICON(CARDINAL)
for kitty terminal. It's good enough for our use, though."""
def __init__(self, id):
dict.__init__(self)
self.id = id
self.raw = self.run(f"-id {self.id}")
state = "kv"
key = ""
for line in self.raw:
state, key = self.parse_line(line, state, key)
def parse_line(self, line, state, key):
"parse a line of xprop output and add to self dict"
if state == "kv":
if line.endswith(":"):
key = line[-1]
self[key] = {}
return "dict", key
else:
parts = line.split()
if not parts:
return "kv", ""
if parts[0].endswith(":"):
subparts = line.split(": ", 2)
self[parts[0]] = subparts[1]
return "kv", ""
parts = line.split(" = ", 2)
if parts[1].startswith('"') and parts[1].endswith('"'):
parts[1] = parts[1][1:-1]
self[parts[0]] = parts[1]
return "kv", ""
elif state == "dict":
if not line.startswith("\t"):
return self.parse_line(line, "kv", "")
parts = line.strip().split(": ", 1)
if len(parts) == 1:
return "dict", key
self[key][parts[0]] = parts[1]
return "dict", key
def run(self, cmd):
output = subprocess.check_output(f"xprop {cmd}", shell=True)
return output.decode("utf-8").strip().split("\n")
class Window:
"""Model a window, regardless of whether it's an x window or a wayland window
Note: we model sway windows by subclassing"""
def __init__(self, id:Union[int,str], desktop:Union[int,str], pid:Union[int,str], wm_class:str, title:str):
self.id = int(id)
self.desktop = str(desktop)
self.pid = int(pid)
self.wm_class = wm_class.split('.')[0]
self.title = title
def __str__(self):
return (
f"{self.id} ({self.pid}) on {self.desktop} [{self.wm_class}] {self.title}"
)
def __repr__(self):
return str(self)
def load_xprop(self):
if WAYLAND:
Exception("ERROR: Trying to use xprop but not in an X session\n")
self.xprop = XProp(self.id)
class SwayWin(Window):
def __init__(self, swaywin:i3ipc.con.Con):
self.swaywin = swaywin
self.id = swaywin.id
self.desktop = self.get_desktop()
self.pid = swaywin.pid
self.title = swaywin.name
# For wayland windows, the app id is the class, but for
# xwayland windows, the app_id is None and there is a class
# set.
self.wm_class = swaywin.app_id or swaywin.window_class or ""
self.wm_class = self.wm_class.lower()
def get_desktop(self)->Optional[str]:
"In sway, desktops get names, not numbers"
desktop = self.swaywin.workspace()
# Workspaces have names, but not every container is in a workspace. The
# root container, for example, isn't in one. Any others? Maybe? I
# multiple containers get a None desktop, but fi haven't investigated.
# They are early in the window tree, and I suppose we could collect
# them, but I haven't yet.
if desktop is None:
return None
return desktop.name
def dump(win:Window):
win = win.swaywin
if win.window_title != win.name:
print(f"Class/title/name: {win.window_class} / {win.window_title} / {win.name}")
else:
print(f"Class -=> title/name: {win.window_class} -=> {win.window_title}")
if win.type != "con":
print(" Type:", win.type)
print(f" Id (PID) App ID / role: {win.id} ({win.pid}) {win.app_id} / {win.window_role}")
if win.focused:
print(" Focused:", win.focused)
l = win.leaves()
if l:
print(" Leaves:", " | ".join([x.name for x in l]))
d = win.descendants()
if d:
print(" Descendants:", " | ".join([x.name for x in d]))
print(" Workspace:", win.workspace().name)
print(" Workspaces:", " | ".join([x.name for x in win.workspaces()]))
class Sway():
def __init__(self):
"""
This class models a tree (or sub-tree) of sway windows.
We can examine the tree in self.sway.tree.descendants. That func
returns a list of objects of type i3ipc.con.Con, which is short for
container. Each Con has its own descendants. """
self.tree = i3.get_tree()
self.current_window = self.tree.find_focused()
def get_curr_win_id(self) -> int:
"""Return id of current window. This id is relevant to sway, but not
elseshwere, AFAICT"""
return self.current_window.id
def get_window_list(self) -> list[Window]:
"""Sway keeps its windwows as a tree, but we want them as a list.
Maybe, though, we'd be better off just using self.tree. That might
require turning the wmctrl list of windows into a tree.
And on further inspection, it looks like the dscendants tree
has redundant info (see dump docstring). We'll just reap the
windows from the top level."""
# I thought leaves would be good, but these don't reflect
# floating windows!
# return [SwayWin(w) for w in self.tree.leaves()]
# This is what we were using before, but when we had nested
# containers and splits, it returned it returned some objects
# that had no class or app_id. We match on those, so it wasn't good.
return [SwayWin(w) for w in self.tree.descendants() if w.type not in "output|workspace"]
def search(self, predicate:Callable[[i3ipc.con.Con], bool] ) -> list[Window]:
"""Return all windows that match the predicate function.
We're not currently using this..."""
ret:list[Window] = []
def walk(tree:i3ipc.con.Con):
if predicate(tree):
ret.append(tree)
if tree.nodes:
for subtree in tree.nodes:
walk(subtree)
else:
for subtree in tree.descendants():
walk(subtree)
walk(self.tree)
return ret
def dump(self) -> str:
"""Tree is structured as nodes of i3ipc.con.Con
(i.e. containers). The windows are also of type
i3ipc.con.Con, but they are not in the node tree. They are
descendants of the container nodes. But the descendants field
is too broad for us. At the root, the descendants includes
windows, outpus and workspaces, then when you drill down into
outputs, you get workspcaes again. Then you drill down and
windows appear again multiple times because the descendants
tree has multiples of the workspaces.
As a result, we walk the nodes, then the descendants of the
node leaves to get to the windows."""
def walk(tree:i3ipc.con.Con, indent=0) -> str:
ret = " "*indent + str(SwayWin(tree)) + "\n"
for subtree in tree.nodes:
ret += walk(subtree, indent+1)
if not tree.nodes:
for subtree in tree.descendants():
ret += walk(subtree, indent+1)
return ret
ret = walk(self.tree)
print(ret)
return ret
class Windows(UserList):
wmctrl = None
"Model a list of windows, regardless of whether they're xwindows or wayland windows"
def __init__(self):
UserList.__init__(self)
self.data = [] # UserList stores data here, but we just access via self
if SWAY:
self.sway = Sway()
self.curr_win_id = self.get_curr_win_id()
self.curr_win:Window = SwayWin(self.sway.current_window)
self.curr_desktop = self.curr_win.desktop
# Build a list of windows
self.extend(self.sway.get_window_list())
elif WAYLAND:
raise Exception("On wayland, but not Sway. No way to get window info")
else:
self.wmctrl = wmctrl()
self.curr_win_id = self.get_curr_win_id()
# Build a list of windows
for win in self.wmctrl.list():
parts = [p for p in win.split(" ") if p]
parts[0] = int(parts[0], 16)
parts[1] = int(parts[1])
if parts[1] < 0:
continue
self.append(
Window(parts[0], parts[1], parts[2], parts[3], " ".join(parts[5:]))
)
self.curr_win:Window = self.get_win_by_id(self.curr_win_id)[0]
self.curr_desktop = self.wmctrl.get_curr_desktop()
if self.curr_desktop != self.curr_win.desktop:
# This is consistency check. It should never fail.
Exception(f"Assertion failed: {self.curr_desktop} != {self.curr_win.desktop}")
def get_curr_win_id(self):
if SWAY:
return self.sway.get_curr_win_id()
elif WAYLAND:
raise Exception("Getting curr win id on wayland not implemented yet")
else:
return self.wmctrl.get_curr_window_id()
def get_win_by_id(self, ident:int)->list[Window]:
"""This returns a list of windows matching the id, but it shouldn't. It
should return a window or None"""
return [w for w in self if w.id == ident]
def get_win_by_class(self, wm_class):
if VERBOSE:
pp(self)
return [w for w in self if w.wm_class == wm_class]
def get_closest_win_by_class(self, classes, skip_current:bool=True) -> list:
if VERBOSE:
print(f"get_closest_win_by_class: {classes}")
# Remove current window, if it's there
wins = []
for wm_class in classes:
for w in self.get_win_by_class(wm_class):
if VERBOSE:
print(f"Win by class {wm_class}:", w)
if skip_current and w.id == self.curr_win_id:
continue
wins.append(w)
desktop = self.curr_desktop
curr = [w for w in wins if w.desktop == desktop and w.id != self.curr_win_id]
if curr:
return curr
return wins
def get_win_by_desktop(self, desktop):
return [w for w in self if w.desktop == desktop]
def switch_to(self, win):
Log().write_win(win)
if SWAY:
i3.command(f'[pid="{win.pid}"] focus')
elif WAYLAND:
raise Exception("Unimplemented: Wayland switch to window by id")
else:
self.switch_to_by_id(win.id)
def switch_to_by_id(self, ident:int, log=False):
if log:
win = self.get_window_by_id(ident)
Log().write_win(win)
self.wmctrl.switch_to(ident)
def __str__(self):
return "\n".join([str(w) for w in self])
def dump(self):
for win in self:
win.dump()
class Launcher():
env:dict[str,str] = {}
key = ''
targets:list[str] = []
launch = ''
def __init__(self, key:str="", targets:Union[str,list[str]]=[], launch:str="") -> None:
self.key = key or self.key
if isinstance(targets, str):
targets = [targets]
self.targets = targets or self.targets
for target in self.targets:
if "." in target:
self.targets.extend(target.split("."))
self.launch = launch or self.launch
def __repr__(self) -> str:
return (f"Launcher({self.key}, {self.targets}, {self.launch})")
def __str__(self) -> str:
return self.__repr__()
class Launchers(dict[str, Launcher]):
def __init__(self):
ll = tomllib.load(open(CONFIG_FNAME, "rb"))
for key, launcher in ll.items():
self[key] = Launcher(key, launcher['target'], launcher['launch'])
class Runner():
def __init__(self, cli:Union[list[str],str]):
if isinstance(cli, list):
self.cli = cli
else:
self.cli = shlex.split(cli)
def already_switch(self, search_term:str, full=False):
"""Switch to win if our launch target is already running
Set FULL flag to check full command line
This doesn't actually switch. When it does, we can replcae already_exit
with it"""
full_flag = "f" if full else ""
try:
wins = subprocess.check_output(f"pgrep -{full_flag}xa '^{search_term}'", shell=True)
print("wins:", wins.decode().split(' ')[0])
print(wins)
Log().write("Already running. Switching...")
sys.exit()
except subprocess.CalledProcessError:
return
def already_exit(self, full=False):
"""Exit if our launch target is already running
Set FULL flag to check full command line """
full_flag = "f" if full else ""
try:
# subprocess.check_call(f"pgrep -{full_flag}xa '^{self.cli[0]}'", shell=True)
Log().write("Already running.")
sys.exit()
except subprocess.CalledProcessError:
return
def run(self):
parts = ['nohup'] + self.cli
log_name = parts[1]
if '/' in log_name:
log_name = log_name.split('/')[-1]
subprocess.Popen(parts,
stdout=open(f'/tmp/launch/{log_name}.stdout.log', 'a'),
stderr=open(f'/tmp/launch/{log_name}.stderr.log', 'a'),
preexec_fn=os.setpgrp
)
sys.exit()
def which(self) -> str:
name = os.path.split(self.cli[0])[1]
try:
binpath_b = subprocess.check_output(f"which {self.cli[0]}", shell=True)
return binpath_b.decode('utf-8').strip()
except subprocess.CalledProcessError:
err(f"Can't find executable for {self.cli[0]}", name)
return ""
def parse_cli():
parser = argparse.ArgumentParser(
prog='launch',
description='Run or raise program',
epilog='')
parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument('--test', action='store_true', help="run doctests")
parser.add_argument('--dry-run', action='store_true', help="dry run")
parser.add_argument('program', nargs="?", default='')
opt = parser.parse_args()
if opt.verbose:
global VERBOSE
VERBOSE = True
print(opt)
if opt.test:
import doctest
print("Running doctest. There will be no output if all is well.")
doctest.testmod()
sys.exit()
return opt
def main(args):
opt = parse_cli()
# Load our windows
wins = Windows()
# If called with no args, print list of windows we know about and exit
if not opt.program:
if VERBOSE:
wins.dump()
else:
print(wins)
sys.exit()
try:
# If we have a launcher that matches the requested program, use it
launcher = Launchers()[opt.program]
except KeyError:
# no launcher found, try it as a command line
Log().write(f"No launcher found for {opt.program}")
runner = Runner(args[1:])
runner.already_exit()
cli = [runner.which()]
if len(args)>2:
cli.extend(args[2:])
runner = Runner(cli)
if not opt.dry_run:
runner.run()
# Non-window launcher. Only launch if not already launched.
if '/' in launcher.targets[0]: # / indicates it's a path to an executable
runner = Runner(launcher.target)
runner.already_exit(full=True)
if not opt.dry_run:
runner.run(launcher.launch)
sys.exit()
win = wins.get_closest_win_by_class(launcher.targets, skip_current=False)
if VERBOSE:
sys.stdout.write("Closest windows by class: ")
pp(win)
# If there's only one matching win and we're on it, bail
if len(win) == 0:
runner = Runner(launcher.launch)
# We used to check if it was already running, but we know it's not
# because there are no matching windows. Doh!
# Launch window because we cannot find it
Log().write("Cannot find window, so let's launch it")
os.system("mkdir -p /tmp/launch")
parts = shlex.split(launcher.launch)
if len(args) > 2:
parts.extend(args[2:])
Log().write(f"Cannot find {args[1]} window. Launching...\n"+shlex.join(parts))
parts = ['nohup'] + parts
if not opt.dry_run:
subprocess.Popen(parts,
stdout=open(f'/tmp/launch/{args[1]}.stdout.log', 'a'),
stderr=open(f'/tmp/launch/{args[1]}.stderr.log', 'a'),
preexec_fn=os.setpgrp
)
return 0
if len(win) == 1 and win[0].id == wins.curr_win_id:
Log().write("Already on that window.")
return 0
# Don't switch to tor browser
# ignore tor browser
win = [w for w in win if "Tor Browser" not in w.title]
if not win:
# No windows to choose from, so exit
Log().write("No windows to choose from, so exiting")
return 0
# If we're not in a window of the type requested, switch to it
if not wins.curr_win_id in [w.id for w in win]:
Log().write("We're not in a win of the type requested, so switch")
if not opt.dry_run:
wins.switch_to(win[0])
return 0
Log().write(win)
# But it looks like we're already on a window of the type requested and
# there are other candidates, so let's cycle through them.
# Remove current window from top of window list
# win = win[1:]
# Reorder so we take our least-recently used window first
ordered = LOG.read_windows(launcher.target, win)
Log().write("ORDERED", ordered)
# remove any windows that don't exist anymore
ordered = [o for o in ordered if o in win]
# Just switch to windows missing from our stack so they get into our history
not_ordered = [w for w in win if not w in ordered]
if not_ordered:
if not opt.dry_run:
wins.switch_to(not_ordered[0])
# Remove current window
ordered = [o for o in ordered if not o.id == wins.curr_win_id]
Log().write(wins.curr_win_id)
Log().write("ORDERED"+str(ordered))
# Switch to target window
if not opt.dry_run:
wins.switch_to(ordered[-1])
class Log():
fname = "/tmp/launch/log_win.txt"
def __init__(self):
os.system("mkdir -p /tmp/launch")
def write(self, *msgs):
msg = " ".join([str(s) for s in msgs])
print(msg)
fname = "/tmp/launch/log.txt"
with open(fname, "a") as FH:
FH.write(msg+"\n")
def write_win(self, win):
# Record window selection
with open(self.fname, "a") as FH:
# Trim title (which records names of web pages!) from log
log_line = " ".join(str(win).split(" ")[0:5]) + "\n"
FH.write(log_line)
def read(self, class_name):
"""Return log lines for window activity for a class"""
try:
with open(self.fname) as FH:
lines = FH.read().strip().split("\n")
except:
return []
lines = [line for line in lines if line.split(" ")[-1] == f"[{class_name}]"]
# Dedupe and take newest first
ret = []
for idx in range(len(lines)-1, -1, -1):
line = lines[idx]
if line in ret:
continue
else:
ret.append(line)
return ret
def read_windows(self, class_name, windows):
"""Return log lines for window activity for a class"""
lines = self.read(class_name)
ret = []
win_ids = [w.id for w in windows]
for line in lines:
ident = int(line.split(" ")[0])
for w in windows:
if ident == w.id:
ret.append(w)
return ret
if __name__ == "__main__":
# Don't ever run as root
if subprocess.check_output('whoami') == b'root\n':
os.system(f"sudo -u james {shlex.join(sys.argv)}")
sys.exit()
global log
LOG = Log()
Log().write(shlex.join(sys.argv))
if not WAYLAND:
# If we are coming from keyd, this isn't set, so we set it here to a default
os.environ['DISPLAY'] = os.environ.get("DISPLAY", ":0.0")
sys.exit(main(sys.argv))
......@@ -3,7 +3,9 @@
"""This is part of my sway setup.
It cycles through different combinations of turning two monitors on/off.
"""
We don't use wlr-randr. Instead, we're using sway-specific commands.
Might want to look at wlr-randr at some point, though. """
import json
import re
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment