diff options
author | Arch Librarian <arch@canonical.com> | 2004-09-20 16:51:09 +0000 |
---|---|---|
committer | Arch Librarian <arch@canonical.com> | 2004-09-20 16:51:09 +0000 |
commit | 0a8a80e58374771acc225fe1e08ed8e0fe0016cc (patch) | |
tree | afa40f2a73b369e2ba930e47c961992170a669b7 | |
parent | 93641593cafac296b9072288d8ef9e1a526d745b (diff) |
Sync
Author: jgg
Date: 1998-10-22 04:56:38 GMT
Sync
-rw-r--r-- | apt-pkg/acquire-item.cc | 49 | ||||
-rw-r--r-- | apt-pkg/acquire-item.h | 21 | ||||
-rw-r--r-- | apt-pkg/acquire-worker.cc | 247 | ||||
-rw-r--r-- | apt-pkg/acquire-worker.h | 26 | ||||
-rw-r--r-- | apt-pkg/acquire.cc | 287 | ||||
-rw-r--r-- | apt-pkg/acquire.h | 61 | ||||
-rw-r--r-- | apt-pkg/contrib/configuration.cc | 18 | ||||
-rw-r--r-- | apt-pkg/contrib/configuration.h | 7 | ||||
-rw-r--r-- | apt-pkg/contrib/fileutl.cc | 8 | ||||
-rw-r--r-- | apt-pkg/contrib/strutl.cc | 75 | ||||
-rw-r--r-- | apt-pkg/contrib/strutl.h | 9 | ||||
-rw-r--r-- | doc/examples/apt.conf | 8 | ||||
-rw-r--r-- | methods/file.cc | 5 | ||||
-rw-r--r-- | test/scratch.cc | 5 |
14 files changed, 704 insertions, 122 deletions
diff --git a/apt-pkg/acquire-item.cc b/apt-pkg/acquire-item.cc index e92b61181..e1049dde9 100644 --- a/apt-pkg/acquire-item.cc +++ b/apt-pkg/acquire-item.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-item.cc,v 1.2 1998/10/20 02:39:12 jgg Exp $ +// $Id: acquire-item.cc,v 1.3 1998/10/22 04:56:38 jgg Exp $ /* ###################################################################### Acquire Item - Item to acquire @@ -19,6 +19,9 @@ #include <apt-pkg/acquire-item.h> #include <apt-pkg/configuration.h> #include <strutl.h> + +#include <sys/stat.h> +#include <unistd.h> /*}}}*/ // Acquire::Item::Item - Constructor /*{{{*/ @@ -45,23 +48,30 @@ pkgAcquire::Item::~Item() pkgAcqIndex::pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location) : Item(Owner), Location(Location) { - QueueURI(Location->PackagesURI() + ".gz"); - Description = Location->PackagesInfo(); + DestFile = _config->FindDir("Dir::State::lists") + "partial/"; + DestFile += URItoFileName(Location->PackagesURI()); + + QueueURI(Location->PackagesURI() + ".gz",Location->PackagesInfo()); + // Create the Release fetch class new pkgAcqIndexRel(Owner,Location); } /*}}}*/ -// pkgAcqIndex::ToFile - File to write the download to /*{{{*/ +// AcqIndex::Custom600Headers - Insert custom request headers /*{{{*/ // --------------------------------------------------------------------- -/* */ -string pkgAcqIndex::ToFile() +/* The only header we use is the last-modified header. */ +string pkgAcqIndex::Custom600Headers() { - string PartialDir = _config->FindFile("Dir::State::lists") + "/partial/"; + string Final = _config->FindDir("Dir::State::lists"); + Final += URItoFileName(Location->PackagesURI()); + + struct stat Buf; + if (stat(Final.c_str(),&Buf) != 0) + return string(); - return PartialDir + URItoFileName(Location->PackagesURI()); + return "\nLast-Modified: " + TimeRFC1123(Buf.st_mtime); } /*}}}*/ - // AcqIndexRel::pkgAcqIndexRel - Constructor /*{{{*/ // --------------------------------------------------------------------- /* The Release file is added to the queue */ @@ -69,17 +79,24 @@ pkgAcqIndexRel::pkgAcqIndexRel(pkgAcquire *Owner, const pkgSourceList::Item *Location) : Item(Owner), Location(Location) { - QueueURI(Location->ReleaseURI()); - Description = Location->ReleaseInfo(); + DestFile = _config->FindDir("Dir::State::lists") + "partial/"; + DestFile += URItoFileName(Location->ReleaseURI()); + + QueueURI(Location->ReleaseURI(),Location->ReleaseInfo()); } /*}}}*/ -// AcqIndexRel::ToFile - File to write the download to /*{{{*/ +// AcqIndexRel::Custom600Headers - Insert custom request headers /*{{{*/ // --------------------------------------------------------------------- -/* */ -string pkgAcqIndexRel::ToFile() +/* The only header we use is the last-modified header. */ +string pkgAcqIndexRel::Custom600Headers() { - string PartialDir = _config->FindFile("Dir::State::lists") + "/partial/"; + string Final = _config->FindDir("Dir::State::lists"); + Final += URItoFileName(Location->ReleaseURI()); + + struct stat Buf; + if (stat(Final.c_str(),&Buf) != 0) + return string(); - return PartialDir + URItoFileName(Location->ReleaseURI()); + return "\nLast-Modified: " + TimeRFC1123(Buf.st_mtime); } /*}}}*/ diff --git a/apt-pkg/acquire-item.h b/apt-pkg/acquire-item.h index 6ab8859e4..8b2d6e908 100644 --- a/apt-pkg/acquire-item.h +++ b/apt-pkg/acquire-item.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-item.h,v 1.1 1998/10/15 06:59:59 jgg Exp $ +// $Id: acquire-item.h,v 1.2 1998/10/22 04:56:39 jgg Exp $ /* ###################################################################### Acquire Item - Item to acquire @@ -30,16 +30,20 @@ class pkgAcquire::Item protected: pkgAcquire *Owner; - inline void QueueURI(string URI) {Owner->Enqueue(this,URI);}; + inline void QueueURI(string URI,string Description) + {Owner->Enqueue(this,URI,Description);}; public: + // Number of queues we are inserted into unsigned int QueueCounter; - string Description; - virtual string ToFile() = 0; - virtual void Failed() {}; + // File to write the fetch into + string DestFile; + virtual void Failed() {}; + virtual string Custom600Headers() {return string();}; + Item(pkgAcquire *Owner); virtual ~Item(); }; @@ -53,7 +57,7 @@ class pkgAcqIndex : public pkgAcquire::Item public: - virtual string ToFile(); + virtual string Custom600Headers(); pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location); }; @@ -67,10 +71,9 @@ class pkgAcqIndexRel : public pkgAcquire::Item public: - virtual string ToFile(); - + virtual string Custom600Headers(); + pkgAcqIndexRel(pkgAcquire *Owner,const pkgSourceList::Item *Location); }; - #endif diff --git a/apt-pkg/acquire-worker.cc b/apt-pkg/acquire-worker.cc index 688c5e220..756b30959 100644 --- a/apt-pkg/acquire-worker.cc +++ b/apt-pkg/acquire-worker.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-worker.cc,v 1.3 1998/10/20 04:33:12 jgg Exp $ +// $Id: acquire-worker.cc,v 1.4 1998/10/22 04:56:40 jgg Exp $ /* ###################################################################### Acquire Worker @@ -16,6 +16,7 @@ #pragma implementation "apt-pkg/acquire-worker.h" #endif #include <apt-pkg/acquire-worker.h> +#include <apt-pkg/acquire-item.h> #include <apt-pkg/configuration.h> #include <apt-pkg/error.h> #include <apt-pkg/fileutl.h> @@ -29,11 +30,12 @@ // Worker::Worker - Constructor for Queue startup /*{{{*/ // --------------------------------------------------------------------- /* */ -pkgAcquire::Worker::Worker(Queue *Q,string Acc) +pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf) { OwnerQ = Q; - Config = 0; - Access = Acc; + Config = Cnf; + Access = Cnf->Access; + CurrentItem = 0; Construct(); } @@ -46,7 +48,8 @@ pkgAcquire::Worker::Worker(MethodConfig *Cnf) OwnerQ = 0; Config = Cnf; Access = Cnf->Access; - + CurrentItem = 0; + Construct(); } /*}}}*/ @@ -55,10 +58,13 @@ pkgAcquire::Worker::Worker(MethodConfig *Cnf) /* */ void pkgAcquire::Worker::Construct() { - Next = 0; + NextQueue = 0; + NextAcquire = 0; Process = -1; InFd = -1; OutFd = -1; + OutReady = false; + InReady = false; Debug = _config->FindB("Debug::pkgAcquire::Worker",false); } /*}}}*/ @@ -71,7 +77,11 @@ pkgAcquire::Worker::~Worker() close(OutFd); if (Process > 0) + { kill(Process,SIGINT); + if (waitpid(Process,0,0) != Process) + _error->Warning("I waited but nothing was there!"); + } } /*}}}*/ // Worker::Start - Start the worker process /*{{{*/ @@ -133,6 +143,8 @@ bool pkgAcquire::Worker::Start() SetNonBlock(Pipes[3],true); close(Pipes[1]); close(Pipes[2]); + OutReady = false; + InReady = true; // Read the configuration data if (WaitFd(InFd) == false || @@ -140,70 +152,18 @@ bool pkgAcquire::Worker::Start() return _error->Error("Method %s did not start correctly",Method.c_str()); RunMessages(); + SendConfiguration(); return true; } /*}}}*/ // Worker::ReadMessages - Read all pending messages into the list /*{{{*/ // --------------------------------------------------------------------- -/* This pulls full messages from the input FD into the message buffer. - It assumes that messages will not pause during transit so no - fancy buffering is used. */ +/* */ bool pkgAcquire::Worker::ReadMessages() { - char Buffer[4000]; - char *End = Buffer; - - while (1) - { - int Res = read(InFd,End,sizeof(Buffer) - (End-Buffer)); - - // Process is dead, this is kind of bad.. - if (Res == 0) - { - if (waitpid(Process,0,0) != Process) - _error->Warning("I waited but nothing was there!"); - Process = -1; - close(InFd); - close(OutFd); - InFd = -1; - OutFd = -1; - return false; - } - - // No data - if (Res == -1) - return true; - - End += Res; - - // Look for the end of the message - for (char *I = Buffer; I < End; I++) - { - if (I[0] != '\n' || I[1] != '\n') - continue; - - // Pull the message out - string Message(Buffer,0,I-Buffer); - - // Fix up the buffer - for (; I < End && *I == '\n'; I++); - End -= I-Buffer; - memmove(Buffer,I,End-Buffer); - I = Buffer; - - if (Debug == true) - clog << "Message " << Access << ':' << QuoteString(Message,"\n") << endl; - - MessageQueue.push_back(Message); - } - if (End == Buffer) - return true; - - if (WaitFd(InFd) == false) - return false; - } - + if (::ReadMessages(InFd,MessageQueue) == false) + return MethodFailure(); return true; } /*}}}*/ @@ -218,6 +178,9 @@ bool pkgAcquire::Worker::RunMessages() { string Message = MessageQueue.front(); MessageQueue.erase(MessageQueue.begin()); + + if (Debug == true) + clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl; // Fetch the message number char *End; @@ -228,10 +191,39 @@ bool pkgAcquire::Worker::RunMessages() // Determine the message number and dispatch switch (Number) { + // 100 Capabilities case 100: if (Capabilities(Message) == false) return _error->Error("Unable to process Capabilities message from %s",Access.c_str()); break; + + // 101 Log + case 101: + if (Debug == true) + clog << " <- (log) " << LookupTag(Message,"Message") << endl; + break; + + // 102 Status + case 102: + Status = LookupTag(Message,"Message"); + break; + + // 200 URI Start + case 200: + break; + + // 201 URI Done + case 201: + break; + + // 400 URI Failure + case 400: + break; + + // 401 General Failure + case 401: + _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str()); + break; } } return true; @@ -249,15 +241,142 @@ bool pkgAcquire::Worker::Capabilities(string Message) Config->Version = LookupTag(Message,"Version"); Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false); Config->PreScan = StringToBool(LookupTag(Message,"Pre-Scan"),false); + Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false); + Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false); // Some debug text if (Debug == true) { clog << "Configured access method " << Config->Access << endl; - clog << "Version: " << Config->Version << " SingleInstance: " << - Config->SingleInstance << " PreScan: " << Config->PreScan << endl; + clog << "Version:" << Config->Version << " SingleInstance:" << + Config->SingleInstance << " PreScan: " << Config->PreScan << + " Pipeline:" << Config->Pipeline << " SendConfig:" << + Config->SendConfig << endl; } return true; } /*}}}*/ +// Worker::SendConfiguration - Send the config to the method /*{{{*/ +// --------------------------------------------------------------------- +/* */ +bool pkgAcquire::Worker::SendConfiguration() +{ + if (Config->SendConfig == false) + return true; + + if (OutFd == -1) + return false; + + string Message = "601 Configuration\n"; + Message.reserve(2000); + + /* Write out all of the configuration directives by walking the + configuration tree */ + const Configuration::Item *Top = _config->Tree(0); + for (; Top != 0;) + { + if (Top->Value.empty() == false) + { + string Line = "Config-Item: " + Top->FullTag() + "="; + Line += QuoteString(Top->Value,"\n") + '\n'; + Message += Line; + } + + if (Top->Child != 0) + { + Top = Top->Child; + continue; + } + + while (Top != 0 && Top->Next == 0) + Top = Top->Parent; + if (Top != 0) + Top = Top->Next; + } + Message += '\n'; + + if (Debug == true) + clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl; + OutQueue += Message; + OutReady = true; + + return true; +} + /*}}}*/ +// Worker::QueueItem - Add an item to the outbound queue /*{{{*/ +// --------------------------------------------------------------------- +/* Send a URI Acquire message to the method */ +bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item) +{ + if (OutFd == -1) + return false; + + string Message = "600 URI Acquire\n"; + Message.reserve(300); + Message += "URI: " + Item->URI; + Message += "\nFilename: " + Item->Owner->DestFile; + Message += Item->Owner->Custom600Headers(); + Message += "\n\n"; + + if (Debug == true) + clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl; + OutQueue += Message; + OutReady = true; + + return true; +} + /*}}}*/ +// Worker::OutFdRead - Out bound FD is ready /*{{{*/ +// --------------------------------------------------------------------- +/* */ +bool pkgAcquire::Worker::OutFdReady() +{ + int Res = write(OutFd,OutQueue.begin(),OutQueue.length()); + if (Res <= 0) + return MethodFailure(); + + // Hmm.. this should never happen. + if (Res < 0) + return true; + + OutQueue.erase(0,Res); + if (OutQueue.empty() == true) + OutReady = false; + + return true; +} + /*}}}*/ +// Worker::InFdRead - In bound FD is ready /*{{{*/ +// --------------------------------------------------------------------- +/* */ +bool pkgAcquire::Worker::InFdReady() +{ + if (ReadMessages() == false) + return false; + RunMessages(); + return true; +} + /*}}}*/ +// Worker::MethodFailure - Called when the method fails /*{{{*/ +// --------------------------------------------------------------------- +/* This is called when the method is belived to have failed, probably because + read returned -1. */ +bool pkgAcquire::Worker::MethodFailure() +{ + cerr << "Method " << Access << " has died unexpectedly!" << endl; + if (waitpid(Process,0,0) != Process) + _error->Warning("I waited but nothing was there!"); + Process = -1; + close(InFd); + close(OutFd); + InFd = -1; + OutFd = -1; + OutReady = false; + InReady = false; + OutQueue = string(); + MessageQueue.erase(MessageQueue.begin(),MessageQueue.end()); + + return false; +} + /*}}}*/ diff --git a/apt-pkg/acquire-worker.h b/apt-pkg/acquire-worker.h index 28072373f..d128ec8b2 100644 --- a/apt-pkg/acquire-worker.h +++ b/apt-pkg/acquire-worker.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-worker.h,v 1.2 1998/10/20 02:39:14 jgg Exp $ +// $Id: acquire-worker.h,v 1.3 1998/10/22 04:56:42 jgg Exp $ /* ###################################################################### Acquire Worker - Worker process manager @@ -21,10 +21,15 @@ // Interfacing to the method process class pkgAcquire::Worker { + friend pkgAcquire; + protected: friend Queue; - Worker *Next; + /* Linked list starting at a Queue and a linked list starting + at Acquire */ + Worker *NextQueue; + Worker *NextAcquire; // The access association Queue *OwnerQ; @@ -35,27 +40,40 @@ class pkgAcquire::Worker pid_t Process; int InFd; int OutFd; + bool InReady; + bool OutReady; // Various internal things bool Debug; vector<string> MessageQueue; - + string OutQueue; + // Private constructor helper void Construct(); // Message handling things bool ReadMessages(); bool RunMessages(); + bool InFdReady(); + bool OutFdReady(); // The message handlers bool Capabilities(string Message); + bool SendConfiguration(); + + bool MethodFailure(); public: + pkgAcquire::Queue::QItem *CurrentItem; + + string Status; + // Load the method and do the startup + bool QueueItem(pkgAcquire::Queue::QItem *Item); bool Start(); - Worker(Queue *OwnerQ,string Access); + Worker(Queue *OwnerQ,MethodConfig *Config); Worker(MethodConfig *Config); ~Worker(); }; diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc index ad5016b28..80fee9af2 100644 --- a/apt-pkg/acquire.cc +++ b/apt-pkg/acquire.cc @@ -1,10 +1,15 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire.cc,v 1.2 1998/10/20 02:39:15 jgg Exp $ +// $Id: acquire.cc,v 1.3 1998/10/22 04:56:43 jgg Exp $ /* ###################################################################### Acquire - File Acquiration + The core element for the schedual system is the concept of a named + queue. Each queue is unique and each queue has a name derived from the + URI. The degree of paralization can be controled by how the queue + name is derived from the URI. + ##################################################################### */ /*}}}*/ // Include Files /*{{{*/ @@ -14,6 +19,8 @@ #include <apt-pkg/acquire.h> #include <apt-pkg/acquire-item.h> #include <apt-pkg/acquire-worker.h> +#include <apt-pkg/configuration.h> +#include <apt-pkg/error.h> #include <strutl.h> /*}}}*/ @@ -24,6 +31,16 @@ pkgAcquire::pkgAcquire() { Queues = 0; Configs = 0; + Workers = 0; + ToFetch = 0; + + string Mode = _config->Find("Acquire::Queue-Mode","host"); + if (strcasecmp(Mode.c_str(),"host") == 0) + QueueMode = QueueHost; + if (strcasecmp(Mode.c_str(),"access") == 0) + QueueMode = QueueAccess; + + Debug = _config->FindB("Debug::pkgAcquire",false); } /*}}}*/ // Acquire::~pkgAcquire - Destructor /*{{{*/ @@ -40,6 +57,13 @@ pkgAcquire::~pkgAcquire() Configs = Configs->Next; delete Jnk; } + + while (Queues != 0) + { + Queue *Jnk = Queues; + Queues = Queues->Next; + delete Jnk; + } } /*}}}*/ // Acquire::Add - Add a new item /*{{{*/ @@ -62,23 +86,97 @@ void pkgAcquire::Remove(Item *Itm) } } /*}}}*/ +// Acquire::Add - Add a worker /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Add(Worker *Work) +{ + Work->NextAcquire = Workers; + Workers = Work; +} + /*}}}*/ +// Acquire::Remove - Remove a worker /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Remove(Worker *Work) +{ + Worker **I = &Workers; + for (; *I != 0;) + { + if (*I == Work) + *I = (*I)->NextAcquire; + else + I = &(*I)->NextAcquire; + } +} + /*}}}*/ // Acquire::Enqueue - Queue an URI for fetching /*{{{*/ // --------------------------------------------------------------------- /* */ -void pkgAcquire::Enqueue(Item *Item,string URI) +void pkgAcquire::Enqueue(Item *Itm,string URI,string Description) { - cout << "Fetching " << URI << endl; - cout << " to " << Item->ToFile() << endl; - cout << " Queue is: " << QueueName(URI) << endl; + // Determine which queue to put the item in + string Name = QueueName(URI); + if (Name.empty() == true) + return; + + // Find the queue structure + Queue *I = Queues; + for (; I != 0 && I->Name != Name; I = I->Next); + if (I == 0) + { + I = new Queue(Name,this); + I->Next = Queues; + Queues = I; + } + + // Queue it into the named queue + I->Enqueue(Itm,URI,Description); + ToFetch++; + + // Some trace stuff + if (Debug == true) + { + clog << "Fetching " << URI << endl; + clog << " to " << Itm->DestFile << endl; + clog << " Queue is: " << QueueName(URI) << endl; + } } /*}}}*/ -// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/ +// Acquire::Dequeue - Remove an item from all queues /*{{{*/ // --------------------------------------------------------------------- /* */ +void pkgAcquire::Dequeue(Item *Itm) +{ + Queue *I = Queues; + for (; I != 0; I = I->Next) + I->Dequeue(Itm); + ToFetch--; +} + /*}}}*/ +// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/ +// --------------------------------------------------------------------- +/* The string returned depends on the configuration settings and the + method parameters. Given something like http://foo.org/bar it can + return http://foo.org or http */ string pkgAcquire::QueueName(string URI) { const MethodConfig *Config = GetConfig(URIAccess(URI)); - return string(); + if (Config == 0) + return string(); + + /* Single-Instance methods get exactly one queue per URI. This is + also used for the Access queue method */ + if (Config->SingleInstance == true || QueueMode == QueueAccess) + return URIAccess(URI); + + // Host based queue + string::iterator I = URI.begin(); + for (; I < URI.end() && *I != ':'; I++); + for (; I < URI.end() && (*I == '/' || *I == ':'); I++); + for (; I < URI.end() && *I != '/'; I++); + + return string(URI,0,I - URI.begin()); } /*}}}*/ // Acquire::GetConfig - Fetch the configuration information /*{{{*/ @@ -86,7 +184,7 @@ string pkgAcquire::QueueName(string URI) /* This locates the configuration structure for an access method. If a config structure cannot be found a Worker will be created to retrieve it */ -const pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access) +pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access) { // Search for an existing config MethodConfig *Conf; @@ -108,6 +206,74 @@ const pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access) return Conf; } /*}}}*/ +// Acquire::SetFds - Deal with readable FDs /*{{{*/ +// --------------------------------------------------------------------- +/* Collect FDs that have activity monitors into the fd sets */ +void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet) +{ + for (Worker *I = Workers; I != 0; I = I->NextAcquire) + { + if (I->InReady == true && I->InFd >= 0) + { + if (Fd < I->InFd) + Fd = I->InFd; + FD_SET(I->InFd,RSet); + } + if (I->OutReady == true && I->OutFd >= 0) + { + if (Fd < I->OutFd) + Fd = I->OutFd; + FD_SET(I->OutFd,WSet); + } + } +} + /*}}}*/ +// Acquire::RunFds - Deal with active FDs /*{{{*/ +// --------------------------------------------------------------------- +/* Dispatch active FDs over to the proper workers */ +void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet) +{ + for (Worker *I = Workers; I != 0; I = I->NextAcquire) + { + if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0) + I->InFdReady(); + if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0) + I->OutFdReady(); + } +} + /*}}}*/ +// Acquire::Run - Run the fetch sequence /*{{{*/ +// --------------------------------------------------------------------- +/* This runs the queues. It manages a select loop for all of the + Worker tasks. The workers interact with the queues and items to + manage the actual fetch. */ +bool pkgAcquire::Run() +{ + for (Queue *I = Queues; I != 0; I = I->Next) + I->Startup(); + + // Run till all things have been acquired + while (ToFetch > 0) + { + fd_set RFds; + fd_set WFds; + int Highest = 0; + FD_ZERO(&RFds); + FD_ZERO(&WFds); + SetFds(Highest,&RFds,&WFds); + + if (select(Highest+1,&RFds,&WFds,0,0) <= 0) + return _error->Errno("select","Select has failed"); + + RunFds(&RFds,&WFds); + } + + for (Queue *I = Queues; I != 0; I = I->Next) + I->Shutdown(); + + return true; +} + /*}}}*/ // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/ // --------------------------------------------------------------------- @@ -116,5 +282,110 @@ pkgAcquire::MethodConfig::MethodConfig() { SingleInstance = false; PreScan = false; + Pipeline = false; + SendConfig = false; + Next = 0; +} + /*}}}*/ + +// Queue::Queue - Constructor /*{{{*/ +// --------------------------------------------------------------------- +/* */ +pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name), + Owner(Owner) +{ + Items = 0; + Next = 0; + Workers = 0; +} + /*}}}*/ +// Queue::~Queue - Destructor /*{{{*/ +// --------------------------------------------------------------------- +/* */ +pkgAcquire::Queue::~Queue() +{ + Shutdown(); + + while (Items != 0) + { + QItem *Jnk = Items; + Items = Items->Next; + delete Jnk; + } +} + /*}}}*/ +// Queue::Enqueue - Queue an item to the queue /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description) +{ + // Create a new item + QItem *I = new QItem; + I->Next = Items; + Items = I; + + // Fill it in + Items->Owner = Owner; + Items->URI = URI; + Items->Description = Description; + Owner->QueueCounter++; +} + /*}}}*/ +// Queue::Dequeue - Remove and item from the queue /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Queue::Dequeue(Item *Owner) +{ + QItem **I = &Items; + for (; *I != 0;) + { + if ((*I)->Owner == Owner) + { + QItem *Jnk= *I; + *I = (*I)->Next; + Owner->QueueCounter--; + delete Jnk; + } + else + I = &(*I)->Next; + } +} + /*}}}*/ +// Queue::Startup - Start the worker processes /*{{{*/ +// --------------------------------------------------------------------- +/* */ +bool pkgAcquire::Queue::Startup() +{ + Shutdown(); + + pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(URIAccess(Name)); + if (Cnf == 0) + return false; + + Workers = new Worker(this,Cnf); + Owner->Add(Workers); + if (Workers->Start() == false) + return false; + + Workers->QueueItem(Items); + + return true; +} + /*}}}*/ +// Queue::Shutdown - Shutdown the worker processes /*{{{*/ +// --------------------------------------------------------------------- +/* */ +bool pkgAcquire::Queue::Shutdown() +{ + // Delete all of the workers + while (Workers != 0) + { + pkgAcquire::Worker *Jnk = Workers; + Workers = Workers->NextQueue; + Owner->Remove(Jnk); + delete Jnk; + } + + return true; } /*}}}*/ diff --git a/apt-pkg/acquire.h b/apt-pkg/acquire.h index 355eb3c3a..cea7c8891 100644 --- a/apt-pkg/acquire.h +++ b/apt-pkg/acquire.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire.h,v 1.2 1998/10/20 02:39:16 jgg Exp $ +// $Id: acquire.h,v 1.3 1998/10/22 04:56:44 jgg Exp $ /* ###################################################################### Acquire - File Acquiration @@ -39,6 +39,8 @@ #pragma interface "apt-pkg/acquire.h" #endif +#include <unistd.h> + class pkgAcquire { public: @@ -48,21 +50,40 @@ class pkgAcquire class Worker; struct MethodConfig; friend Item; + friend Queue; protected: + // List of items to fetch vector<Item *> Items; + + // List of active queues and fetched method configuration parameters Queue *Queues; + Worker *Workers; MethodConfig *Configs; + unsigned long ToFetch; + + // Configurable parameters for the schedular + enum {QueueHost,QueueAccess} QueueMode; + bool Debug; void Add(Item *Item); void Remove(Item *Item); - void Enqueue(Item *Item,string URI); + void Add(Worker *Work); + void Remove(Worker *Work); + + void Enqueue(Item *Item,string URI,string Description); + void Dequeue(Item *Item); + string QueueName(string URI); + + // FDSET managers for derived classes + void SetFds(int &Fd,fd_set *RSet,fd_set *WSet); + void RunFds(fd_set *RSet,fd_set *WSet); public: - const MethodConfig *GetConfig(string Access); - string QueueName(string URI); + MethodConfig *GetConfig(string Access); + bool Run(); pkgAcquire(); ~pkgAcquire(); @@ -75,12 +96,36 @@ class pkgAcquire::Queue Queue *Next; protected: - - string URIMatch; - vector<Item *> Items; + // Queued item + struct QItem + { + QItem *Next; + + string URI; + string Description; + Item *Owner; + }; + + // Name of the queue + string Name; + + // Items queued into this queue + QItem *Items; + pkgAcquire::Worker *Workers; + pkgAcquire *Owner; public: + + // Put an item into this queue + void Enqueue(Item *Owner,string URI,string Description); + void Dequeue(Item *Owner); + + bool Startup(); + bool Shutdown(); + + Queue(string Name,pkgAcquire *Owner); + ~Queue(); }; // Configuration information from each method @@ -93,6 +138,8 @@ struct pkgAcquire::MethodConfig string Version; bool SingleInstance; bool PreScan; + bool Pipeline; + bool SendConfig; MethodConfig(); }; diff --git a/apt-pkg/contrib/configuration.cc b/apt-pkg/contrib/configuration.cc index 82418f9c2..fa07ed35a 100644 --- a/apt-pkg/contrib/configuration.cc +++ b/apt-pkg/contrib/configuration.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: configuration.cc,v 1.7 1998/10/20 02:39:26 jgg Exp $ +// $Id: configuration.cc,v 1.8 1998/10/22 04:56:45 jgg Exp $ /* ###################################################################### Configuration Class @@ -33,7 +33,7 @@ Configuration::Configuration() Root = new Item; } /*}}}*/ -// Configuration::Lookup - Lookup a single item /*{{{*/ +// Configuration::Lookup - Lookup a single item /*{{{*/ // --------------------------------------------------------------------- /* This will lookup a single item by name below another item. It is a helper function for the main lookup function */ @@ -66,6 +66,9 @@ Configuration::Item *Configuration::Lookup(Item *Head,const char *S, new items */ Configuration::Item *Configuration::Lookup(const char *Name,bool Create) { + if (Name == 0) + return Root->Child; + const char *Start = Name; const char *End = Start + strlen(Name); const char *TagEnd = Name; @@ -210,6 +213,17 @@ bool Configuration::Exists(const char *Name) } /*}}}*/ +// Configuration::Item::FullTag - Return the fully scoped tag /*{{{*/ +// --------------------------------------------------------------------- +/* */ +string Configuration::Item::FullTag() const +{ + if (Parent == 0 || Parent->Parent == 0) + return Tag; + return Parent->FullTag() + "::" + Tag; +} + /*}}}*/ + // ReadConfigFile - Read a configuration file /*{{{*/ // --------------------------------------------------------------------- /* The configuration format is very much like the named.conf format diff --git a/apt-pkg/contrib/configuration.h b/apt-pkg/contrib/configuration.h index c98b0bb14..14c80e4ad 100644 --- a/apt-pkg/contrib/configuration.h +++ b/apt-pkg/contrib/configuration.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: configuration.h,v 1.5 1998/10/20 02:39:27 jgg Exp $ +// $Id: configuration.h,v 1.6 1998/10/22 04:56:46 jgg Exp $ /* ###################################################################### Configuration Class @@ -39,6 +39,9 @@ class Configuration Item *Parent; Item *Child; Item *Next; + + string FullTag() const; + Item() : Child(0), Next(0) {}; }; Item *Root; @@ -61,6 +64,8 @@ class Configuration inline bool Exists(string Name) {return Exists(Name.c_str());}; bool Exists(const char *Name); + inline const Item *Tree(const char *Name) {return Lookup(Name,false);}; + Configuration(); }; diff --git a/apt-pkg/contrib/fileutl.cc b/apt-pkg/contrib/fileutl.cc index bfc674c62..3d5c4686b 100644 --- a/apt-pkg/contrib/fileutl.cc +++ b/apt-pkg/contrib/fileutl.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: fileutl.cc,v 1.10 1998/10/20 04:33:16 jgg Exp $ +// $Id: fileutl.cc,v 1.11 1998/10/22 04:56:47 jgg Exp $ /* ###################################################################### File Utilities @@ -136,8 +136,8 @@ void SetCloseExec(int Fd,bool Close) /* */ void SetNonBlock(int Fd,bool Block) { - int Flags = fcntl(Fd,F_GETFL); - if (fcntl(Fd,F_SETFL,(Flags & ~O_NONBLOCK) | (Block == false)?0:O_NONBLOCK) != 0) + int Flags = fcntl(Fd,F_GETFL) & (~O_NONBLOCK); + if (fcntl(Fd,F_SETFL,Flags | ((Block == false)?0:O_NONBLOCK)) != 0) { cerr << "FATAL -> Could not set non-blocking flag " << strerror(errno) << endl; exit(100); @@ -153,8 +153,10 @@ bool WaitFd(int Fd) fd_set Set; FD_ZERO(&Set); FD_SET(Fd,&Set); + if (select(Fd+1,&Set,0,0,0) <= 0) return false; + return true; } /*}}}*/ diff --git a/apt-pkg/contrib/strutl.cc b/apt-pkg/contrib/strutl.cc index c615f6229..04a3c7bb7 100644 --- a/apt-pkg/contrib/strutl.cc +++ b/apt-pkg/contrib/strutl.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: strutl.cc,v 1.5 1998/10/20 02:39:30 jgg Exp $ +// $Id: strutl.cc,v 1.6 1998/10/22 04:56:48 jgg Exp $ /* ###################################################################### String Util - Some usefull string functions. @@ -14,9 +14,12 @@ /*}}}*/ // Includes /*{{{*/ #include <strutl.h> +#include <apt-pkg/fileutl.h> + #include <ctype.h> #include <string.h> #include <stdio.h> +#include <time.h> /*}}}*/ // strstrip - Remove white space from the front and back of a string /*{{{*/ @@ -310,7 +313,7 @@ string URIAccess(string URI) { string::size_type Pos = URI.find(':'); if (Pos == string::npos) - return string(); + return URI; return string(URI,0,Pos); } /*}}}*/ @@ -472,3 +475,71 @@ int StringToBool(string Text,int Default = -1) return Default; } /*}}}*/ +// TimeRFC1123 - Convert a time_t into RFC1123 format /*{{{*/ +// --------------------------------------------------------------------- +/* This converts a time_t into a string time representation that is + year 2000 complient and timezone neutral */ +string TimeRFC1123(time_t Date) +{ + struct tm Conv = *gmtime(&Date); + char Buf[300]; + + const char *Day[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"}; + const char *Month[] = {"Jan","Feb","Mar","Apr","May","Jun","Jul", + "Aug","Sep","Oct","Nov","Dec"}; + + sprintf(Buf,"%s, %02i %s %i %02i:%02i:%02i GMT",Day[Conv.tm_wday], + Conv.tm_mday,Month[Conv.tm_mon],Conv.tm_year+1900,Conv.tm_hour, + Conv.tm_min,Conv.tm_sec); + return Buf; +} + /*}}}*/ +// ReadMessages - Read messages from the FD /*{{{*/ +// --------------------------------------------------------------------- +/* This pulls full messages from the input FD into the message buffer. + It assumes that messages will not pause during transit so no + fancy buffering is used. */ +bool ReadMessages(int Fd, vector<string> &List) +{ + char Buffer[4000]; + char *End = Buffer; + + while (1) + { + int Res = read(Fd,End,sizeof(Buffer) - (End-Buffer)); + + // Process is dead, this is kind of bad.. + if (Res == 0) + return false; + + // No data + if (Res <= 0) + return true; + + End += Res; + + // Look for the end of the message + for (char *I = Buffer; I < End; I++) + { + if (I[0] != '\n' || I[1] != '\n') + continue; + + // Pull the message out + string Message(Buffer,0,I-Buffer); + + // Fix up the buffer + for (; I < End && *I == '\n'; I++); + End -= I-Buffer; + memmove(Buffer,I,End-Buffer); + I = Buffer; + + List.push_back(Message); + } + if (End == Buffer) + return true; + + if (WaitFd(Fd) == false) + return false; + } +} + /*}}}*/ diff --git a/apt-pkg/contrib/strutl.h b/apt-pkg/contrib/strutl.h index 38aca5762..fca36fc38 100644 --- a/apt-pkg/contrib/strutl.h +++ b/apt-pkg/contrib/strutl.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: strutl.h,v 1.5 1998/10/20 02:39:31 jgg Exp $ +// $Id: strutl.h,v 1.6 1998/10/22 04:56:49 jgg Exp $ /* ###################################################################### String Util - These are some usefull string functions @@ -20,6 +20,7 @@ #include <stdlib.h> #include <string> +#include <vector> char *_strstrip(char *String); char *_strtabexpand(char *String,size_t Len); @@ -32,12 +33,14 @@ string SubstVar(string Str,string Subst,string Contents); string Base64Encode(string Str); string URItoFileName(string URI); string URIAccess(string URI); +string TimeRFC1123(time_t Date); +string LookupTag(string Message,const char *Tag,const char *Default = 0); +int StringToBool(string Text,int Default = -1); +bool ReadMessages(int Fd, vector<string> &List); int stringcmp(const char *A,const char *AEnd,const char *B,const char *BEnd); inline int stringcmp(const char *A,const char *AEnd,const char *B) {return stringcmp(A,AEnd,B,B+strlen(B));}; int stringcasecmp(const char *A,const char *AEnd,const char *B,const char *BEnd); inline int stringcasecmp(const char *A,const char *AEnd,const char *B) {return stringcasecmp(A,AEnd,B,B+strlen(B));}; -string LookupTag(string Message,const char *Tag,const char *Default = 0); -int StringToBool(string Text,int Default = -1); #endif diff --git a/doc/examples/apt.conf b/doc/examples/apt.conf index 8cd9c3b43..ab1bfbc21 100644 --- a/doc/examples/apt.conf +++ b/doc/examples/apt.conf @@ -1,4 +1,4 @@ -// $Id: apt.conf,v 1.3 1998/10/20 02:41:06 jgg Exp $ +// $Id: apt.conf,v 1.4 1998/10/22 04:56:50 jgg Exp $ /* This file is an index of all APT configuration directives. It should NOT actually be used as a real config file, though it is a completely valid file. @@ -16,6 +16,11 @@ APT { }; }; +Acquire +{ + Queue-Mode "access"; // host|access +}; + Dir { @@ -50,5 +55,6 @@ DSelect { Debug { pkgProblemResolver "true"; + pkgAcquire "false"; pkgAcquire::Worker "true"; } diff --git a/methods/file.cc b/methods/file.cc index 017222d84..71c564301 100644 --- a/methods/file.cc +++ b/methods/file.cc @@ -4,6 +4,7 @@ int main() { printf("100 Capabilities\n" "Version: 1.0\n" - "Pre-Scan: true\n\n" - "Version: 1.0\n\n"); + "Pre-Scan: true\n\n"); + fflush(stdout); + sleep(10); } diff --git a/test/scratch.cc b/test/scratch.cc index 577ab5f9b..a8817bc41 100644 --- a/test/scratch.cc +++ b/test/scratch.cc @@ -1,9 +1,12 @@ #include <apt-pkg/acquire-item.h> #include <apt-pkg/init.h> #include <apt-pkg/error.h> +#include <signal.h> int main() { + signal(SIGPIPE,SIG_IGN); + pkgInitialize(*_config); pkgSourceList List; @@ -17,6 +20,8 @@ int main() if (_error->PendingError() == true) break; } + + Fetcher.Run(); _error->DumpErrors(); } |