build02/nixpkgs-update: replace fifo with supervisor service

parent 8fd1e0819f
commit 20cd2e4226

3 changed files with 1309 additions and 26 deletions
hosts/build02

@@ -14,12 +14,13 @@ let
getent # used by hub
cachix
apacheHttpd # for rotatelogs, used by worker script
socat # used by worker script
];

nixpkgs-update-github-releases' = "${inputs.nixpkgs-update-github-releases}/main.py";

mkWorker = name: {
after = [ "network-online.target" ];
after = [ "network-online.target" "nixpkgs-update-supervisor.service" ];
wantedBy = [ "multi-user.target" ];
description = "nixpkgs-update ${name} service";
enable = true;

@@ -53,25 +54,53 @@ let
exec > >(rotatelogs -eD "$LOGS_DIRECTORY"'/~workers/%Y-%m-%d-${name}.stdout.log' 86400)
exec 2> >(rotatelogs -eD "$LOGS_DIRECTORY"'/~workers/%Y-%m-%d-${name}.stderr.log' 86400 >&2)

pipe=/var/lib/nixpkgs-update/fifo
socket=/run/nixpkgs-update-supervisor/work.sock

if [[ ! -p $pipe ]]; then
mkfifo $pipe || true
fi
function run-nixpkgs-update {
exit_code=0
set -x
${nixpkgs-update-bin} update-batch --pr --outpaths --nixpkgs-review "$attr_path $payload" || exit_code=$?
set +x
msg="DONE $attr_path $exit_code"
}

exec 8<$pipe
while true
do
if read -u 8 line; then
set -x
${nixpkgs-update-bin} update-batch --pr --outpaths --nixpkgs-review "$line" || true
set +x
fi
msg=READY
while true; do
response=$(echo "$msg" | socat UNIX-CONNECT:"$socket" - || true)
case "$response" in
"") # connection error; retry
sleep 5
;;
NOJOBS)
msg=READY
sleep 60
;;
JOB\ *)
read -r attr_path payload <<< "''${response#JOB }"
# If one worker is initializing the nixpkgs clone, the other will
# try to use the incomplete clone, consuming a bunch of jobs and
# throwing them away. So we use a crude locking mechanism to
# run only one worker when there isn't a nixpkgs directory yet.
# Once the directory exists and this initial lock is released,
# multiple workers can run concurrently.
lockdir="$XDG_CACHE_HOME/.nixpkgs.lock"
if [ ! -e "$XDG_CACHE_HOME/nixpkgs" ] && mkdir "$lockdir"; then
trap 'rmdir "$lockdir"' EXIT
run-nixpkgs-update
rmdir "$lockdir"
trap - EXIT
continue
fi
while [ -e "$lockdir" ]; do
sleep 10
done
run-nixpkgs-update
esac
done
'';
};

mkFetcher = cmd: {
mkFetcher = name: cmd: {
after = [ "network-online.target" ];
wantedBy = [ "multi-user.target" ];
path = nixpkgsUpdateSystemDependencies;

@@ -86,21 +115,26 @@ let
User = "r-ryantm";
Group = "r-ryantm";
Restart = "on-failure";
RestartSec = "5s";
WorkingDirectory = "/var/lib/nixpkgs-update/";
RestartSec = "30m";
LogsDirectory = "nixpkgs-update/";
LogsDirectoryMode = "755";
StateDirectory = "nixpkgs-update";
StateDirectoryMode = "700";
CacheDirectory = "nixpkgs-update/worker";
CacheDirectoryMode = "700";
};

script = ''
pipe=/var/lib/nixpkgs-update/fifo

if [[ ! -p $pipe ]]; then
mkfifo $pipe || true
fi

${cmd} | sort -R > $pipe
mkdir -p "$LOGS_DIRECTORY/~fetchers"
cd "$LOGS_DIRECTORY/~fetchers"
while true; do
run_name="${name}.$(date +%s).txt"
rm -f ${name}.*.txt.part
${cmd} > "$run_name.part"
rm -f ${name}.*.txt
mv "$run_name.part" "$run_name"
sleep 24h
done
'';
};

@@ -142,9 +176,9 @@ in
script = "${nixpkgs-update-bin} delete-done --delete";
};

systemd.services.nixpkgs-update-fetch-repology = mkFetcher "${nixpkgs-update-bin} fetch-repology";
systemd.services.nixpkgs-update-fetch-updatescript = mkFetcher "${pkgs.nix}/bin/nix eval --raw -f ${./packages-with-update-script.nix}";
systemd.services.nixpkgs-update-fetch-github = mkFetcher nixpkgs-update-github-releases';
systemd.services.nixpkgs-update-fetch-repology = mkFetcher "repology" "${nixpkgs-update-bin} fetch-repology";
systemd.services.nixpkgs-update-fetch-updatescript = mkFetcher "updatescript" "${pkgs.nix}/bin/nix eval --raw -f ${./packages-with-update-script.nix}";
systemd.services.nixpkgs-update-fetch-github = mkFetcher "github" nixpkgs-update-github-releases';

systemd.services.nixpkgs-update-worker1 = mkWorker "worker1";
systemd.services.nixpkgs-update-worker2 = mkWorker "worker2";

@@ -152,6 +186,39 @@ in
#systemd.services.nixpkgs-update-worker3 = mkWorker "worker3";
#systemd.services.nixpkgs-update-worker4 = mkWorker "worker4";

systemd.services.nixpkgs-update-supervisor = {
wantedBy = [ "multi-user.target" ];
description = "nixpkgs-update supervisor service";
enable = true;
restartIfChanged = true;
path = with pkgs; [
apacheHttpd
(python311.withPackages (ps: [ ps.asyncinotify ]))
];

serviceConfig = {
Type = "simple";
User = "r-ryantm";
Group = "r-ryantm";
Restart = "on-failure";
RestartSec = "5s";
LogsDirectory = "nixpkgs-update/";
LogsDirectoryMode = "755";
RuntimeDirectory = "nixpkgs-update-supervisor/";
RuntimeDirectoryMode = "755";
StandardOutput = "journal";
};

script = ''
mkdir -p "$LOGS_DIRECTORY/~supervisor"
# This is for public logs at https://r.ryantm.com/log/~supervisor
exec > >(rotatelogs -eD "$LOGS_DIRECTORY"'/~supervisor/%Y-%m-%d.stdout.log' 86400)
exec 2> >(rotatelogs -eD "$LOGS_DIRECTORY"'/~supervisor/%Y-%m-%d.stderr.log' 86400 >&2)
# Fetcher output is hosted at https://r.ryantm.com/log/~fetchers
python3.11 ${./supervisor.py} "$LOGS_DIRECTORY/~supervisor/state.db" "$LOGS_DIRECTORY/~fetchers" "$RUNTIME_DIRECTORY/work.sock"
'';
};

systemd.tmpfiles.rules = [
"L+ /home/r-ryantm/.gitconfig - - - - ${./gitconfig.txt}"
"d /home/r-ryantm/.ssh 700 r-ryantm r-ryantm - -"
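
Editor's note: the socat loop in the worker script above speaks a simple line protocol to the supervisor socket (READY and DONE <attr_path> <exit_code> requests, answered with JOB <attr_path> <rest_of_line> or NOJOBS). A minimal Python sketch of the same worker-side exchange, assuming the socket path configured above; this is illustrative only and not part of the commit:

import socket

SOCKET_PATH = "/run/nixpkgs-update-supervisor/work.sock"  # RuntimeDirectory configured above

def request(line: str) -> str:
    # One request per connection, mirroring `echo "$msg" | socat UNIX-CONNECT:"$socket" -`
    with socket.socket(socket.AF_UNIX) as sock:
        sock.connect(SOCKET_PATH)
        sock.sendall(line.encode() + b"\n")
        sock.shutdown(socket.SHUT_WR)
        return sock.recv(4096).decode().rstrip("\n")

reply = request("READY")
if reply.startswith("JOB "):
    attr_path, payload = reply[len("JOB "):].split(" ", 1)
    # ... run nixpkgs-update for attr_path/payload here (exit code 0 assumed) ...
    reply = request(f"DONE {attr_path} 0")
elif reply == "NOJOBS":
    pass  # back off and send READY again later, as the worker script does
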
hosts/build02/supervisor.py (new file, 702 lines)
@@ -0,0 +1,702 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3.11 -p python311 python311Packages.asyncinotify

r"""
Usage: supervisor.py <database_file> <input_dir> <worker_socket>

This script has two responsibilities:

* Watch all files in a directory for tasks to give to workers, and
  store those tasks in a priority queue.

* Listen for requests from workers and hand out tasks.

The input directory is expected to contain files conforming to the
following:

* The name of the file must match [^.]+\.[0-9]+\.txt(\.part)?

* The numeric part of the file name should be an integer that
  increases whenever the file is recreated (such as the current Unix
  time, but the supervisor doesn't require that).

* The .part suffix should be present if data is still being written
  to that file. When the file is complete, it should be renamed to
  lose the .part suffix.

* The file should consist of lines that look like

    attribute_path old_ver new_ver optional_url

  (though in reality, the supervisor only cares that there are at
  least two fields; everything past the first space is passed on
  to the workers without the supervisor getting involved).

These files are produced by fetcher processes, and are referred to
as fetcher files or fetcher runs throughout this script.

Tasks are distributed to workers based on how recently a task
corresponding to the same attribute path has been dispatched. Brand
new attribute paths are given highest priority; attribute paths that
have run most recently are given lowest priority.

The worker socket is a Unix stream socket that uses a line-based
protocol for communication. The following commands are expected:

READY
DONE <attribute_path> <exit_code>

The possible responses to either command are:

NOJOBS
JOB <attribute_path> <rest_of_line>

When a worker receives NOJOBS, it is expected to wait some period of
time before asking for jobs again (with READY).

When a worker receives JOB, it is expected to complete that job and
then send a new DONE request to get the next job.

This code has a unit test suite in supervisor_test.py.
"""

import asyncio
import contextlib
import functools
import os
import pathlib
import sqlite3
import sys
import time
from collections.abc import Callable, Generator, Iterable
from typing import (
    Concatenate,
    ParamSpec,
    Protocol,
    TextIO,
    TypeVar,
    overload,
)

import asyncinotify

P = ParamSpec("P")
S = TypeVar("S")
T = TypeVar("T")
A_co = TypeVar("A_co", covariant=True)
B_co = TypeVar("B_co", covariant=True)
StorageSelf = TypeVar("StorageSelf", bound="Storage")


class Wrapped(Protocol[A_co, B_co]):
    """Protocol for result of `functools.wraps`"""

    @property
    def __wrapped__(self) -> A_co:
        ...

    @property
    def __call__(self) -> B_co:
        ...


CurMethod = Callable[Concatenate[S, sqlite3.Cursor, P], T]
WrappedCurMethod = Wrapped[CurMethod[S, P, T], Callable[P, T]]


READY = b"READY\n"
DONE = b"DONE "
JOB = b"JOB "
NOJOBS = b"NOJOBS\n"


class Storage:
    """
    Abstracts over the database that stores the supervisor state.

    This state comprises three tables: `fetcher_runs`, `queue`, and
    `log`. (A fourth table, `fetchers`, is just a simple string
    deduplication table for fetcher names.) The `fetcher_runs` table is
    a record of the distinct fetcher files captured in the database. The
    `log` table stores the times and exit code associated with the last
    job dispatched for each attribute path. The `queue` table is the
    most interesting; it stores the contents of the fetcher files along
    with a bit indicating if that item has been dispatched to a worker.

    The `dequeue` operation has sub-linear performance only because the
    `queue_order_index` index on `queue` aligns with the query used by
    `dequeue`. This index relies on a denormalized column
    `last_started`, which is populated from `log`.`started` via the
    triggers `queue_insert_started`, `log_insert_started`, and
    `log_update_started`.
    """

    def __init__(self, conn: sqlite3.Connection):
        self._conn = conn

    @overload
    @staticmethod
    def _cursor_method(
        fun: CurMethod[StorageSelf, P, T]
    ) -> WrappedCurMethod[StorageSelf, P, T]:
        ...

    @overload
    @staticmethod
    def _cursor_method(
        *,
        transaction: bool = False,
    ) -> Callable[[CurMethod[StorageSelf, P, T]], WrappedCurMethod[StorageSelf, P, T]]:
        ...

    # NOTE: mypy <1.6 claims this implementation doesn't match the first
    # overload; it's wrong.
    @staticmethod
    def _cursor_method(
        fun: CurMethod[StorageSelf, P, T] | None = None,
        transaction: bool = False,
    ) -> (
        WrappedCurMethod[StorageSelf, P, T]
        | Callable[[CurMethod[StorageSelf, P, T]], WrappedCurMethod[StorageSelf, P, T]]
    ):
        def decorator(
            fun: CurMethod[StorageSelf, P, T]
        ) -> WrappedCurMethod[StorageSelf, P, T]:
            if transaction:

                def wrapper(self: StorageSelf, *args: P.args, **kwargs: P.kwargs) -> T:
                    with self._conn:
                        with contextlib.closing(self._conn.cursor()) as cur:
                            cur.execute("BEGIN")
                            return fun(self, cur, *args, **kwargs)

            else:

                def wrapper(self: StorageSelf, *args: P.args, **kwargs: P.kwargs) -> T:
                    with contextlib.closing(self._conn.cursor()) as cur:
                        return fun(self, cur, *args, **kwargs)

            return functools.wraps(fun)(wrapper)  # type: ignore

        return decorator if fun is None else decorator(fun)

    @_cursor_method
    def upgrade(self, cur: sqlite3.Cursor, version: int = 1) -> None:
        """
        Create or update the database schema.

        The database stores a version number to allow for future schema
        modifications. This function contains scripts for migrating to
        each version number from its immediate predecessor. Every script
        from the current database version to the target version is run.

        For testing purposes, the optional `version` parameter can be
        used to create a database at an earlier version of the schema.

        The `Storage` class documentation explains the current schema.
        """

        cur.execute("PRAGMA user_version")
        user_version = cur.fetchone()[0]
        if user_version < 1 <= version:
            # Here is as good a place as any: we're using backtick quotation
            # extensively in these statements as a way to protect against
            # having a token mistaken for a keyword. The standard SQL quotation
            # style for this purpose is double-quote, which SQLite supports,
            # but SQLite also interprets double-quoted identifiers as strings
            # if they are in a position where an identifier isn't expected;
            # this can suppress or obfuscate error messages when one has made a
            # syntax error. Backticks are nonstandard but used in MySQL and
            # supported by SQLite, and SQLite doesn't try to interpret
            # backtick-quoted tokens as anything other than identifiers.
            cur.executescript(
                """
                BEGIN;

                CREATE TABLE `fetchers` (
                    `fetcher_id` `integer` PRIMARY KEY AUTOINCREMENT,
                    `name` `text` NOT NULL UNIQUE
                ) STRICT;

                CREATE TABLE `fetcher_runs` (
                    `fetcher_id` `integer` NOT NULL REFERENCES `fetchers` (`fetcher_id`),
                    `run_started` `integer` NOT NULL,
                    `is_complete` `integer` NOT NULL DEFAULT 0,
                    PRIMARY KEY (`fetcher_id`, `run_started`)
                ) STRICT;

                CREATE INDEX `fetcher_run_started_index`
                    ON `fetcher_runs` (`run_started`, `fetcher_id`);

                CREATE TRIGGER `fetcher_runs_delete`
                    AFTER DELETE
                    ON `fetcher_runs`
                BEGIN
                    DELETE FROM `fetchers`
                    WHERE `fetcher_id` = OLD.`fetcher_id`
                        AND (
                            SELECT `fetcher_id`
                            FROM `fetcher_runs`
                            WHERE `fetcher_id` = OLD.`fetcher_id`
                        ) IS NULL;
                END;

                CREATE TABLE `queue` (
                    `fetcher_id` `integer` NOT NULL,
                    `fetcher_run_started` `integer` NOT NULL,
                    `attr_path` `text` NOT NULL,
                    `payload` `text` NOT NULL,
                    `is_dequeued` `integer` NOT NULL DEFAULT 0,
                    `last_started` `integer`,
                    PRIMARY KEY (`fetcher_run_started`, `fetcher_id`, `attr_path`),
                    FOREIGN KEY (`fetcher_id`, `fetcher_run_started`)
                        REFERENCES `fetcher_runs` (`fetcher_id`, `run_started`)
                        ON DELETE CASCADE
                ) STRICT;

                CREATE INDEX `queue_order_index`
                    ON `queue` (
                        `last_started` ASC,
                        `attr_path`,
                        `fetcher_id`,
                        `fetcher_run_started` DESC
                    );

                CREATE INDEX `queue_attr_path_index`
                    ON `queue` (`attr_path` ASC);

                CREATE TABLE `log` (
                    `attr_path` `text` PRIMARY KEY,
                    `started` `integer` NOT NULL,
                    `finished` `integer`,
                    `exit_code` `integer`
                ) STRICT, WITHOUT ROWID;

                CREATE TRIGGER `queue_insert_started`
                    AFTER INSERT
                    ON `queue`
                BEGIN
                    UPDATE `queue` SET
                        `last_started` = `started`
                    FROM `log`
                    WHERE
                        `log`.`attr_path` = NEW.`attr_path` AND
                        `queue`.`rowid` = NEW.`rowid`;
                END;

                CREATE TRIGGER `log_insert_started`
                    AFTER INSERT
                    ON `log`
                BEGIN
                    UPDATE `queue` SET
                        `last_started` = NEW.`started`
                    WHERE `queue`.`attr_path` = NEW.`attr_path`;
                END;

                CREATE TRIGGER `log_update_started`
                    AFTER UPDATE OF `started`
                    ON `log`
                BEGIN
                    UPDATE `queue` SET
                        `last_started` = NEW.`started`
                    WHERE `queue`.`attr_path` = NEW.`attr_path`;
                END;

                COMMIT;
                """
            )
        cur.execute(f"PRAGMA user_version = {version}")

    @_cursor_method
    def get_fetcher_runs(self, cur: sqlite3.Cursor) -> dict[tuple[str, int], bool]:
        """Return a set of fetcher runs known to the database."""
        cur.execute(
            """
            SELECT `name`, `run_started`, `is_complete`
            FROM `fetchers`
            JOIN `fetcher_runs` USING (`fetcher_id`)
            """
        )
        return {(r[0], r[1]): r[2] for r in cur}

    @_cursor_method
    def delete_fetcher_run(
        self, cur: sqlite3.Cursor, fetcher: str, run_started: int
    ) -> None:
        """Delete a fetcher run and all of its queue items."""
        cur.execute(
            """
            DELETE FROM `fetcher_runs`
            WHERE `fetcher_id` = (SELECT `fetcher_id` FROM `fetchers` WHERE `name` = ?)
                AND `run_started` = ?
            """,
            (fetcher, run_started),
        )

    @_cursor_method(transaction=True)
    def delete_fetcher_runs(
        self, cur: sqlite3.Cursor, fetchers: Iterable[tuple[str, int]]
    ) -> None:
        """Delete multiple fetcher runs and their queue items."""
        for fetcher, run_started in fetchers:
            self.delete_fetcher_run.__wrapped__(self, cur, fetcher, run_started)

    @_cursor_method(transaction=True)
    def upsert_fetcher_run(
        self,
        cur: sqlite3.Cursor,
        fetcher: str,
        run_started: int,
        is_complete: bool,
    ) -> None:
        """Add or update a fetcher."""
        cur.execute("INSERT OR IGNORE INTO `fetchers` (`name`) VALUES (?)", (fetcher,))
        cur.execute(
            """
            INSERT INTO `fetcher_runs` (`fetcher_id`, `run_started`, `is_complete`)
            VALUES ((SELECT `fetcher_id` FROM `fetchers` WHERE `name` = ?), ?, ?)
            ON CONFLICT DO UPDATE SET `is_complete` = excluded.`is_complete`
            """,
            (fetcher, run_started, is_complete),
        )

    @_cursor_method(transaction=True)
    def enqueue(
        self,
        cur: sqlite3.Cursor,
        fetcher: str,
        run_started: int,
        entries: list[tuple[str, str]],
    ) -> None:
        """
        Add entries for a given fetcher to the queue.

        The same attribute paths can appear multiple times in the queue
        with different payloads, but only once per fetcher run. Fetcher
        files shouldn't contain more than one line for a given attribute
        path, but if they do, the later line overwrites the earlier one.
        """
        cur.executemany(
            """
            INSERT INTO `queue` (`fetcher_id`, `fetcher_run_started`, `attr_path`, `payload`)
            SELECT `fetcher_id`, ?, ?, ?
            FROM `fetchers`
            WHERE `name` = ?
            ON CONFLICT DO UPDATE SET `payload` = excluded.`payload`
            """,
            ((run_started, a, p, fetcher) for a, p in entries),
        )

    @_cursor_method(transaction=True)
    def dequeue(self, cur: sqlite3.Cursor, start_time: int) -> tuple[str, str] | None:
        """
        Pull one entry from the top of the queue.

        Returns a tuple (attribute path, payload), or None if nothing is
        currently available in the queue. If an entry is dequeued, a log
        record for this entry will be marked as started as of
        `start_time`.

        Most of the time, if a job for an attribute path was started but
        has not yet finished, any queue entries for that same path will
        be skipped. However, in case a worker dies or fails to report
        back, after 12 hours such entries are eligible again.
        `start_time` is used to determine if this 12-hour exclusion
        period has ended. (These details are only likely to be relevant
        when the queue is very small, like at the beginning or the end
        of a run.)
        """
        cur.execute(
            """
            SELECT
                `fetcher_id`,
                `attr_path`,
                `payload`
            FROM `queue`
            LEFT JOIN `log` USING (`attr_path`)
            GROUP BY `last_started`, `attr_path`, `fetcher_id`
            HAVING `is_dequeued` = 0
                AND (`started` IS NULL OR `finished` IS NOT NULL OR `started` + 43200 < ?)
                AND (TRUE OR `max`(`fetcher_run_started`))
            ORDER BY `last_started` ASC, `attr_path`, `fetcher_id`
            LIMIT 1
            """,
            (start_time,),
        )
        # NOTE: The `max` call in the above query triggers a nonstandard SQLite
        # behavior; see <https://www.sqlite.org/lang_select.html#bareagg>.
        # This behavior is critical to the correctness of this query. We don't
        # actually need the value of `max`(), though, so we tuck it into the
        # HAVING clause in a position where it can't have any other effect.

        row: tuple[int, str, str] | None = cur.fetchone()
        result = None
        if row is not None:
            fetcher_id, attr_path, payload = row
            cur.execute(
                """
                UPDATE `queue` SET `is_dequeued` = 1
                WHERE `fetcher_id` = ? AND `attr_path` = ?
                """,
                (fetcher_id, attr_path),
            )
            cur.execute(
                """
                INSERT INTO `log` (`attr_path`, `started`) VALUES (?, ?)
                ON CONFLICT DO UPDATE SET
                    `started` = excluded.`started`,
                    `finished` = NULL,
                    `exit_code` = NULL
                """,
                (attr_path, start_time),
            )
            result = attr_path, payload
        return result

    @_cursor_method
    def finish(
        self, cur: sqlite3.Cursor, attr_path: str, finish_time: int, exit_code: int
    ) -> None:
        """Log the completion of a dequeued entry."""
        cur.execute(
            "UPDATE `log` SET `finished` = ?, `exit_code` = ? WHERE `attr_path` = ?",
            (finish_time, exit_code, attr_path),
        )


async def listen_for_workers(storage: Storage, socket_path: pathlib.Path) -> None:
    """Open a Unix stream socket and handle requests from workers."""

    async def worker_connected(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        try:
            while True:
                line = await reader.readline()
                if line == b"":
                    break
                now = int(time.time())
                do_dequeue = False
                if line == READY:
                    do_dequeue = True
                elif line.startswith(DONE):
                    parts = line.split(b" ")
                    attr_path = parts[1].decode()
                    exit_code = int(parts[2])
                    storage.finish(attr_path, now, exit_code)
                    do_dequeue = True
                else:
                    print(f"Unexpected command from worker: {line!r}")
                    break

                if do_dequeue:
                    entry = storage.dequeue(now)
                    if entry:
                        writer.write(
                            b"".join(
                                [
                                    JOB,
                                    entry[0].encode(),
                                    b" ",
                                    entry[1].encode(),
                                    b"\n",
                                ]
                            )
                        )
                    else:
                        writer.write(NOJOBS)
                    await writer.drain()
        finally:
            writer.close()
            await writer.wait_closed()

    server = await asyncio.start_unix_server(worker_connected, socket_path)
    await server.serve_forever()


class FetcherDataWatcher:
    """
    Monitors a directory containing fetcher files and syncs them to
    storage.
    """

    _dir_events = asyncinotify.Mask.CREATE | asyncinotify.Mask.DELETE
    _file_events = asyncinotify.Mask.MODIFY | asyncinotify.Mask.MOVE_SELF

    class _FileDeleted(Exception):
        """A fetcher file was deleted."""

    class _FileMoved(Exception):
        """A fetcher file was moved."""

    def __init__(
        self,
        storage: Storage,
        fetcher_data_path: pathlib.Path,
        inotify: asyncinotify.Inotify,
    ) -> None:
        self._storage = storage
        self._fetcher_data_path = fetcher_data_path
        self._inotify = inotify
        self._gtors: dict[tuple[str, int], Generator[None, None, None]] = {}

    async def watch(self) -> None:
        """Start the watcher."""

        self._inotify.add_watch(self._fetcher_data_path, self._dir_events)

        try:
            known_fetcher_runs = self._storage.get_fetcher_runs()
            for path in self._fetcher_data_path.iterdir():
                if (that := self._parse_fetcher_filename(path.name)) is None:
                    continue
                name, run_started, is_complete = that
                if not known_fetcher_runs.pop((name, run_started), False):
                    self._on_fetcher(path, name, run_started, is_complete)
            self._storage.delete_fetcher_runs(known_fetcher_runs.keys())

            async for event in self._inotify:
                if event.path is None:
                    continue
                if (that := self._parse_fetcher_filename(event.path.name)) is None:
                    continue
                name, run_started, is_complete = that
                with contextlib.suppress(KeyError):
                    match event.mask:
                        case asyncinotify.Mask.CREATE:
                            self._on_fetcher(event.path, name, run_started, is_complete)
                        case asyncinotify.Mask.DELETE:
                            if not is_complete:
                                self._close_fetcher(
                                    name, run_started, self._FileDeleted()
                                )
                            self._storage.delete_fetcher_run(name, run_started)
                        case asyncinotify.Mask.MODIFY:
                            self._gtors[(name, run_started)].send(None)
                        case asyncinotify.Mask.MOVE_SELF:
                            self._close_fetcher(name, run_started, self._FileMoved())
        finally:
            with contextlib.suppress(KeyError):
                while True:
                    self._gtors.popitem()[1].close()

    def _on_fetcher(
        self,
        path: pathlib.Path,
        name: str,
        run_started: int,
        is_complete: bool,
    ) -> None:
        watch = None
        try:
            if not is_complete:
                watch = self._inotify.add_watch(path, self._file_events)
            file = path.open(encoding="utf-8")
        except FileNotFoundError:
            return
        self._storage.upsert_fetcher_run(name, run_started, is_complete)
        gtor = self._read_fetcher_file(file, watch, name, run_started)
        gtor.send(None)
        if is_complete:
            gtor.close()
        else:
            self._gtors[(name, run_started)] = gtor

    def _close_fetcher(self, name: str, run_started: int, ex: Exception) -> None:
        with contextlib.suppress(StopIteration):
            self._gtors.pop((name, run_started)).throw(ex)

    def _parse_fetcher_filename(self, name: str) -> tuple[str, int, bool] | None:
        match name.split("."):
            case [stem, run_started, "txt", "part"]:
                return stem, int(run_started), False
            case [stem, run_started, "txt"]:
                return stem, int(run_started), True
        return None

    def _read_fetcher_file(
        self,
        file: TextIO,
        watch: asyncinotify.Watch | None,
        name: str,
        run_started: int,
    ) -> Generator[None, None, None]:
        with file:
            try:
                while True:
                    self._storage.enqueue(
                        name,
                        run_started,
                        (yield from self._read_fetcher_lines(file)),
                    )
            except self._FileDeleted:
                pass
            except self._FileMoved:
                try:
                    target_path_stat = (
                        self._fetcher_data_path / f"{name}.{run_started}.txt"
                    ).stat()
                except FileNotFoundError:
                    pass
                else:
                    if target_path_stat.st_ino == os.stat(file.fileno()).st_ino:
                        for entries in self._read_fetcher_lines(file):
                            if entries is not None:
                                self._storage.enqueue(name, run_started, entries)
                            break
                        self._storage.upsert_fetcher_run(name, run_started, True)
                        assert watch is not None
                        self._inotify.rm_watch(watch)
                        return
                self._storage.delete_fetcher_run(name, run_started)

    def _read_fetcher_lines(
        self, file: TextIO
    ) -> Generator[None, None, list[tuple[str, str]]]:
        """
        Read all available complete lines from an open fetcher file.

        This is a generator, but not one that yields each line. It will
        *return* all lines as a non-empty list. If no complete lines are
        available, however, it will yield. Calling code should reenter
        the generator when more content becomes available, or use `yield
        from` to pass that responsibility outward.
        """
        entries: list[tuple[str, str]] = []
        while True:
            cookie = file.tell()
            line = file.readline()
            if line == "" or line[-1] != "\n":
                file.seek(cookie)
                if entries:
                    return entries
                yield
                continue
            match line.strip().split(" ", maxsplit=1):
                case [attr_path, payload]:
                    entries.append((attr_path, payload))
                case _:
                    print(f"Unexpected line in {file.name!r}: {line!r}")


async def main(
    db_path: pathlib.Path,
    fetcher_data_path: pathlib.Path,
    socket_path: pathlib.Path,
) -> None:
    """Run all supervisor responsibilities."""
    fetcher_data_path.mkdir(parents=True, exist_ok=True)
    with contextlib.closing(sqlite3.connect(db_path, isolation_level=None)) as conn:
        conn.execute("PRAGMA foreign_keys = ON")
        storage = Storage(conn)
        storage.upgrade()
        with asyncinotify.Inotify() as inotify:
            watcher = FetcherDataWatcher(storage, fetcher_data_path, inotify)
            await asyncio.gather(
                listen_for_workers(storage, socket_path),
                watcher.watch(),
            )


if __name__ == "__main__":
    asyncio.run(main(*[pathlib.Path(arg) for arg in sys.argv[1:4]]))
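
Editor's note: an illustrative way to drive the Storage layer directly, essentially what the test suite below does. The fetcher name "demo" and the queue entry are made up; this sketch assumes supervisor.py is importable from the current directory:

import sqlite3
import time

import supervisor

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("PRAGMA foreign_keys = ON")
storage = supervisor.Storage(conn)
storage.upgrade()

now = int(time.time())
storage.upsert_fetcher_run("demo", now, False)          # hypothetical fetcher run
storage.enqueue("demo", now, [("hello", "2.10 2.12")])  # hypothetical attr_path/payload
print(storage.dequeue(now))   # -> ('hello', '2.10 2.12'); a `log` row is marked started
storage.finish("hello", now, 0)
print(storage.dequeue(now))   # -> None; that entry was already dispatched
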
hosts/build02/supervisor_test.py (new executable file, 514 lines)

@@ -0,0 +1,514 @@
#!/usr/bin/env nix-shell
#! nix-shell -i python3.11 -p python311 python311Packages.asyncinotify

"""
In-process unit and integration tests for supervisor.py. Uses temporary
directories and memory for all state; cleans up after itself; has no
dependencies beyond what's in the nix-shell directive above and
supervisor.py itself. Run with confidence.
"""

import asyncio
import contextlib
import pathlib
import shutil
import socket
import sqlite3
import tempfile
import time
import unittest
from threading import Thread

import supervisor
|
||||
|
||||
|
||||
def mkdtemp():
|
||||
return pathlib.Path(
|
||||
tempfile.mkdtemp(prefix="nix-community-infra-build02-supervisor-test-")
|
||||
)
|
||||
|
||||
|
||||
def tick():
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
class UsesFetchers:
|
||||
def fetcher(self, name):
|
||||
return self.fetchers_path / name
|
||||
|
||||
def write_fetcher(self, name, text):
|
||||
with self.fetcher(name).open("a") as f:
|
||||
f.write(text)
|
||||
f.flush()
|
||||
|
||||
def finish_fetcher(self, name):
|
||||
self.fetcher(name).rename(self.fetcher(name.removesuffix(".part")))
|
||||
|
||||
|
||||
class UsesDatabase:
|
||||
def connect(self):
|
||||
return contextlib.closing(sqlite3.connect(self.db_path, isolation_level=None))
|
||||
|
||||
|
||||
class StorageTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.conn = sqlite3.connect(":memory:", isolation_level=None)
|
||||
self.enterContext(contextlib.closing(self.conn))
|
||||
self.conn.execute("PRAGMA foreign_keys = ON")
|
||||
self.storage = supervisor.Storage(self.conn)
|
||||
self.storage.upgrade()
|
||||
|
||||
def execute(self, *args):
|
||||
return self.conn.execute(*args)
|
||||
|
||||
def assertDBContents(self, query, expected):
|
||||
self.assertEqual(set(self.execute(query).fetchall()), expected)
|
||||
|
||||
def test_upsert_same_fetcher(self):
|
||||
self.storage.upsert_fetcher_run("f1", 100, False)
|
||||
self.storage.enqueue("f1", 100, [("alpha", "0 1")])
|
||||
self.storage.upsert_fetcher_run("f1", 100, True)
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` from `queue`",
|
||||
{(1, "alpha", "0 1")},
|
||||
)
|
||||
|
||||
def test_upsert_new_fetcher_generation(self):
|
||||
self.storage.upsert_fetcher_run("f1", 100, False)
|
||||
self.storage.enqueue("f1", 100, [("alpha", "0.1 1.0")])
|
||||
self.storage.upsert_fetcher_run("f1", 101, False)
|
||||
self.storage.enqueue("f1", 101, [("alpha", "0.1 1.1")])
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `fetcher_run_started`, `attr_path`, `payload` from `queue`",
|
||||
{(1, 100, "alpha", "0.1 1.0"), (1, 101, "alpha", "0.1 1.1")},
|
||||
)
|
||||
|
||||
def test_queue_insert_started(self):
|
||||
self.execute(
|
||||
"""
|
||||
INSERT INTO `log` (`attr_path`, `started`)
|
||||
VALUES
|
||||
('alpha', 103),
|
||||
('charlie', 100)
|
||||
"""
|
||||
)
|
||||
|
||||
self.storage.upsert_fetcher_run("f1", 101, False)
|
||||
self.storage.upsert_fetcher_run("f2", 102, False)
|
||||
self.storage.enqueue("f1", 101, [("alpha", "0 1"), ("bravo", "0 1")])
|
||||
self.storage.enqueue("f2", 102, [("alpha", "1.2.3 1.2.4")])
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path`, `payload`, `last_started` FROM `queue`",
|
||||
{
|
||||
("alpha", "0 1", 103),
|
||||
("alpha", "1.2.3 1.2.4", 103),
|
||||
("bravo", "0 1", None),
|
||||
},
|
||||
)
|
||||
|
||||
def test_log_insert_started(self):
|
||||
self.storage.upsert_fetcher_run("f1", 100, False)
|
||||
self.storage.enqueue("f1", 100, [("alpha", "0 1"), ("bravo", "0 1")])
|
||||
|
||||
self.execute("INSERT INTO `log` (`attr_path`, `started`) VALUES ('alpha', 101)")
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path`, `payload`, `last_started` FROM `queue`",
|
||||
{("alpha", "0 1", 101), ("bravo", "0 1", None)},
|
||||
)
|
||||
|
||||
def test_log_update_started(self):
|
||||
self.execute("INSERT INTO `log` (`attr_path`, `started`) VALUES ('alpha', 100)")
|
||||
|
||||
self.storage.upsert_fetcher_run("f1", 101, False)
|
||||
self.storage.upsert_fetcher_run("f2", 102, False)
|
||||
self.storage.enqueue("f1", 101, [("alpha", "0 1"), ("bravo", "0 1")])
|
||||
self.storage.enqueue("f2", 102, [("alpha", "1.2.3 1.2.4")])
|
||||
|
||||
self.execute("UPDATE `log` SET `started` = 103 WHERE `attr_path` == 'alpha'")
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path`, `payload`, `last_started` FROM `queue`",
|
||||
{
|
||||
("alpha", "0 1", 103),
|
||||
("alpha", "1.2.3 1.2.4", 103),
|
||||
("bravo", "0 1", None),
|
||||
},
|
||||
)
|
||||
|
||||
def test_delete_fetcher_run_cleans_queue(self):
|
||||
self.storage.upsert_fetcher_run("f1", 100, False)
|
||||
self.storage.upsert_fetcher_run("f2", 101, False)
|
||||
self.storage.enqueue("f1", 100, [("alpha", "0 1")])
|
||||
self.storage.enqueue(
|
||||
"f2", 101, [("alpha", "1.2.3 1.2.4"), ("bravo", "0.1 0.1.1")]
|
||||
)
|
||||
self.storage.enqueue("f1", 100, [("charlie", "0 1")])
|
||||
|
||||
self.storage.delete_fetcher_run("f1", 100)
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `fetcher_run_started`, `attr_path`, `payload` FROM `queue`",
|
||||
{(2, 101, "alpha", "1.2.3 1.2.4"), (2, 101, "bravo", "0.1 0.1.1")},
|
||||
)
|
||||
|
||||
def test_dequeue_and_finish(self):
|
||||
self.storage.upsert_fetcher_run("f1", 100, False)
|
||||
self.storage.upsert_fetcher_run("f2", 101, False)
|
||||
self.storage.enqueue("f1", 100, [("alpha", "0 1"), ("bravo", "0 1")])
|
||||
self.storage.enqueue(
|
||||
"f2", 101, [("alpha", "1.2.3 1.2.4"), ("charlie", "0.1 0.1.1")]
|
||||
)
|
||||
|
||||
dequeued = {
|
||||
self.storage.dequeue(102)[0],
|
||||
self.storage.dequeue(103)[0],
|
||||
self.storage.dequeue(104)[0],
|
||||
}
|
||||
self.assertEqual(dequeued, {"alpha", "bravo", "charlie"})
|
||||
self.assertEqual(
|
||||
self.storage.dequeue(105), None
|
||||
) # alpha is excluded because it's in flight
|
||||
self.storage.finish("alpha", 105, 0)
|
||||
self.assertEqual(self.storage.dequeue(105)[0], "alpha")
|
||||
self.assertEqual(self.storage.dequeue(106), None) # queue is truly empty
|
||||
|
||||
self.storage.delete_fetcher_run("f2", 101)
|
||||
self.storage.upsert_fetcher_run("f2", 106, False)
|
||||
self.storage.enqueue(
|
||||
"f2", 106, [("alpha", "1.2.3 1.2.5"), ("bravo", "0.25 0.27")]
|
||||
)
|
||||
self.storage.enqueue("f1", 100, [("delta", "0 1")])
|
||||
|
||||
self.assertEqual(self.storage.dequeue(107), ("delta", "0 1"))
|
||||
self.assertEqual(self.storage.dequeue(108), None) # bravo is excluded
|
||||
self.storage.finish("bravo", 108, 0)
|
||||
self.storage.finish("charlie", 108, 0)
|
||||
self.storage.finish("delta", 108, 0)
|
||||
self.assertEqual(self.storage.dequeue(108), ("bravo", "0.25 0.27"))
|
||||
self.storage.finish("alpha", 109, 0)
|
||||
self.assertEqual(self.storage.dequeue(109), ("alpha", "1.2.3 1.2.5"))
|
||||
self.assertEqual(self.storage.dequeue(110), None)
|
||||
|
||||
self.storage.upsert_fetcher_run("f2", 106, True)
|
||||
self.storage.upsert_fetcher_run("f2", 110, False)
|
||||
self.storage.enqueue(
|
||||
"f2",
|
||||
110,
|
||||
[("alpha", "1.2.3 1.2.5"), ("bravo", "0.25 0.27"), ("charlie", "0.1 0.2")],
|
||||
)
|
||||
|
||||
self.assertEqual(self.storage.dequeue(111), ("charlie", "0.1 0.2"))
|
||||
self.assertEqual(self.storage.dequeue(112), None)
|
||||
|
||||
def test_exclusion_period(self):
|
||||
self.storage.upsert_fetcher_run("f1", 10000, False)
|
||||
self.storage.upsert_fetcher_run("f2", 10000, False)
|
||||
self.storage.enqueue("f1", 10000, [("alpha", "0 1")])
|
||||
self.storage.enqueue("f2", 10000, [("alpha", "1.0 1.1")])
|
||||
|
||||
payloads = set()
|
||||
dequeued = self.storage.dequeue(10000)
|
||||
self.assertEqual(dequeued[0], "alpha")
|
||||
payloads.add(dequeued[1])
|
||||
self.assertEqual(self.storage.dequeue(10000), None)
|
||||
# Even though alpha hasn't finished, after enough time let the other
|
||||
# alpha task run anyway.
|
||||
dequeued = self.storage.dequeue(60000)
|
||||
self.assertEqual(dequeued[0], "alpha")
|
||||
payloads.add(dequeued[1])
|
||||
self.assertEqual(payloads, {"0 1", "1.0 1.1"})
|
||||
|
||||
def test_continue_old_fetcher(self):
|
||||
self.execute(
|
||||
"""
|
||||
INSERT INTO `log` (`attr_path`, `started`, `finished`, `exit_code`)
|
||||
VALUES
|
||||
('alpha', 103, 105, 0),
|
||||
('bravo', 101, 106, 0),
|
||||
('charlie', 102, 107, 0)
|
||||
"""
|
||||
)
|
||||
self.storage.upsert_fetcher_run("f1", 110, False)
|
||||
self.storage.enqueue(
|
||||
"f1",
|
||||
110,
|
||||
[
|
||||
("alpha", "0.1 0.2"),
|
||||
("bravo", "0.1 0.2"),
|
||||
("charlie", "0.1 0.2"),
|
||||
("delta", "0.1 0.2"),
|
||||
],
|
||||
)
|
||||
self.assertEqual(self.storage.dequeue(111), ("delta", "0.1 0.2"))
|
||||
self.storage.finish("delta", 111, 0)
|
||||
self.assertEqual(self.storage.dequeue(112), ("bravo", "0.1 0.2"))
|
||||
self.storage.finish("bravo", 111, 0)
|
||||
self.storage.upsert_fetcher_run("f1", 113, False)
|
||||
self.storage.enqueue(
|
||||
"f1",
|
||||
113,
|
||||
[
|
||||
("alpha", "0.1 0.3"),
|
||||
("bravo", "0.1 0.3"),
|
||||
("delta", "0.1 0.3"),
|
||||
],
|
||||
)
|
||||
self.assertEqual(self.storage.dequeue(114), ("charlie", "0.1 0.2"))
|
||||
self.storage.finish("charlie", 114, 0)
|
||||
self.assertEqual(self.storage.dequeue(115), ("alpha", "0.1 0.3"))
|
||||
self.storage.finish("alpha", 115, 0)
|
||||
self.assertEqual(self.storage.dequeue(116), ("delta", "0.1 0.3"))
|
||||
self.storage.finish("delta", 116, 0)
|
||||
self.assertEqual(self.storage.dequeue(117), ("bravo", "0.1 0.3"))
|
||||
self.storage.finish("bravo", 117, 0)
|
||||
self.assertEqual(self.storage.dequeue(118), None)
|
||||
|
||||
|
||||
class SupervisorTestCase(unittest.TestCase, UsesDatabase, UsesFetchers):
|
||||
def setUp(self):
|
||||
self.playground = mkdtemp()
|
||||
self.fetchers_path = self.playground / "~fetchers"
|
||||
self.db_path = self.playground / "state.db"
|
||||
self.socket_path = self.playground / "work.sock"
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.playground)
|
||||
|
||||
def assertDBContents(self, query, expected):
|
||||
with self.connect() as conn:
|
||||
self.assertEqual(set(conn.execute(query).fetchall()), expected)
|
||||
|
||||
def worker_request(self, msg):
|
||||
with socket.socket(socket.AF_UNIX) as sock:
|
||||
sock.settimeout(1)
|
||||
sock.connect(str(self.socket_path))
|
||||
sock.send(msg)
|
||||
sock.shutdown(socket.SHUT_WR)
|
||||
return sock.recv(4096)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def supervisor(self):
|
||||
with contextlib.closing(asyncio.new_event_loop()) as event_loop:
|
||||
|
||||
def thread_target():
|
||||
asyncio.set_event_loop(event_loop)
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
event_loop.run_until_complete(
|
||||
supervisor.main(
|
||||
self.db_path, self.fetchers_path, self.socket_path
|
||||
)
|
||||
)
|
||||
|
||||
supervisor_thread = Thread(target=thread_target)
|
||||
supervisor_thread.start()
|
||||
|
||||
def cancel_all():
|
||||
for task in asyncio.all_tasks():
|
||||
task.cancel()
|
||||
|
||||
try:
|
||||
tick()
|
||||
yield
|
||||
tick()
|
||||
finally:
|
||||
event_loop.call_soon_threadsafe(cancel_all)
|
||||
supervisor_thread.join()
|
||||
|
||||
def test_enqueue_from_nothing(self):
|
||||
with self.supervisor():
|
||||
self.write_fetcher("f1.100.txt.part", "alpha 0 1\n")
|
||||
tick()
|
||||
self.write_fetcher("f1.100.txt.part", "bravo 0 1\n")
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1"), (1, "bravo", "0 1")},
|
||||
)
|
||||
|
||||
def test_enqueue_from_existing_files(self):
|
||||
self.fetchers_path.mkdir()
|
||||
self.write_fetcher("f1.100.txt", "alpha 0 1\n")
|
||||
with self.supervisor():
|
||||
pass
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1")},
|
||||
)
|
||||
|
||||
def test_delete_existing_files(self):
|
||||
self.fetchers_path.mkdir()
|
||||
self.write_fetcher("f1.100.txt", "alpha 0 1\n")
|
||||
with self.supervisor():
|
||||
self.fetcher("f1.100.txt").unlink()
|
||||
|
||||
self.assertDBContents("SELECT * FROM `queue`", set())
|
||||
|
||||
def test_append_existing_files(self):
|
||||
self.fetchers_path.mkdir()
|
||||
self.write_fetcher("f1.100.txt.part", "alpha 0 1\n")
|
||||
with self.supervisor():
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1")},
|
||||
)
|
||||
self.write_fetcher("f1.100.txt.part", "bravo 0 1\n")
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1"), (1, "bravo", "0 1")},
|
||||
)
|
||||
|
||||
def test_replace_existing_files(self):
|
||||
self.fetchers_path.mkdir()
|
||||
self.write_fetcher("f1.100.txt.part", "alpha 0 1\n")
|
||||
self.write_fetcher("f2.101.txt.part", "bravo 0 1\n")
|
||||
with self.supervisor():
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path` FROM `queue`", {("alpha",), ("bravo",)}
|
||||
)
|
||||
self.finish_fetcher("f1.100.txt.part")
|
||||
self.write_fetcher("f1.102.txt.part", "charlie 0 1\n")
|
||||
tick()
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path` FROM `queue`",
|
||||
{("alpha",), ("bravo",), ("charlie",)},
|
||||
)
|
||||
|
||||
self.fetcher("f1.100.txt").unlink()
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path` FROM `queue`", {("bravo",), ("charlie",)}
|
||||
)
|
||||
|
||||
def test_append_partial_chunks(self):
|
||||
self.fetchers_path.mkdir()
|
||||
self.write_fetcher("f1.100.txt.part", "al")
|
||||
with self.supervisor():
|
||||
self.write_fetcher("f1.100.txt.part", "pha 0 1\n")
|
||||
tick()
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1")},
|
||||
)
|
||||
self.write_fetcher("f1.100.txt.part", "bra")
|
||||
tick()
|
||||
self.write_fetcher("f1.100.txt.part", "vo ")
|
||||
tick()
|
||||
self.write_fetcher("f1.100.txt.part", "0 1")
|
||||
tick()
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1")},
|
||||
)
|
||||
self.write_fetcher("f1.100.txt.part", "\n")
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1"), (1, "bravo", "0 1")},
|
||||
)
|
||||
|
||||
def test_delete_between_runs(self):
|
||||
with self.supervisor():
|
||||
self.write_fetcher("f1.100.txt", "alpha 0 1\n")
|
||||
self.write_fetcher("f2.101.txt", "bravo 0 1\n")
|
||||
|
||||
self.fetcher("f1.100.txt").unlink()
|
||||
|
||||
with self.supervisor():
|
||||
pass
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(2, "bravo", "0 1")},
|
||||
)
|
||||
|
||||
def test_replace_between_runs(self):
|
||||
with self.supervisor():
|
||||
self.write_fetcher("f1.100.txt", "alpha 0 1\n")
|
||||
tick()
|
||||
self.write_fetcher("f2.101.txt", "bravo 0 1\n")
|
||||
|
||||
self.fetcher("f1.100.txt").unlink()
|
||||
self.write_fetcher("f1.102.txt", "charlie 0 1\n")
|
||||
|
||||
with self.supervisor():
|
||||
pass
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `fetcher_run_started`, `attr_path`, `payload` FROM `queue`",
|
||||
{(2, 101, "bravo", "0 1"), (1, 102, "charlie", "0 1")},
|
||||
)
|
||||
|
||||
def test_append_between_runs(self):
|
||||
with self.supervisor():
|
||||
self.write_fetcher("f1.100.txt.part", "alpha 0 1\n")
|
||||
|
||||
self.write_fetcher("f1.100.txt.part", "bravo 0 1\n")
|
||||
|
||||
with self.supervisor():
|
||||
pass
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `fetcher_id`, `attr_path`, `payload` FROM `queue`",
|
||||
{(1, "alpha", "0 1"), (1, "bravo", "0 1")},
|
||||
)
|
||||
|
||||
def test_worker_empty(self):
|
||||
with self.supervisor():
|
||||
msg = self.worker_request(supervisor.READY)
|
||||
self.assertEqual(msg, supervisor.NOJOBS)
|
||||
|
||||
def test_worker(self):
|
||||
self.fetchers_path.mkdir()
|
||||
self.write_fetcher(
|
||||
"f1.100.txt.part", "\n".join(["alpha 0 1", "bravo 0 1", "charlie 0 1", ""])
|
||||
)
|
||||
self.write_fetcher(
|
||||
"f2.100.txt.part",
|
||||
"\n".join(["alpha 0.1 1.0", "charlie 3.0 3.1", "delta 0.2 0.2.1", ""]),
|
||||
)
|
||||
|
||||
with self.supervisor():
|
||||
with self.connect() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO `log` (`attr_path`, `started`, `finished`, `exit_code`)
|
||||
VALUES
|
||||
('alpha', 100, 105, 0),
|
||||
('bravo', 120, 125, 0),
|
||||
('charlie', 110, 115, 1)
|
||||
"""
|
||||
)
|
||||
msg = self.worker_request(supervisor.READY)
|
||||
self.assertEqual(msg, supervisor.JOB + b"delta 0.2 0.2.1\n")
|
||||
msg = self.worker_request(supervisor.READY)
|
||||
self.assertTrue(msg.startswith(supervisor.JOB + b"alpha "))
|
||||
msg = self.worker_request(supervisor.READY)
|
||||
self.assertTrue(msg.startswith(supervisor.JOB + b"charlie "))
|
||||
msg = self.worker_request(supervisor.DONE + b"delta 1\n")
|
||||
self.assertTrue(msg.startswith(supervisor.JOB + b"bravo "))
|
||||
msg = self.worker_request(supervisor.DONE + b"charlie 0\n")
|
||||
self.assertTrue(msg.startswith(supervisor.JOB + b"charlie "))
|
||||
msg = self.worker_request(supervisor.DONE + b"bravo 0\n")
|
||||
self.assertEqual(msg, supervisor.NOJOBS)
|
||||
self.write_fetcher("f1.100.txt.part", "echo 0 1\n")
|
||||
msg = self.worker_request(supervisor.DONE + b"charlie 0\n")
|
||||
self.assertEqual(msg, supervisor.JOB + b"echo 0 1\n")
|
||||
|
||||
self.assertDBContents(
|
||||
"SELECT `attr_path`, `exit_code` FROM `log`",
|
||||
{
|
||||
("alpha", None),
|
||||
("bravo", 0),
|
||||
("charlie", 0),
|
||||
("delta", 1),
|
||||
("echo", None),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|