diff options
author | David Kalnischkies <david@kalnischkies.de> | 2019-04-27 16:13:02 +0200 |
---|---|---|
committer | David Kalnischkies <david@kalnischkies.de> | 2019-07-08 13:18:31 +0200 |
commit | 79b1a82983e737e74359bc306d9edb357c5bdd46 (patch) | |
tree | 28fe86abdf88e54e8e9407e81e59d01e149a2ef2 /apt-pkg/acquire.cc | |
parent | 8ff87e9cd37a4436eb7e56f814a099cb30845ae1 (diff) |
Distribute host-less work based on backlog of the queues
Work like applying patches via rred can be performed by many concurrent
rred processes, but we can't just spawn new ones forever: We limit us to
the number of CPUs which can drive them and reuse existing ones if they
have nothing to do at the moment.
The problem arises if we have reached the limit of queues and all of
them are busy which is more likely to happen on "slow" machines with few
CPUs. In this case we opted for random distribution, but that can result
in many big files (e.g. Contents) being added to one queue while the
others get none or only small files.
Ideally we would ask the methods how much they still have to do, but
they only know that for the current item, not for all items in the
queue, so we use the filesize of the expected result.
Diffstat (limited to 'apt-pkg/acquire.cc')
-rw-r--r-- | apt-pkg/acquire.cc | 110 |
1 files changed, 66 insertions, 44 deletions
diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc index 8bb72d549..6cf8b4c83 100644 --- a/apt-pkg/acquire.cc +++ b/apt-pkg/acquire.cc @@ -385,75 +385,97 @@ void pkgAcquire::Dequeue(Item *Itm) return http://foo.org or http */ string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config) { + constexpr int DEFAULT_HOST_LIMIT = 10; URI U(Uri); - - Config = GetConfig(U.Access); - if (Config == 0) - return string(); - - /* Single-Instance methods get exactly one queue per URI. This is - also used for the Access queue method */ - if (Config->SingleInstance == true || QueueMode == QueueAccess) + // Access mode forces all methods to be Single-Instance + if (QueueMode == QueueAccess) return U.Access; - int Limit = 10; - string AccessSchema = U.Access + ':'; - string FullQueueName; + Config = GetConfig(U.Access); + if (Config == nullptr) + return {}; + + // Single-Instance methods get exactly one queue per URI + if (Config->SingleInstance == true) + return U.Access; + // Host-less methods like rred, store, … if (U.Host.empty()) { - long existing = 0; + int existing = 0; // check how many queues exist already and reuse empty ones + auto const AccessSchema = U.Access + ':'; for (Queue const *I = Queues; I != 0; I = I->Next) - if (I->Name.compare(0, AccessSchema.length(), AccessSchema) == 0) + if (APT::String::Startswith(I->Name, AccessSchema)) { if (I->Items == nullptr) return I->Name; ++existing; } + int const Limit = _config->FindI("Acquire::QueueHost::Limit", #ifdef _SC_NPROCESSORS_ONLN - long cpuCount = sysconf(_SC_NPROCESSORS_ONLN) * 2; + sysconf(_SC_NPROCESSORS_ONLN) * 2 #else - long cpuCount = Limit; + DEFAULT_HOST_LIMIT #endif - Limit = _config->FindI("Acquire::QueueHost::Limit", cpuCount); + ); + // create a new worker if we don't have too many yet if (Limit <= 0 || existing < Limit) - strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), existing); - else - { - long const randomQueue = random() % Limit; - strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), randomQueue); - } + return AccessSchema + std::to_string(existing); + + // find the worker with the least to do + // we already established that there are no empty and we can't spawn new + Queue const *selected = nullptr; + auto selected_backlog = std::numeric_limits<decltype(HashStringList().FileSize())>::max(); + for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next) + if (APT::String::Startswith(Q->Name, AccessSchema)) + { + decltype(selected_backlog) current_backlog = 0; + for (auto const *I = Q->Items; I != nullptr; I = I->Next) + { + auto const hashes = I->Owner->GetExpectedHashes(); + if (not hashes.empty()) + current_backlog += hashes.FileSize(); + else + current_backlog += I->Owner->FileSize; + } + if (current_backlog < selected_backlog) + { + selected = Q; + selected_backlog = current_backlog; + } + } - if (Debug) - clog << "Chose random queue " << FullQueueName << " for " << Uri << endl; - } else - { - Limit = _config->FindI("Acquire::QueueHost::Limit", Limit); - FullQueueName = AccessSchema + U.Host; + if (unlikely(selected == nullptr)) + return AccessSchema + "0"; + return selected->Name; } - unsigned int Instances = 0, SchemaLength = AccessSchema.length(); - - Queue *I = Queues; - for (; I != 0; I = I->Next) { + // most methods talking to remotes like http + else + { + auto const FullQueueName = U.Access + ':' + U.Host; // if the queue already exists, re-use it - if (I->Name == FullQueueName) - return FullQueueName; - - if (I->Name.compare(0, SchemaLength, AccessSchema) == 0) - Instances++; - } + for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next) + if (Q->Name == FullQueueName) + return FullQueueName; - if (Debug) { - clog << "Found " << Instances << " instances of " << U.Access << endl; - } + int existing = 0; + // check how many queues exist already and reuse empty ones + auto const AccessSchema = U.Access + ':'; + for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next) + if (APT::String::Startswith(Q->Name, AccessSchema)) + ++existing; - if (Instances >= static_cast<decltype(Instances)>(Limit)) - return U.Access; + int const Limit = _config->FindI("Acquire::QueueHost::Limit", DEFAULT_HOST_LIMIT); + // if we have too many hosts open use a single generic for the rest + if (existing >= Limit) + return U.Access; - return FullQueueName; + // we can still create new named queues + return FullQueueName; + } } /*}}}*/ // Acquire::GetConfig - Fetch the configuration information /*{{{*/ |