summaryrefslogtreecommitdiff
path: root/apt-pkg/acquire.cc
diff options
context:
space:
mode:
Diffstat (limited to 'apt-pkg/acquire.cc')
-rw-r--r--apt-pkg/acquire.cc80
1 files changed, 67 insertions, 13 deletions
diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc
index 8693de930..c3682504f 100644
--- a/apt-pkg/acquire.cc
+++ b/apt-pkg/acquire.cc
@@ -24,15 +24,17 @@
#include <algorithm>
#include <chrono>
+#include <cmath>
#include <iomanip>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
+#include <tuple>
#include <vector>
-#include <cmath>
+#include <assert.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
@@ -51,6 +53,14 @@
using namespace std;
+// helper to convert time_point to a timeval
+static struct timeval SteadyDurationToTimeVal(std::chrono::steady_clock::duration Time)
+{
+ auto const Time_sec = std::chrono::duration_cast<std::chrono::seconds>(Time);
+ auto const Time_usec = std::chrono::duration_cast<std::chrono::microseconds>(Time - Time_sec);
+ return {Time_sec.count(), Time_usec.count()};
+}
+
std::string pkgAcquire::URIEncode(std::string const &part) /*{{{*/
{
// The "+" is encoded as a workaround for an S3 bug (LP#1003633 and LP#1086997)
@@ -686,7 +696,7 @@ static void CheckDropPrivsMustBeDisabled(pkgAcquire const &Fetcher)
if (setgroups(old_gidlist_nr, old_gidlist.get()))
_error->FatalE("setgroups", "setgroups %u failed", 0);
}
-pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
+pkgAcquire::RunResult pkgAcquire::Run(int PulseInterval)
{
_error->PushToStack();
CheckDropPrivsMustBeDisabled(*this);
@@ -702,9 +712,7 @@ pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
bool WasCancelled = false;
// Run till all things have been acquired
- struct timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = PulseIntervall;
+ struct timeval tv = SteadyDurationToTimeVal(std::chrono::microseconds(PulseInterval));
while (ToFetch > 0)
{
fd_set RFds;
@@ -713,7 +721,37 @@ pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
FD_ZERO(&RFds);
FD_ZERO(&WFds);
SetFds(Highest,&RFds,&WFds);
-
+
+ // Shorten the select() cycle in case we have items about to become ready
+ auto now = clock::now();
+ auto fetchAfter = time_point{};
+ for (Queue *I = Queues; I != nullptr; I = I->Next)
+ {
+ if (I->Items == nullptr)
+ continue;
+
+ auto f = I->Items->GetFetchAfter();
+
+ if (f == time_point() || I->Items->Owner->Status != pkgAcquire::Item::StatIdle)
+ continue;
+
+ if (f <= now)
+ {
+ I->Cycle(); // Queue got stuck, unstuck it.
+ fetchAfter = now; // need to time out in select() below
+ assert(I->Items->Owner->Status != pkgAcquire::Item::StatIdle);
+ }
+ else if (f < fetchAfter || fetchAfter == time_point{})
+ {
+ fetchAfter = f;
+ }
+ }
+
+ if (fetchAfter != time_point{} && (fetchAfter - now) < std::chrono::seconds(tv.tv_sec) + std::chrono::microseconds(tv.tv_usec))
+ {
+ tv = SteadyDurationToTimeVal(fetchAfter - now);
+ }
+
int Res;
do
{
@@ -733,7 +771,8 @@ pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
// Timeout, notify the log class
if (Res == 0 || (Log != 0 && Log->Update == true))
{
- tv.tv_usec = PulseIntervall;
+ tv = SteadyDurationToTimeVal(std::chrono::microseconds(PulseInterval));
+
for (Worker *I = Workers; I != 0; I = I->NextAcquire)
I->Pulse();
if (Log != 0 && Log->Pulse(this) == false)
@@ -962,6 +1001,7 @@ bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
};
QItem **OptimalI = &Items;
QItem **I = &Items;
+ auto insertLocation = std::make_tuple(Item.Owner->FetchAfter(), -Item.Owner->Priority());
// move to the end of the queue and check for duplicates here
for (; *I != 0; ) {
if (Item.URI == (*I)->URI && MetaKeysMatch(Item, *I))
@@ -974,10 +1014,12 @@ bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
}
// Determine the optimal position to insert: before anything with a
// higher priority.
- int priority = (*I)->GetPriority();
+ auto queueLocation = std::make_tuple((*I)->GetFetchAfter(),
+ -(*I)->GetPriority());
I = &(*I)->Next;
- if (priority >= Item.Owner->Priority()) {
+ if (queueLocation <= insertLocation)
+ {
OptimalI = I;
}
}
@@ -1153,6 +1195,7 @@ bool pkgAcquire::Queue::Cycle()
// Look for a queable item
QItem *I = Items;
int ActivePriority = 0;
+ auto currentTime = clock::now();
while (PipeDepth < static_cast<decltype(PipeDepth)>(MaxPipeDepth))
{
for (; I != 0; I = I->Next) {
@@ -1170,6 +1213,11 @@ bool pkgAcquire::Queue::Cycle()
// the queue is idle
if (I->GetPriority() < ActivePriority)
return true;
+
+ // Item is not ready yet, delay
+ if (I->GetFetchAfter() > currentTime)
+ return true;
+
I->Worker = Workers;
for (auto const &O: I->Owners)
O->Status = pkgAcquire::Item::StatFetching;
@@ -1237,6 +1285,15 @@ APT_PURE int pkgAcquire::Queue::QItem::GetPriority() const /*{{{*/
return Priority;
}
/*}}}*/
+APT_PURE pkgAcquire::time_point pkgAcquire::Queue::QItem::GetFetchAfter() const /*{{{*/
+{
+ time_point FetchAfter{};
+ for (auto const &O : Owners)
+ FetchAfter = std::max(FetchAfter, O->FetchAfter());
+
+ return FetchAfter;
+}
+ /*}}}*/
void pkgAcquire::Queue::QItem::SyncDestinationFiles() const /*{{{*/
{
/* ensure that the first owner has the best partial file of all and
@@ -1294,10 +1351,7 @@ pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(-1), Update(true), MoreP
as well as the current CPS estimate. */
static struct timeval GetTimevalFromSteadyClock()
{
- auto const Time = std::chrono::steady_clock::now().time_since_epoch();
- auto const Time_sec = std::chrono::duration_cast<std::chrono::seconds>(Time);
- auto const Time_usec = std::chrono::duration_cast<std::chrono::microseconds>(Time - Time_sec);
- return { Time_sec.count(), Time_usec.count() };
+ return SteadyDurationToTimeVal(std::chrono::steady_clock::now().time_since_epoch());
}
bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
{