[vlc-devel] [PATCH 08/12] dash: add persistentconnection to connectionmanager
Christopher Müller
christopher.mueller at itec.uni-klu.ac.at
Tue Mar 13 17:02:00 CET 2012
> -----Ursprüngliche Nachricht-----
> Von: Hugo Beauzée-Luyssen [mailto:beauze.h at gmail.com]
> Gesendet: Dienstag, 13. März 2012 16:32
> An: Mailing list for VLC media player developers
> Cc: Christopher Mueller
> Betreff: Re: [vlc-devel] [PATCH 08/12] dash: add persistentconnection
> to connectionmanager
>
> On Tue, Mar 13, 2012 at 3:18 PM, <Christopher at mailsrv.uni-klu.ac.at>
> wrote:
> > From: Christopher Mueller <christopher.mueller at itec.aau.at>
> >
> > ---
> > modules/stream_filter/dash/DASHDownloader.cpp | 40 ++---
> > modules/stream_filter/dash/DASHDownloader.h | 3 +-
> > modules/stream_filter/dash/DASHManager.cpp | 8 +-
> > modules/stream_filter/dash/DASHManager.h | 2 +-
> > modules/stream_filter/dash/dash.cpp | 21 +--
> > .../dash/http/HTTPConnectionManager.cpp | 189
> > ++++++++++---------
> > .../dash/http/HTTPConnectionManager.h | 46 +++---
> > 7 files changed, 149 insertions(+), 160 deletions(-)
> >
> > diff --git a/modules/stream_filter/dash/DASHDownloader.cpp
> > b/modules/stream_filter/dash/DASHDownloader.cpp
> > index 0fd5c92..9388e34 100644
> > --- a/modules/stream_filter/dash/DASHDownloader.cpp
> > +++ b/modules/stream_filter/dash/DASHDownloader.cpp
> > @@ -32,11 +32,11 @@ using namespace dash::http;
> > using namespace dash::logic;
> > using namespace dash::buffer;
> >
> > -DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager,
> > IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
> > +
> > +DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager,
> > +BlockBuffer *buffer)
> > {
> > this->t_sys = (thread_sys_t *)
> > malloc(sizeof(thread_sys_t));
> > this->t_sys->conManager = conManager;
> > - this->t_sys->adaptationLogic = adaptationLogic;
> > this->t_sys->buffer = buffer;
> > }
> > DASHDownloader::~DASHDownloader ()
> > @@ -57,42 +57,24 @@ void* DASHDownloader::download (void
> > *thread_sys)
> > {
> > thread_sys_t *t_sys = (thread_sys_t *)
> > thread_sys;
> > HTTPConnectionManager *conManager = t_sys->conManager;
> > - IAdaptationLogic *adaptationLogic =
> > t_sys->adaptationLogic;
> > BlockBuffer *buffer = t_sys->buffer;
> > - Chunk *currentChunk = NULL;
> > block_t *block =
> > block_Alloc(BLOCKSIZE);
> > + int ret = 0;
> >
> > do
> > {
> > - if(currentChunk == NULL)
> > - {
> > - currentChunk = adaptationLogic->getNextChunk();
> > - if(currentChunk == NULL)
> > - {
> > - buffer->setEOF(true);
> > - }
> > - }
> > - else
> > + ret = conManager->read(block);
> > + if(ret > 0)
> > {
> > - int ret = conManager->read(currentChunk, block-
> >p_buffer,
> > block->i_buffer);
> > - if(ret <= 0)
> > - {
> > - currentChunk = NULL;
> > - }
> > - else
> > - {
> > - block_t *bufBlock = block_Alloc(ret);
> > - memcpy(bufBlock->p_buffer, block->p_buffer, ret);
> > -
> > - if(currentChunk->getBitrate() <= 0)
> > - currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
> > + block_t *bufBlock = block_Alloc(ret);
> > + memcpy(bufBlock->p_buffer, block->p_buffer, ret);
> >
> > - bufBlock->i_length = (mtime_t)((ret * 8) /
> > ((float)currentChunk->getBitrate() / 1000000));
> > - buffer->put(bufBlock);
> > - }
> > + bufBlock->i_length = block->i_length;
> > + buffer->put(bufBlock);
> > }
> > - }while(!buffer->getEOF());
> > + }while(ret && !buffer->getEOF());
> >
> > + buffer->setEOF(true);
> > block_Release(block);
> >
> > return NULL;
> > diff --git a/modules/stream_filter/dash/DASHDownloader.h
> > b/modules/stream_filter/dash/DASHDownloader.h
> > index 1fcdf19..7807ec5 100644
> > --- a/modules/stream_filter/dash/DASHDownloader.h
> > +++ b/modules/stream_filter/dash/DASHDownloader.h
> > @@ -39,14 +39,13 @@ namespace dash
> > struct thread_sys_t
> > {
> > dash::http::HTTPConnectionManager *conManager;
> > - logic::IAdaptationLogic *adaptationLogic;
> > buffer::BlockBuffer *buffer;
> > };
> >
> > class DASHDownloader
> > {
> > public:
> > - DASHDownloader (http::HTTPConnectionManager
> > *conManager, logic::IAdaptationLogic *adaptationLogic,
> > buffer::BlockBuffer *buffer);
> > + DASHDownloader (http::HTTPConnectionManager
> > + *conManager, buffer::BlockBuffer *buffer);
> > virtual ~DASHDownloader ();
> >
> > bool start (); diff --git
> > a/modules/stream_filter/dash/DASHManager.cpp
> > b/modules/stream_filter/dash/DASHManager.cpp
> > index f649015..bb2f3f0 100644
> > --- a/modules/stream_filter/dash/DASHManager.cpp
> > +++ b/modules/stream_filter/dash/DASHManager.cpp
> > @@ -34,9 +34,8 @@ using namespace dash::logic;
> > using namespace dash::mpd;
> > using namespace dash::buffer;
> >
> > -DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD
> > *mpd,
> > +DASHManager::DASHManager ( MPD *mpd,
> > IAdaptationLogic::LogicType type,
> stream_t *stream) :
> > - conManager ( conManager ),
> > currentChunk ( NULL ),
> > adaptationLogic( NULL ),
> > logicType ( type ),
> > @@ -51,8 +50,10 @@ DASHManager::DASHManager (
> HTTPConnectionManager
> > *conManager, MPD *mpd,
> > if ( this->adaptationLogic == NULL )
> > return ;
> >
> > + this->conManager = new
> > + dash::http::HTTPConnectionManager(this->adaptationLogic,
> > + this->stream);
> > +
> > this->buffer = new BlockBuffer(this->stream);
> > - this->downloader = new DASHDownloader(this->conManager,
> > this->adaptationLogic, this->buffer);
> > + this->downloader = new DASHDownloader(this->conManager,
> > + this->buffer);
> >
> > this->conManager->attach(this->adaptationLogic);
> > this->buffer->attach(this->adaptationLogic);
> > @@ -61,6 +62,7 @@ DASHManager::~DASHManager ()
> > {
> > delete this->downloader;
> > delete this->buffer;
> > + delete this->conManager;
> > delete this->adaptationLogic;
> > delete this->mpdManager;
> > }
> > diff --git a/modules/stream_filter/dash/DASHManager.h
> > b/modules/stream_filter/dash/DASHManager.h
> > index a09e98e..ab76a4a 100644
> > --- a/modules/stream_filter/dash/DASHManager.h
> > +++ b/modules/stream_filter/dash/DASHManager.h
> > @@ -40,7 +40,7 @@ namespace dash
> > class DASHManager
> > {
> > public:
> > - DASHManager( http::HTTPConnectionManager *conManager,
> > mpd::MPD *mpd,
> > + DASHManager( mpd::MPD *mpd,
> > logic::IAdaptationLogic::LogicType type,
> > stream_t *stream);
> > virtual ~DASHManager ();
> >
> > diff --git a/modules/stream_filter/dash/dash.cpp
> > b/modules/stream_filter/dash/dash.cpp
> > index d1ca988..ee42beb 100644
> > --- a/modules/stream_filter/dash/dash.cpp
> > +++ b/modules/stream_filter/dash/dash.cpp
> > @@ -74,11 +74,10 @@ vlc_module_end ()
> >
> >
> **********************************************************************
> > *******/
> > struct stream_sys_t
> > {
> > - dash::DASHManager *p_dashManager;
> > - dash::http::HTTPConnectionManager *p_conManager;
> > - dash::mpd::MPD *p_mpd;
> > - int position;
> > - bool isLive;
> > + dash::DASHManager *p_dashManager;
> > + dash::mpd::MPD *p_mpd;
> > + int position;
> > + bool isLive;
> > };
> >
> > static int Read (stream_t *p_stream, void *p_buffer,
> > unsigned int i_len); @@ -114,24 +113,20 @@ static int
> > Open(vlc_object_t *p_obj)
> > return VLC_ENOMEM;
> >
> > p_sys->p_mpd = mpd;
> > - dash::http::HTTPConnectionManager *p_conManager =
> > - new dash::http::HTTPConnectionManager(
> > p_stream );
> > - dash::DASHManager*p_dashManager =
> > - new dash::DASHManager( p_conManager, p_sys->p_mpd,
> > -
> > dash::logic::IAdaptationLogic::RateBased, p_stream);
> > + dash::DASHManager*p_dashManager = new
> > + dash::DASHManager(p_sys->p_mpd,
> > +
> > + dash::logic::IAdaptationLogic::RateBased,
> > + p_stream);
> >
> > if ( p_dashManager->getMpdManager() == NULL ||
> > p_dashManager->getMpdManager()->getMPD() == NULL ||
> > p_dashManager->getAdaptionLogic() == NULL ||
> > p_dashManager->start() == false)
> > {
> > - delete p_conManager;
> > delete p_dashManager;
> > free( p_sys );
> > return VLC_EGENERIC;
> > }
> > p_sys->p_dashManager = p_dashManager;
> > - p_sys->p_conManager = p_conManager;
> > p_sys->position = 0;
> > p_sys->isLive =
> > p_dashManager->getMpdManager()->getMPD()->isLive();
> > p_stream->p_sys = p_sys;
> > @@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
> > stream_t *p_stream = (stream_t*)
> > p_obj;
> > stream_sys_t *p_sys =
> > (stream_sys_t *) p_stream->p_sys;
> > dash::DASHManager *p_dashManager =
> > p_sys->p_dashManager;
> > - dash::http::HTTPConnectionManager *p_conManager =
> > p_sys->p_conManager;
> >
> > - delete(p_conManager);
> > delete(p_dashManager);
> > free(p_sys);
> > }
> > diff --git
> a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> > b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> > index c0e18b2..2059d05 100644
> > --- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> > +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> > @@ -31,127 +31,136 @@
> > using namespace dash::http;
> > using namespace dash::logic;
> >
> > -HTTPConnectionManager::HTTPConnectionManager (stream_t *stream)
> > +const int HTTPConnectionManager::PIPELINE = 80; const
> > +int HTTPConnectionManager::PIPELINELENGTH = 2; const int
> > +HTTPConnectionManager::CHUNKDEFAULTBITRATE = 1;
> > +
> > +HTTPConnectionManager::HTTPConnectionManager
> (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
> > + adaptationLogic (adaptationLogic),
> > + stream (stream),
> > + chunkCount (0),
> > + bpsAvg (0),
> > + bpsLastChunk (0),
> > + bpsCurrentChunk (0),
> > + bytesReadSession (0),
> > + bytesReadChunk (0),
> > + timeSession (0),
> > + timeChunk (0)
> > {
> > - this->timeSecSession = 0;
> > - this->bytesReadSession = 0;
> > - this->timeSecChunk = 0;
> > - this->bytesReadChunk = 0;
> > - this->bpsAvg = 0;
> > - this->bpsLastChunk = 0;
> > - this->chunkCount = 0;
> > - this->stream = stream;
> > }
> > HTTPConnectionManager::~HTTPConnectionManager ()
> > {
> > this->closeAllConnections();
> > }
> >
> > -bool HTTPConnectionManager::closeConnection(
> > IHTTPConnection *con )
> > +void
> > +HTTPConnectionManager::closeAllConnections ()
> > {
> > - for(std::map<Chunk*, HTTPConnection *>::iterator it =
> > this->chunkMap.begin();
> > - it != this->chunkMap.end(); ++it)
> > - {
> > - if( it->second == con )
> > - {
> > - delete con;
> > - this->chunkMap.erase( it );
> > - return true;
> > - }
> > - }
> > - return false;
> > + vlc_delete_all(this->connectionPool);
> > + vlc_delete_all(this->downloadQueue);
> > }
> > -
> > -bool HTTPConnectionManager::closeConnection( Chunk
> > *chunk ) -{
> > - HTTPConnection *con = this->chunkMap[chunk];
> > - bool ret = this->closeConnection(con);
> > - this->chunkMap.erase(chunk);
> > - delete(chunk);
> > - return ret;
> > -}
> > -
> > -void HTTPConnectionManager::closeAllConnections
> > ()
> > +int HTTPConnectionManager::read
>
> > +(block_t *block)
> > {
> > - std::map<Chunk *, HTTPConnection *>::iterator it;
> > + if(this->downloadQueue.size() == 0)
> > + if(!this->addChunk(this->adaptationLogic->getNextChunk()))
> > + return 0;
> >
> > - for(it = this->chunkMap.begin(); it != this->chunkMap.end();
> > ++it)
> > - delete(it->second);
> > + if(this->downloadQueue.front()->getPercentDownloaded() >
> > + this->PIPELINE &&
> > + this->downloadQueue.size() < this->PIPELINELENGTH)
> > + this->addChunk(this->adaptationLogic->getNextChunk());
> >
> > - this->chunkMap.clear();
> > -}
> > -
> > -int HTTPConnectionManager::read( Chunk *chunk, void
> > *p_buffer, size_t len ) -{
> > - if(this->chunkMap.find(chunk) == this->chunkMap.end())
> > - {
> > - this->bytesReadChunk = 0;
> > - this->timeSecChunk = 0;
> > -
> > - if ( this->initConnection( chunk ) == NULL )
> > - return -1;
> > - }
> > + int ret = 0;
> >
> > mtime_t start = mdate();
> > - int ret = this->chunkMap[chunk]->read(p_buffer, len);
> > + ret =
> > + this->downloadQueue.front()->getConnection()->read(block->p_buffer,
> > + block->i_buffer);
> > mtime_t end = mdate();
> >
> > - if( ret <= 0 )
> > - this->closeConnection( chunk );
> > - else
> > - {
> > - double time = ((double)(end - start)) / 1000000;
> > -
> > - this->bytesReadSession += ret;
> > - this->bytesReadChunk += ret;
> > - this->timeSecSession += time;
> > - this->timeSecChunk += time;
> > -
> > + block->i_length = (mtime_t)((ret * 8) /
> > + ((float)this->downloadQueue.front()->getBitrate() / 1000000));
> >
> > - if(this->timeSecSession > 0)
> > - this->bpsAvg = (this->bytesReadSession /
> > this->timeSecSession) * 8;
> > + double time = ((double)(end - start)) / 1000000;
> >
> > - if(this->timeSecChunk > 0)
> > - this->bpsLastChunk = (this->bytesReadChunk /
> > this->timeSecChunk) * 8;
> > -
> > - if(this->chunkCount < 2)
> > - this->bpsAvg = 0;
> > + if(ret <= 0)
> > + {
> > + this->bpsLastChunk = this->bpsCurrentChunk;
> > + this->bytesReadChunk = 0;
> > + this->timeChunk = 0;
> >
> > - if(this->chunkCount < 2)
> > - this->bpsLastChunk = 0;
> > + delete(this->downloadQueue.front());
> > + this->downloadQueue.pop_front();
> >
> > - this->notify();
> > + return this->read(block);
> > }
> > - return ret;
> > -}
> > -
> > -int HTTPConnectionManager::peek
> > (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek) -{
> > - if(this->chunkMap.find(chunk) == this->chunkMap.end())
> > + else
> > {
> > - if ( this->initConnection(chunk) == NULL )
> > - return -1;
> > + this->updateStatistics(ret, time);
> > }
> > - return this->chunkMap[chunk]->peek(pp_peek, i_peek); -}
> >
> > -IHTTPConnection* HTTPConnectionManager::initConnection(Chunk
> > *chunk) -{
> > - HTTPConnection *con = new HTTPConnection(this->stream);
> > - if ( con->init(chunk) == false )
> > - return NULL;
> > - this->chunkMap[chunk] = con;
> > - this->chunkCount++;
> > - return con;
> > + return ret;
> > }
> > -void HTTPConnectionManager::attach
> > (IDownloadRateObserver *observer)
> > +void HTTPConnectionManager::attach
>
> > +(IDownloadRateObserver *observer)
> > {
> > this->rateObservers.push_back(observer);
> > }
> > -void HTTPConnectionManager::notify
> > ()
> > +void HTTPConnectionManager::notify
>
> > +()
> > {
> > if ( this->bpsAvg == 0 )
> > return ;
> > for(size_t i = 0; i < this->rateObservers.size(); i++)
> > this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg,
> > this->bpsLastChunk);
> > }
> > +std::vector<PersistentConnection *>
> > +HTTPConnectionManager::getConnectionsForHost (const std::string
> > +&hostname) {
> > + std::vector<PersistentConnection *> cons;
> > +
> > + for(size_t i = 0; i < this->connectionPool.size(); i++)
> > +
> > + if(!this->connectionPool.at(i)->getHostname().compare(hostname) ||
> > + !this->connectionPool.at(i)->isConnected())
> > + cons.push_back(this->connectionPool.at(i));
> > +
> > + return cons;
> > +}
> > +void
> > +HTTPConnectionManager::updateStatistics (int bytes, double
> > +time) {
> > + this->bytesReadSession += bytes;
> > + this->bytesReadChunk += bytes;
> > + this->timeSession += time;
> > + this->timeChunk += time;
> > +
> > + this->bpsAvg = (int64_t) ((this->bytesReadSession *
> 8)
> > + / this->timeSession);
> > + this->bpsCurrentChunk = (int64_t) ((this->bytesReadChunk * 8)
> /
> > + this->timeChunk);
> > +
> > + if(this->bpsAvg < 0)
> > + this->bpsAvg = 0;
> > +
> > + if(this->bpsCurrentChunk < 0)
> > + this->bpsCurrentChunk = 0;
> > +
> > + this->notify();
> > +}
> > +bool HTTPConnectionManager::addChunk
>
> > +(Chunk *chunk) {
> > + if(chunk == NULL)
> > + return false;
> > +
> > + this->downloadQueue.push_back(chunk);
> > +
> > + std::vector<PersistentConnection *> cons =
> > + this->getConnectionsForHost(chunk->getHostname());
> > +
> > + if(cons.size() == 0)
> > + {
> > + PersistentConnection *con = new
> > + PersistentConnection(this->stream);
> > + this->connectionPool.push_back(con);
> > + cons.push_back(con);
> > + }
> > +
> > + size_t pos = this->chunkCount % cons.size();
> > +
> > + cons.at(pos)->addChunk(chunk);
> > +
> > + chunk->setConnection(cons.at(pos));
> > +
> > + this->chunkCount++;
> > +
> > + if(chunk->getBitrate() <= 0)
> > + chunk->setBitrate(this->CHUNKDEFAULTBITRATE);
> > +
> > + return true;
> > +}
> > diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h
> > b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> > index 006ca12..c2f69cf 100644
> > --- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
> > +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> > @@ -29,14 +29,13 @@
> >
> > #include <string>
> > #include <vector>
> > +#include <deque>
> > #include <iostream>
> > #include <ctime>
> > -#include <map>
> > #include <limits.h>
> >
> > -#include "http/HTTPConnection.h"
> > -#include "http/Chunk.h"
> > -#include "adaptationlogic/IDownloadRateObserver.h"
> > +#include "http/PersistentConnection.h"
> > +#include "adaptationlogic/IAdaptationLogic.h"
> >
> > namespace dash
> > {
> > @@ -45,31 +44,36 @@ namespace dash
> > class HTTPConnectionManager
> > {
> > public:
> > - HTTPConnectionManager (stream_t *stream);
> > + HTTPConnectionManager
> > + (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
> > virtual ~HTTPConnectionManager ();
> >
> > - void closeAllConnections ();
> > - bool closeConnection
> > (IHTTPConnection *con);
> > - int read (Chunk
> > *chunk, void *p_buffer, size_t len);
> > - int peek (Chunk
> > *chunk, const uint8_t **pp_peek, size_t i_peek);
> > - void attach
> > (dash::logic::IDownloadRateObserver *observer);
> > - void notify ();
> > + void closeAllConnections ();
> > + bool addChunk (Chunk *chunk);
> > + int read (block_t *block);
> > + void attach
> > + (dash::logic::IDownloadRateObserver *observer);
> > + void notify ();
> >
> > private:
> > - std::map<Chunk *, HTTPConnection *>
> > chunkMap;
> > - std::map<std::string, HTTPConnection *>
> > urlMap;
> > std::vector<dash::logic::IDownloadRateObserver *>
> > rateObservers;
> > - uint64_t
> > bpsAvg;
> > - uint64_t
> > bpsLastChunk;
> > - long
> > bytesReadSession;
> > - double
> > timeSecSession;
> > - long
> > bytesReadChunk;
> > - double
> > timeSecChunk;
> > + std::deque<Chunk *>
> > + downloadQueue;
> > + std::vector<PersistentConnection *>
> > + connectionPool;
> > + logic::IAdaptationLogic
> > + *adaptationLogic;
> > stream_t
> > *stream;
> > int
> > chunkCount;
> > + int64_t
> > + bpsAvg;
> > + int64_t
> > + bpsLastChunk;
> > + int64_t
> > + bpsCurrentChunk;
> > + int64_t
> > + bytesReadSession;
> > + int64_t
> > + bytesReadChunk;
> > + double
> > + timeSession;
> > + double
> > + timeChunk;
> >
> > - bool closeConnection( Chunk *chunk );
> > - IHTTPConnection* initConnection( Chunk *chunk );
> > + static const int PIPELINE;
> > + static const int PIPELINELENGTH;
> > + static const int CHUNKDEFAULTBITRATE;
> > +
> > + std::vector<PersistentConnection *>
> > + getConnectionsForHost (const std::string &hostname);
> > + void
> > + updateStatistics (int bytes, double time);
> >
> > };
> > }
> > --
> > 1.7.0.4
> >
> > _______________________________________________
> > vlc-devel mailing list
> > To unsubscribe or modify your subscription options:
> > http://mailman.videolan.org/listinfo/vlc-devel
>
> Please split this commit in multiple parts. Not every change are
> relative to your commit message.
>
> Looks like the Chunk are deleted from the HTTPConnectionManager DTOR,
> so the leak is kinda fixed. Although, with a big stream, this will be
> an issue.
The chunks get deleted in the read function:
> + if(ret <= 0)
> + {
> + this->bpsLastChunk = this->bpsCurrentChunk;
> + this->bytesReadChunk = 0;
> + this->timeChunk = 0;
>
> - if(this->chunkCount < 2)
> - this->bpsLastChunk = 0;
> + delete(this->downloadQueue.front());
> + this->downloadQueue.pop_front();
And if there are some chunks left they will be deleted in the destructor. The problem is I don’t know a convenient way to split a patch maybe you can advise me. Sorry if this is a stupid question ;)
BR
Chris
More information about the vlc-devel
mailing list