build02/nixpkgs-update: scramble order of new jobs

This commit is contained in:
Ryan Hendrickson 2024-01-03 22:43:54 -05:00 committed by zowoq
parent a4b82fa209
commit 6de26b616a

View file

@ -48,10 +48,10 @@ protocol for communication. The following commands are expected:
The possible responses to either command are:
NOJOB
NOJOBS
JOB <attribute_path> <rest_of_line>
When a worker receives NOJOB, it is expected to wait some period of
When a worker receives NOJOBS, it is expected to wait some period of
time before asking for jobs again (with READY).
When a worker receives JOB, it is expected to complete that job and
@ -63,6 +63,7 @@ This code has a unit test suite in supervisor_test.py.
import asyncio
import contextlib
import functools
import hashlib
import os
import pathlib
import sqlite3
@ -110,6 +111,18 @@ JOB = b"JOB "
NOJOBS = b"NOJOBS\n"
def jitter_hash(data: str) -> int:
"""
Deterministically produce a 32-bit integer from a string, not correlated
with the sort order of the string.
The implementation shouldn't matter but if it ever changes, the database
queue will need to be recreated.
"""
h = hashlib.md5(data.encode(), usedforsecurity=False)
return int.from_bytes(h.digest()[:4], byteorder="little", signed=True)
class Storage:
"""
Abstracts over the database that stores the supervisor state.
@ -181,7 +194,7 @@ class Storage:
return decorator if fun is None else decorator(fun)
@_cursor_method
def upgrade(self, cur: sqlite3.Cursor, version: int = 1) -> None:
def upgrade(self, cur: sqlite3.Cursor, version: int = 2) -> None:
"""
Create or update the database schema.
@ -302,6 +315,38 @@ class Storage:
WHERE `queue`.`attr_path` = NEW.`attr_path`;
END;
COMMIT;
"""
)
if user_version < 2 <= version:
# We want to add some disorder to the initial order of packages;
# dispatching packages that are alphabetically adjacent increases
# the chances of parallel duplicates or getting stuck on a large
# block of time-consuming packages.
#
# Unfortunately, to apply this jitter retroactively we need to
# delete most of the rows already in the database.
cur.executescript(
"""
BEGIN;
DELETE FROM `fetcher_runs`;
DELETE FROM `log`;
ALTER TABLE `queue`
ADD COLUMN `order_jitter` `integer`;
DROP INDEX `queue_order_index`;
CREATE INDEX `queue_order_index`
ON `queue` (
`last_started` ASC,
`order_jitter`,
`attr_path`,
`fetcher_id`,
`fetcher_run_started` DESC
);
COMMIT;
"""
)
@ -378,13 +423,13 @@ class Storage:
"""
cur.executemany(
"""
INSERT INTO `queue` (`fetcher_id`, `fetcher_run_started`, `attr_path`, `payload`)
SELECT `fetcher_id`, ?, ?, ?
INSERT INTO `queue` (`fetcher_id`, `fetcher_run_started`, `attr_path`, `payload`, `order_jitter`)
SELECT `fetcher_id`, ?, ?, ?, ?
FROM `fetchers`
WHERE `name` = ?
ON CONFLICT DO UPDATE SET `payload` = excluded.`payload`
""",
((run_started, a, p, fetcher) for a, p in entries),
((run_started, a, p, jitter_hash(a), fetcher) for a, p in entries),
)
@_cursor_method(transaction=True)
@ -414,11 +459,11 @@ class Storage:
`payload`
FROM `queue`
LEFT JOIN `log` USING (`attr_path`)
GROUP BY `last_started`, `attr_path`, `fetcher_id`
GROUP BY `last_started`, `order_jitter`, `attr_path`, `fetcher_id`
HAVING `is_dequeued` = 0
AND (`started` IS NULL OR `finished` IS NOT NULL OR `started` + 43200 < ?)
AND (TRUE OR `max`(`fetcher_run_started`))
ORDER BY `last_started` ASC, `attr_path`, `fetcher_id`
ORDER BY `last_started` ASC, `order_jitter`, `attr_path`, `fetcher_id`
LIMIT 1
""",
(start_time,),