infra/deploy_nixos.py

214 lines
6.4 KiB
Python
Raw Normal View History

2021-10-21 11:09:52 +02:00
#!/usr/bin/env python3
import os
from contextlib import contextmanager
2021-10-24 01:02:47 +02:00
from typing import List, Dict, Tuple, IO, Iterator, Optional, Callable, Any, Union
2021-10-21 11:09:52 +02:00
from threading import Thread
import subprocess
2021-10-24 01:02:47 +02:00
import fcntl
import select
from contextlib import ExitStack
2021-10-21 11:09:52 +02:00
@contextmanager
def pipe() -> Iterator[Tuple[IO[str], IO[str]]]:
    """Yield the ``(read_end, write_end)`` text-mode file objects of a new pipe.

    The read end is switched to non-blocking mode. Both ends are closed when
    the context exits, whether or not the body raised.
    """
    (pipe_r, pipe_w) = os.pipe()
    read_end = os.fdopen(pipe_r, "r")
    write_end = os.fdopen(pipe_w, "w")

    try:
        fl = fcntl.fcntl(read_end, fcntl.F_GETFL)
        fcntl.fcntl(read_end, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        yield (read_end, write_end)
    finally:
        # Close the write end first: closing it flushes any buffered text,
        # and flushing into a pipe whose read end is already closed raises
        # BrokenPipeError. The nested finally guarantees the read end is
        # closed even when that flush fails.
        try:
            write_end.close()
        finally:
            read_end.close()
2021-10-24 01:02:47 +02:00
# Type of the stdout/stderr arguments accepted by the run helpers below:
# either None or an integer such as subprocess.PIPE.
FILE = Optional[int]
2021-10-21 11:09:52 +02:00
class DeployHost:
    """A deployment target on which commands are run locally or through ssh.

    Output that is not captured is echoed to this process' stdout line by
    line, each line prefixed with ``[command_prefix]``.
    """

    def __init__(
        self,
        host: str,
        user: str = "root",
        port: int = 22,
        forward_agent: bool = False,
        command_prefix: Optional[str] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Store the connection settings for one host.

        Args:
            host: Host name or address handed to ssh.
            user: Login user handed to ssh.
            port: Port handed to ssh via ``-p``.
            forward_agent: When true, ``-A`` is added to the ssh command line.
            command_prefix: Label used to prefix echoed output; ``host`` is
                used when this is None or empty.
            meta: Arbitrary per-host data. A fresh dict is created when
                omitted so that hosts never share one default dict.
        """
        self.host = host
        self.user = user
        self.port = port
        self.command_prefix = command_prefix if command_prefix else host
        self.forward_agent = forward_agent
        self.meta = {} if meta is None else meta

    def _prefix_output(
        self, print_fd: IO[str], stdout: Optional[IO[str]], stderr: Optional[IO[str]]
    ) -> Tuple[str, str]:
        """Drain the given pipe read ends until every one of them hits EOF.

        Data arriving on ``print_fd`` is printed one complete line at a time,
        prefixed with ``[command_prefix]``; an unterminated last line is
        printed once ``print_fd`` reaches EOF. Data arriving on ``stdout`` and
        ``stderr`` (when given) is collected instead of printed.

        Returns:
            The collected ``(stdout, stderr)`` text; an empty string for a
            stream that was not given.

        Raises:
            UnicodeDecodeError: If collected output is not valid UTF-8.
        """
        rlist: List[IO[str]] = [print_fd]
        if stdout is not None:
            rlist.append(stdout)
        if stderr is not None:
            rlist.append(stderr)

        # Everything is buffered as bytes. A UTF-8 multibyte sequence never
        # contains b"\n", so decoding whole lines (or the whole capture) is
        # safe even when a 4096-byte read ends in the middle of a character.
        pending = b""
        stdout_chunks: List[bytes] = []
        stderr_chunks: List[bytes] = []

        while rlist:
            ready, _, _ = select.select(rlist, [], [])

            if print_fd in ready:
                chunk = os.read(print_fd.fileno(), 4096)
                if chunk:
                    # The last element is the (possibly empty) incomplete
                    # line; hold it back until its newline arrives.
                    *lines, pending = (pending + chunk).split(b"\n")
                else:
                    rlist.remove(print_fd)
                    # EOF: flush whatever is left of an unterminated line.
                    lines = [pending] if pending else []
                    pending = b""
                for line in lines:
                    # Echoing is display-only, so undecodable bytes are
                    # replaced rather than aborting the read loop.
                    text = line.decode("utf-8", errors="replace")
                    print(f"[{self.command_prefix}] {text}")

            for fd, chunks in ((stdout, stdout_chunks), (stderr, stderr_chunks)):
                if fd is not None and fd in ready:
                    chunk = os.read(fd.fileno(), 4096)
                    if chunk:
                        chunks.append(chunk)
                    else:
                        rlist.remove(fd)

        return (
            b"".join(stdout_chunks).decode("utf-8"),
            b"".join(stderr_chunks).decode("utf-8"),
        )

    def _run(
        self,
        cmd: Union[str, List[str]],
        shell: bool,
        stdout: "FILE" = None,
        stderr: "FILE" = None,
    ) -> subprocess.CompletedProcess:
        """Run ``cmd`` and wait for it, echoing or capturing its output.

        Args:
            cmd: Command string (with ``shell=True``) or argument list.
            shell: Passed through to ``subprocess.Popen``.
            stdout: ``None`` echoes the stream with the host prefix,
                ``subprocess.PIPE`` captures it, any other value is handed to
                ``subprocess.Popen`` unchanged.
            stderr: Same choices as ``stdout``.

        Returns:
            A ``CompletedProcess`` whose ``stdout``/``stderr`` hold the
            captured text, or an empty string for a stream that was not
            captured.
        """
        with ExitStack() as stack:
            # The echo pipe is always created so that it exists no matter
            # which combination of stdout/stderr the caller asked for.
            print_read, print_write = stack.enter_context(pipe())
            parent_write_ends = [print_write]

            def route(target: "FILE") -> Tuple[Optional[IO[str]], Any]:
                """Return (read end to collect from, value to give Popen)."""
                if target is None:
                    return None, print_write
                if target == subprocess.PIPE:
                    read_end, write_end = stack.enter_context(pipe())
                    parent_write_ends.append(write_end)
                    return read_end, write_end
                # Anything else is left for Popen to interpret.
                return None, target

            stdout_read, stdout_write = route(stdout)
            stderr_read, stderr_write = route(stderr)

            with subprocess.Popen(
                cmd, text=True, shell=shell, stdout=stdout_write, stderr=stderr_write
            ) as p:
                # Drop this process' copies of the write ends; otherwise the
                # read ends would never report EOF after the child exits.
                for write_end in parent_write_ends:
                    write_end.close()
                out, err = self._prefix_output(print_read, stdout_read, stderr_read)
                ret = p.wait()
            return subprocess.CompletedProcess(cmd, ret, stdout=out, stderr=err)

    def run_local(
        self, cmd: str, stdout: "FILE" = None, stderr: "FILE" = None
    ) -> subprocess.CompletedProcess:
        """Print ``cmd`` with the host prefix, then run it through a local shell."""
        print(f"[{self.command_prefix}] {cmd}")
        return self._run(cmd, shell=True, stdout=stdout, stderr=stderr)

    def run(
        self, cmd: str, stdout: "FILE" = None, stderr: "FILE" = None
    ) -> subprocess.CompletedProcess:
        """Print ``cmd`` with the host prefix, then run it on the host via ssh."""
        print(f"[{self.command_prefix}] {cmd}")
        ssh_opts = ["-A"] if self.forward_agent else []
        ssh_cmd = (
            ["ssh", "-4", f"{self.user}@{self.host}", "-p", str(self.port)]
            + ssh_opts
            + ["--", cmd]
        )
        return self._run(ssh_cmd, shell=False, stdout=stdout, stderr=stderr)
2021-10-21 11:09:52 +02:00
# One entry per host: the host paired with the CompletedProcess returned by
# DeployHost.run / DeployHost.run_local for it.
DeployResults = List[Tuple[DeployHost, subprocess.CompletedProcess]]
2021-10-24 01:02:47 +02:00
2021-10-21 11:09:52 +02:00
class DeployGroup:
    """Runs one command or function against several hosts, one thread per host."""

    def __init__(self, hosts: List[DeployHost]) -> None:
        self.hosts = hosts

    def _run_local(
        self,
        cmd: str,
        host: DeployHost,
        results: DeployResults,
        stdout: FILE = None,
        stderr: FILE = None,
    ) -> None:
        """Thread target: run ``cmd`` locally for ``host`` and record the outcome."""
        outcome = host.run_local(cmd, stdout=stdout, stderr=stderr)
        results.append((host, outcome))

    def _run_remote(
        self,
        cmd: str,
        host: DeployHost,
        results: DeployResults,
        stdout: FILE = None,
        stderr: FILE = None,
    ) -> None:
        """Thread target: run ``cmd`` on ``host`` and record the outcome."""
        outcome = host.run(cmd, stdout=stdout, stderr=stderr)
        results.append((host, outcome))

    def _run(
        self, cmd: str, local: bool = False, stdout: FILE = None, stderr: FILE = None
    ) -> DeployResults:
        """Start one thread per host, wait for all of them, return the results.

        Entries are appended by the worker threads as they finish, so the
        order of the returned list is not tied to the order of ``self.hosts``.
        """
        results: DeployResults = []
        # The worker is the same for every host; pick it once.
        worker = self._run_local if local else self._run_remote
        running = []
        for host in self.hosts:
            job = Thread(
                target=worker,
                kwargs={
                    "results": results,
                    "cmd": cmd,
                    "host": host,
                    "stdout": stdout,
                    "stderr": stderr,
                },
            )
            job.start()
            running.append(job)
        for job in running:
            job.join()
        return results

    def run(self, cmd: str, stdout: FILE = None, stderr: FILE = None) -> DeployResults:
        """Run ``cmd`` on every host via ``DeployHost.run``."""
        return self._run(cmd, stdout=stdout, stderr=stderr)

    def run_local(
        self, cmd: str, stdout: FILE = None, stderr: FILE = None
    ) -> DeployResults:
        """Run ``cmd`` once per host via ``DeployHost.run_local``."""
        return self._run(cmd, local=True, stdout=stdout, stderr=stderr)

    def run_function(self, func: Callable) -> None:
        """Call ``func(host)`` for every host, each in its own thread, and wait."""
        jobs = [Thread(target=func, args=(host,)) for host in self.hosts]
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()