From 6de26b616ac0aa6d0879482b6d249919469ead50 Mon Sep 17 00:00:00 2001
From: Ryan Hendrickson <ryan.hendrickson@alum.mit.edu>
Date: Wed, 3 Jan 2024 22:43:54 -0500
Subject: [PATCH] build02/nixpkgs-update: scramble order of new jobs

---
 hosts/build02/supervisor.py | 61 ++++++++++++++++++++++++++++++++-----
 1 file changed, 53 insertions(+), 8 deletions(-)

diff --git a/hosts/build02/supervisor.py b/hosts/build02/supervisor.py
index 5bd9020..c2a82a4 100644
--- a/hosts/build02/supervisor.py
+++ b/hosts/build02/supervisor.py
@@ -48,10 +48,10 @@ protocol for communication. The following commands are expected:
 
 The possible responses to either command are:
 
-    NOJOB
+    NOJOBS
     JOB <attribute_path> <rest_of_line>
 
-When a worker receives NOJOB, it is expected to wait some period of
+When a worker receives NOJOBS, it is expected to wait some period of
 time before asking for jobs again (with READY).
 
 When a worker receives JOB, it is expected to complete that job and
@@ -63,6 +63,7 @@ This code has a unit test suite in supervisor_test.py.
 import asyncio
 import contextlib
 import functools
+import hashlib
 import os
 import pathlib
 import sqlite3
@@ -110,6 +111,18 @@ JOB = b"JOB "
 NOJOBS = b"NOJOBS\n"
 
 
+def jitter_hash(data: str) -> int:
+    """
+    Deterministically produce a 32-bit integer from a string, not correlated
+    with the sort order of the string.
+
+    The implementation shouldn't matter but if it ever changes, the database
+    queue will need to be recreated.
+    """
+    h = hashlib.md5(data.encode(), usedforsecurity=False)
+    return int.from_bytes(h.digest()[:4], byteorder="little", signed=True)
+
+
 class Storage:
     """
     Abstracts over the database that stores the supervisor state.
@@ -181,7 +194,7 @@ class Storage:
         return decorator if fun is None else decorator(fun)
 
     @_cursor_method
-    def upgrade(self, cur: sqlite3.Cursor, version: int = 1) -> None:
+    def upgrade(self, cur: sqlite3.Cursor, version: int = 2) -> None:
         """
         Create or update the database schema.
 
@@ -302,6 +315,38 @@ class Storage:
                     WHERE `queue`.`attr_path` = NEW.`attr_path`;
                 END;
 
+                COMMIT;
+                """
+            )
+        if user_version < 2 <= version:
+            # We want to add some disorder to the initial order of packages;
+            # dispatching packages that are alphabetically adjacent increases
+            # the chances of parallel duplicates or getting stuck on a large
+            # block of time-consuming packages.
+            #
+            # Unfortunately, to apply this jitter retroactively we need to
+            # delete most of the rows already in the database.
+            cur.executescript(
+                """
+                BEGIN;
+
+                DELETE FROM `fetcher_runs`;
+                DELETE FROM `log`;
+
+                ALTER TABLE `queue`
+                ADD COLUMN `order_jitter` `integer`;
+
+                DROP INDEX `queue_order_index`;
+
+                CREATE INDEX `queue_order_index`
+                ON `queue` (
+                    `last_started` ASC,
+                    `order_jitter`,
+                    `attr_path`,
+                    `fetcher_id`,
+                    `fetcher_run_started` DESC
+                );
+
                 COMMIT;
                 """
             )
@@ -378,13 +423,13 @@ class Storage:
         """
         cur.executemany(
             """
-            INSERT INTO `queue` (`fetcher_id`, `fetcher_run_started`, `attr_path`, `payload`)
-            SELECT `fetcher_id`, ?, ?, ?
+            INSERT INTO `queue` (`fetcher_id`, `fetcher_run_started`, `attr_path`, `payload`, `order_jitter`)
+            SELECT `fetcher_id`, ?, ?, ?, ?
             FROM `fetchers`
             WHERE `name` = ?
             ON CONFLICT DO UPDATE SET `payload` = excluded.`payload`
             """,
-            ((run_started, a, p, fetcher) for a, p in entries),
+            ((run_started, a, p, jitter_hash(a), fetcher) for a, p in entries),
         )
 
     @_cursor_method(transaction=True)
@@ -414,11 +459,11 @@ class Storage:
                 `payload`
             FROM `queue`
             LEFT JOIN `log` USING (`attr_path`)
-            GROUP BY `last_started`, `attr_path`, `fetcher_id`
+            GROUP BY `last_started`, `order_jitter`, `attr_path`, `fetcher_id`
             HAVING `is_dequeued` = 0
             AND (`started` IS NULL OR `finished` IS NOT NULL OR `started` + 43200 < ?)
             AND (TRUE OR `max`(`fetcher_run_started`))
-            ORDER BY `last_started` ASC, `attr_path`, `fetcher_id`
+            ORDER BY `last_started` ASC, `order_jitter`, `attr_path`, `fetcher_id`
             LIMIT 1
             """,
             (start_time,),