summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Kalnischkies <david@kalnischkies.de>2016-04-06 01:08:57 +0200
committerDavid Kalnischkies <david@kalnischkies.de>2016-04-07 13:48:31 +0200
commit38f8704e419ed93f433129e20df5611df6652620 (patch)
treeb49599c1b86bb22109fc1f68468e385baf6d9ce0
parent57f16d51f4158dce1a49f6d5f5f05f057125b871 (diff)
stop handling items in doomed transactions
With the previous commit we track the state of transactions, so we can now use our knowledge to avoid processing data for a transaction which was already closed (via an abort in this case). This is needed as multiple independent processes are interacting in the process, so there isn't a simple immediate full-engine stop and it would also be bad to teach each and every item how to check if its manager has failed subordinate and what to do in that case. In the pdiff case, which deals (potentially) with many items during its lifetime e.g. a hashsum mismatch in another file can abort the transaction the file we try to patch via pdiff belongs to. This causes some of the items (which are already done) to be aborted with it, but items still in the process of acquisition continue in the processing and will later try to use all the items together failing in strange ways as cleanup already happened. The chosen solution is to dry up the communication channels instead by ignoring new requests for data acquisition, canceling requests which are not assigned to a queue and not calling Done/Failed on items anymore. This means that e.g. already started or pending (e.g. pipelined) downloads aren't stopped and continue as normal for now, but they remain in partial/ and aren't processed further so the next update command will pick them up and put them to good use while the current process fails updating (for this transaction group) in an orderly fashion. Closes: 817240 Thanks: Barr Detwix & Vincent Lefevre for log files
-rw-r--r--apt-pkg/acquire-item.cc8
-rw-r--r--apt-pkg/acquire-worker.cc115
-rwxr-xr-xtest/integration/test-pdiff-usage32
3 files changed, 102 insertions, 53 deletions
diff --git a/apt-pkg/acquire-item.cc b/apt-pkg/acquire-item.cc
index d820756ca..b2e578629 100644
--- a/apt-pkg/acquire-item.cc
+++ b/apt-pkg/acquire-item.cc
@@ -281,6 +281,12 @@ bool pkgAcquire::Item::QueueURI(pkgAcquire::ItemDesc &Item)
for a hashsum mismatch to happen which helps nobody) */
bool pkgAcqTransactionItem::QueueURI(pkgAcquire::ItemDesc &Item)
{
+ if (TransactionManager->State != TransactionStarted)
+ {
+ if (_config->FindB("Debug::Acquire::Transaction", false))
+ std::clog << "Skip " << Target.URI << " as transaction was already dealt with!" << std::endl;
+ return false;
+ }
std::string const FinalFile = GetFinalFilename();
if (TransactionManager != NULL && TransactionManager->IMSHit == true &&
FileExists(FinalFile) == true)
@@ -866,6 +872,8 @@ void pkgAcqMetaBase::AbortTransaction()
for (std::vector<pkgAcqTransactionItem*>::iterator I = Transaction.begin();
I != Transaction.end(); ++I)
{
+ if ((*I)->Status != pkgAcquire::Item::StatFetching)
+ Owner->Dequeue(*I);
(*I)->TransactionState(TransactionAbort);
}
Transaction.clear();
diff --git a/apt-pkg/acquire-worker.cc b/apt-pkg/acquire-worker.cc
index f901847f7..ca1fd4836 100644
--- a/apt-pkg/acquire-worker.cc
+++ b/apt-pkg/acquire-worker.cc
@@ -172,6 +172,25 @@ bool pkgAcquire::Worker::ReadMessages()
// ---------------------------------------------------------------------
/* This takes the messages from the message queue and runs them through
the parsers in order. */
+enum class APT_HIDDEN MessageType {
+ CAPABILITIES = 100,
+ LOG = 101,
+ STATUS = 102,
+ REDIRECT = 103,
+ WARNING = 104,
+ URI_START = 200,
+ URI_DONE = 201,
+ URI_FAILURE = 400,
+ GENERAL_FAILURE = 401,
+ MEDIA_CHANGE = 403
+};
+static bool isDoomedItem(pkgAcquire::Item const * const Itm)
+{
+ auto const TransItm = dynamic_cast<pkgAcqTransactionItem const * const>(Itm);
+ if (TransItm == nullptr)
+ return false;
+ return TransItm->TransactionManager->State != pkgAcqTransactionItem::TransactionStarted;
+}
bool pkgAcquire::Worker::RunMessages()
{
while (MessageQueue.empty() == false)
@@ -184,7 +203,7 @@ bool pkgAcquire::Worker::RunMessages()
// Fetch the message number
char *End;
- int Number = strtol(Message.c_str(),&End,10);
+ MessageType const Number = static_cast<MessageType>(strtoul(Message.c_str(),&End,10));
if (End == Message.c_str())
return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
@@ -210,27 +229,23 @@ bool pkgAcquire::Worker::RunMessages()
// Determine the message number and dispatch
switch (Number)
{
- // 100 Capabilities
- case 100:
+ case MessageType::CAPABILITIES:
if (Capabilities(Message) == false)
return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
break;
- // 101 Log
- case 101:
+ case MessageType::LOG:
if (Debug == true)
clog << " <- (log) " << LookupTag(Message,"Message") << endl;
break;
- // 102 Status
- case 102:
+ case MessageType::STATUS:
Status = LookupTag(Message,"Message");
break;
- // 103 Redirect
- case 103:
+ case MessageType::REDIRECT:
{
- if (Itm == 0)
+ if (Itm == nullptr)
{
_error->Error("Method gave invalid 103 Redirect message");
break;
@@ -248,11 +263,13 @@ bool pkgAcquire::Worker::RunMessages()
// and then put it in the main queue again
std::vector<Item*> const ItmOwners = Itm->Owners;
OwnerQ->ItemDone(Itm);
- Itm = NULL;
- for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O)
+ Itm = nullptr;
+ for (auto const &Owner: ItmOwners)
{
- pkgAcquire::Item *Owner = *O;
pkgAcquire::ItemDesc &desc = Owner->GetItemDesc();
+ if (Log != nullptr)
+ Log->Done(desc);
+
// if we change site, treat it as a mirror change
if (URI::SiteOnly(NewURI) != URI::SiteOnly(desc.URI))
{
@@ -270,22 +287,19 @@ bool pkgAcquire::Worker::RunMessages()
}
}
desc.URI = NewURI;
- OwnerQ->Owner->Enqueue(desc);
-
- if (Log != 0)
- Log->Done(desc);
+ if (isDoomedItem(Owner) == false)
+ OwnerQ->Owner->Enqueue(desc);
}
break;
}
- // 104 Warning
- case 104:
+
+ case MessageType::WARNING:
_error->Warning("%s: %s", Itm->Owner->DescURI().c_str(), LookupTag(Message,"Message").c_str());
break;
- // 200 URI Start
- case 200:
+ case MessageType::URI_START:
{
- if (Itm == 0)
+ if (Itm == nullptr)
{
_error->Error("Method gave invalid 200 URI Start message");
break;
@@ -295,26 +309,24 @@ bool pkgAcquire::Worker::RunMessages()
CurrentSize = 0;
TotalSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
ResumePoint = strtoull(LookupTag(Message,"Resume-Point","0").c_str(), NULL, 10);
- for (pkgAcquire::Queue::QItem::owner_iterator O = Itm->Owners.begin(); O != Itm->Owners.end(); ++O)
+ for (auto const Owner: Itm->Owners)
{
- (*O)->Start(Message, TotalSize);
-
+ Owner->Start(Message, TotalSize);
// Display update before completion
- if (Log != 0)
+ if (Log != nullptr)
{
if (Log->MorePulses == true)
- Log->Pulse((*O)->GetOwner());
- Log->Fetch((*O)->GetItemDesc());
+ Log->Pulse(Owner->GetOwner());
+ Log->Fetch(Owner->GetItemDesc());
}
}
break;
}
- // 201 URI Done
- case 201:
+ case MessageType::URI_DONE:
{
- if (Itm == 0)
+ if (Itm == nullptr)
{
_error->Error("Method gave invalid 201 URI Done message");
break;
@@ -363,9 +375,8 @@ bool pkgAcquire::Worker::RunMessages()
bool const isIMSHit = StringToBool(LookupTag(Message,"IMS-Hit"),false) ||
StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false);
- for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O)
+ for (auto const Owner: ItmOwners)
{
- pkgAcquire::Item * const Owner = *O;
HashStringList const ExpectedHashes = Owner->GetExpectedHashes();
if(_config->FindB("Debug::pkgAcquire::Auth", false) == true)
{
@@ -412,10 +423,12 @@ bool pkgAcquire::Worker::RunMessages()
else // hashsum mismatch
Owner->Status = pkgAcquire::Item::StatAuthError;
+
if (consideredOkay == true)
{
- Owner->Done(Message, ReceivedHashes, Config);
- if (Log != 0)
+ if (isDoomedItem(Owner) == false)
+ Owner->Done(Message, ReceivedHashes, Config);
+ if (Log != nullptr)
{
if (isIMSHit)
Log->IMSHit(Owner->GetItemDesc());
@@ -425,8 +438,9 @@ bool pkgAcquire::Worker::RunMessages()
}
else
{
- Owner->Failed(Message,Config);
- if (Log != 0)
+ if (isDoomedItem(Owner) == false)
+ Owner->Failed(Message,Config);
+ if (Log != nullptr)
Log->Fail(Owner->GetItemDesc());
}
}
@@ -434,10 +448,9 @@ bool pkgAcquire::Worker::RunMessages()
break;
}
- // 400 URI Failure
- case 400:
+ case MessageType::URI_FAILURE:
{
- if (Itm == 0)
+ if (Itm == nullptr)
{
std::string const msg = LookupTag(Message,"Message");
_error->Error("Method gave invalid 400 URI Failure message: %s", msg.c_str());
@@ -447,13 +460,13 @@ bool pkgAcquire::Worker::RunMessages()
PrepareFiles("400::URIFailure", Itm);
// Display update before completion
- if (Log != 0 && Log->MorePulses == true)
+ if (Log != nullptr && Log->MorePulses == true)
for (pkgAcquire::Queue::QItem::owner_iterator O = Itm->Owners.begin(); O != Itm->Owners.end(); ++O)
Log->Pulse((*O)->GetOwner());
std::vector<Item*> const ItmOwners = Itm->Owners;
OwnerQ->ItemDone(Itm);
- Itm = NULL;
+ Itm = nullptr;
bool errTransient;
{
@@ -463,27 +476,25 @@ bool pkgAcquire::Worker::RunMessages()
errTransient = std::find(std::begin(reasons), std::end(reasons), failReason) != std::end(reasons);
}
- for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O)
+ for (auto const Owner: ItmOwners)
{
if (errTransient)
- (*O)->Status = pkgAcquire::Item::StatTransientNetworkError;
- (*O)->Failed(Message,Config);
-
- if (Log != 0)
- Log->Fail((*O)->GetItemDesc());
+ Owner->Status = pkgAcquire::Item::StatTransientNetworkError;
+ if (isDoomedItem(Owner) == false)
+ Owner->Failed(Message,Config);
+ if (Log != nullptr)
+ Log->Fail(Owner->GetItemDesc());
}
ItemDone();
break;
}
- // 401 General Failure
- case 401:
+ case MessageType::GENERAL_FAILURE:
_error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
break;
- // 403 Media Change
- case 403:
+ case MessageType::MEDIA_CHANGE:
MediaChange(Message);
break;
}
diff --git a/test/integration/test-pdiff-usage b/test/integration/test-pdiff-usage
index 11c8c4cc1..4e2d1f182 100755
--- a/test/integration/test-pdiff-usage
+++ b/test/integration/test-pdiff-usage
@@ -10,9 +10,24 @@ LOWCOSTEXT='lz4'
buildaptarchive
setupflataptarchive
-changetowebserver -o aptwebserver::support::modified-since=false
+changetowebserver
+
+cat >rootdir/etc/apt/apt.conf.d/contents.conf <<EOF
+Acquire::IndexTargets::deb::Contents {
+ MetaKey "\$(COMPONENT)/Contents-\$(ARCHITECTURE)";
+ ShortDescription "Contents";
+ Description "\$(RELEASE)/\$(COMPONENT) \$(ARCHITECTURE) Contents";
+ MetaKey "\$(COMPONENT)/Contents-\$(ARCHITECTURE)";
+ flatMetaKey "Contents-\$(ARCHITECTURE)";
+ flatDescription "\$(RELEASE) \$(ARCHITECTURE) Contents";
+};
+EOF
PKGFILE="${TESTDIR}/$(echo "$(basename $0)" | sed 's#^test-#Packages-#')"
+echo 'contents for stuff' > aptarchive/Contents-i386
+compressfile aptarchive/Contents-i386
+echo 'hacked' > aptarchive/hacked-i386
+compressfile aptarchive/hacked-i386
wasmergeused() {
testsuccess apt update "$@"
@@ -193,6 +208,21 @@ SHA256-Download:
" aptcache show apt newstuff futurestuff
# we reuse the archive state of the previous test here
+ msgmsg "Testcase: pdiff handling is stopped if transaction fails $*"
+ rm -rf rootdir/var/lib/apt/lists
+ cp -a rootdir/var/lib/apt/lists-bak rootdir/var/lib/apt/lists
+ cp Packages-future aptarchive/Packages
+ rm -f rootdir/var/lib/apt/lists/*_Contents-*
+ webserverconfig 'aptwebserver::overwrite::.*Contents-.*::filename' '/hacked-i386.gz'
+ testfailure apt update "$@" -o Debug::pkgAcquire::Worker=1 -o Debug::Acquire::http=1
+ webserverconfig 'aptwebserver::overwrite::.*Contents-.*::filename' '/Contents-i386.gz'
+ cp rootdir/tmp/testfailure.output patchdownload.output
+ testfailure grep 'rred:600' patchdownload.output
+ testnopackage newstuff futurestuff
+ testsuccessequal "$(cat "${PKGFILE}")
+" aptcache show apt oldstuff
+
+ # we reuse the archive state of the previous test here
msgmsg "Testcase: downloading a patch fails, but successful fallback: $*"
rm -rf rootdir/var/lib/apt/lists
cp -a rootdir/var/lib/apt/lists-bak rootdir/var/lib/apt/lists