summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apt-pkg/acquire.cc110
1 files changed, 66 insertions, 44 deletions
diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc
index 8bb72d549..6cf8b4c83 100644
--- a/apt-pkg/acquire.cc
+++ b/apt-pkg/acquire.cc
@@ -385,75 +385,97 @@ void pkgAcquire::Dequeue(Item *Itm)
return http://foo.org or http */
string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
{
+ constexpr int DEFAULT_HOST_LIMIT = 10;
URI U(Uri);
-
- Config = GetConfig(U.Access);
- if (Config == 0)
- return string();
-
- /* Single-Instance methods get exactly one queue per URI. This is
- also used for the Access queue method */
- if (Config->SingleInstance == true || QueueMode == QueueAccess)
+ // Access mode forces all methods to be Single-Instance
+ if (QueueMode == QueueAccess)
return U.Access;
- int Limit = 10;
- string AccessSchema = U.Access + ':';
- string FullQueueName;
+ Config = GetConfig(U.Access);
+ if (Config == nullptr)
+ return {};
+
+ // Single-Instance methods get exactly one queue per URI
+ if (Config->SingleInstance == true)
+ return U.Access;
+ // Host-less methods like rred, store, …
if (U.Host.empty())
{
- long existing = 0;
+ int existing = 0;
// check how many queues exist already and reuse empty ones
+ auto const AccessSchema = U.Access + ':';
for (Queue const *I = Queues; I != 0; I = I->Next)
- if (I->Name.compare(0, AccessSchema.length(), AccessSchema) == 0)
+ if (APT::String::Startswith(I->Name, AccessSchema))
{
if (I->Items == nullptr)
return I->Name;
++existing;
}
+ int const Limit = _config->FindI("Acquire::QueueHost::Limit",
#ifdef _SC_NPROCESSORS_ONLN
- long cpuCount = sysconf(_SC_NPROCESSORS_ONLN) * 2;
+ sysconf(_SC_NPROCESSORS_ONLN) * 2
#else
- long cpuCount = Limit;
+ DEFAULT_HOST_LIMIT
#endif
- Limit = _config->FindI("Acquire::QueueHost::Limit", cpuCount);
+ );
+ // create a new worker if we don't have too many yet
if (Limit <= 0 || existing < Limit)
- strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), existing);
- else
- {
- long const randomQueue = random() % Limit;
- strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), randomQueue);
- }
+ return AccessSchema + std::to_string(existing);
+
+ // find the worker with the least to do
+ // we already established that there are no empty and we can't spawn new
+ Queue const *selected = nullptr;
+ auto selected_backlog = std::numeric_limits<decltype(HashStringList().FileSize())>::max();
+ for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next)
+ if (APT::String::Startswith(Q->Name, AccessSchema))
+ {
+ decltype(selected_backlog) current_backlog = 0;
+ for (auto const *I = Q->Items; I != nullptr; I = I->Next)
+ {
+ auto const hashes = I->Owner->GetExpectedHashes();
+ if (not hashes.empty())
+ current_backlog += hashes.FileSize();
+ else
+ current_backlog += I->Owner->FileSize;
+ }
+ if (current_backlog < selected_backlog)
+ {
+ selected = Q;
+ selected_backlog = current_backlog;
+ }
+ }
- if (Debug)
- clog << "Chose random queue " << FullQueueName << " for " << Uri << endl;
- } else
- {
- Limit = _config->FindI("Acquire::QueueHost::Limit", Limit);
- FullQueueName = AccessSchema + U.Host;
+ if (unlikely(selected == nullptr))
+ return AccessSchema + "0";
+ return selected->Name;
}
- unsigned int Instances = 0, SchemaLength = AccessSchema.length();
-
- Queue *I = Queues;
- for (; I != 0; I = I->Next) {
+ // most methods talking to remotes like http
+ else
+ {
+ auto const FullQueueName = U.Access + ':' + U.Host;
// if the queue already exists, re-use it
- if (I->Name == FullQueueName)
- return FullQueueName;
-
- if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
- Instances++;
- }
+ for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next)
+ if (Q->Name == FullQueueName)
+ return FullQueueName;
- if (Debug) {
- clog << "Found " << Instances << " instances of " << U.Access << endl;
- }
+ int existing = 0;
+ // check how many queues exist already and reuse empty ones
+ auto const AccessSchema = U.Access + ':';
+ for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next)
+ if (APT::String::Startswith(Q->Name, AccessSchema))
+ ++existing;
- if (Instances >= static_cast<decltype(Instances)>(Limit))
- return U.Access;
+ int const Limit = _config->FindI("Acquire::QueueHost::Limit", DEFAULT_HOST_LIMIT);
+ // if we have too many hosts open use a single generic for the rest
+ if (existing >= Limit)
+ return U.Access;
- return FullQueueName;
+ // we can still create new named queues
+ return FullQueueName;
+ }
}
/*}}}*/
// Acquire::GetConfig - Fetch the configuration information /*{{{*/