[vlc-devel] [PATCH 08/12] dash: add persistentconnection to connectionmanager

Hugo Beauzée-Luyssen beauze.h at gmail.com
Tue Mar 13 16:31:44 CET 2012


On Tue, Mar 13, 2012 at 3:18 PM,  <Christopher at mailsrv.uni-klu.ac.at> wrote:
> From: Christopher Mueller <christopher.mueller at itec.aau.at>
>
> ---
>  modules/stream_filter/dash/DASHDownloader.cpp      |   40 ++---
>  modules/stream_filter/dash/DASHDownloader.h        |    3 +-
>  modules/stream_filter/dash/DASHManager.cpp         |    8 +-
>  modules/stream_filter/dash/DASHManager.h           |    2 +-
>  modules/stream_filter/dash/dash.cpp                |   21 +--
>  .../dash/http/HTTPConnectionManager.cpp            |  189 ++++++++++---------
>  .../dash/http/HTTPConnectionManager.h              |   46 +++---
>  7 files changed, 149 insertions(+), 160 deletions(-)
>
> diff --git a/modules/stream_filter/dash/DASHDownloader.cpp b/modules/stream_filter/dash/DASHDownloader.cpp
> index 0fd5c92..9388e34 100644
> --- a/modules/stream_filter/dash/DASHDownloader.cpp
> +++ b/modules/stream_filter/dash/DASHDownloader.cpp
> @@ -32,11 +32,11 @@ using namespace dash::http;
>  using namespace dash::logic;
>  using namespace dash::buffer;
>
> -DASHDownloader::DASHDownloader  (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
> +
> +DASHDownloader::DASHDownloader  (HTTPConnectionManager *conManager, BlockBuffer *buffer)
>  {
>     this->t_sys                     = (thread_sys_t *) malloc(sizeof(thread_sys_t));
>     this->t_sys->conManager         = conManager;
> -    this->t_sys->adaptationLogic    = adaptationLogic;
>     this->t_sys->buffer             = buffer;
>  }
>  DASHDownloader::~DASHDownloader ()
> @@ -57,42 +57,24 @@ void*       DASHDownloader::download    (void *thread_sys)
>  {
>     thread_sys_t            *t_sys              = (thread_sys_t *) thread_sys;
>     HTTPConnectionManager   *conManager         = t_sys->conManager;
> -    IAdaptationLogic        *adaptationLogic    = t_sys->adaptationLogic;
>     BlockBuffer             *buffer             = t_sys->buffer;
> -    Chunk                   *currentChunk       = NULL;
>     block_t                 *block              = block_Alloc(BLOCKSIZE);
> +    int                     ret                 = 0;
>
>     do
>     {
> -        if(currentChunk == NULL)
> -        {
> -            currentChunk  = adaptationLogic->getNextChunk();
> -            if(currentChunk == NULL)
> -            {
> -                buffer->setEOF(true);
> -            }
> -        }
> -        else
> +        ret = conManager->read(block);
> +        if(ret > 0)
>         {
> -            int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
> -            if(ret <= 0)
> -            {
> -                currentChunk = NULL;
> -            }
> -            else
> -            {
> -                block_t *bufBlock = block_Alloc(ret);
> -                memcpy(bufBlock->p_buffer, block->p_buffer, ret);
> -
> -                if(currentChunk->getBitrate() <= 0)
> -                    currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
> +            block_t *bufBlock = block_Alloc(ret);
> +            memcpy(bufBlock->p_buffer, block->p_buffer, ret);
>
> -                bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
> -                buffer->put(bufBlock);
> -            }
> +            bufBlock->i_length = block->i_length;
> +            buffer->put(bufBlock);
>         }
> -    }while(!buffer->getEOF());
> +    }while(ret && !buffer->getEOF());
>
> +    buffer->setEOF(true);
>     block_Release(block);
>
>     return NULL;
> diff --git a/modules/stream_filter/dash/DASHDownloader.h b/modules/stream_filter/dash/DASHDownloader.h
> index 1fcdf19..7807ec5 100644
> --- a/modules/stream_filter/dash/DASHDownloader.h
> +++ b/modules/stream_filter/dash/DASHDownloader.h
> @@ -39,14 +39,13 @@ namespace dash
>     struct thread_sys_t
>     {
>         dash::http::HTTPConnectionManager   *conManager;
> -        logic::IAdaptationLogic             *adaptationLogic;
>         buffer::BlockBuffer                 *buffer;
>     };
>
>     class DASHDownloader
>     {
>         public:
> -            DASHDownloader          (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
> +            DASHDownloader          (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
>             virtual ~DASHDownloader ();
>
>             bool            start       ();
> diff --git a/modules/stream_filter/dash/DASHManager.cpp b/modules/stream_filter/dash/DASHManager.cpp
> index f649015..bb2f3f0 100644
> --- a/modules/stream_filter/dash/DASHManager.cpp
> +++ b/modules/stream_filter/dash/DASHManager.cpp
> @@ -34,9 +34,8 @@ using namespace dash::logic;
>  using namespace dash::mpd;
>  using namespace dash::buffer;
>
> -DASHManager::DASHManager    ( HTTPConnectionManager *conManager, MPD *mpd,
> +DASHManager::DASHManager    ( MPD *mpd,
>                               IAdaptationLogic::LogicType type, stream_t *stream) :
> -             conManager     ( conManager ),
>              currentChunk   ( NULL ),
>              adaptationLogic( NULL ),
>              logicType      ( type ),
> @@ -51,8 +50,10 @@ DASHManager::DASHManager    ( HTTPConnectionManager *conManager, MPD *mpd,
>     if ( this->adaptationLogic == NULL )
>         return ;
>
> +    this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
> +
>     this->buffer     = new BlockBuffer(this->stream);
> -    this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
> +    this->downloader = new DASHDownloader(this->conManager, this->buffer);
>
>     this->conManager->attach(this->adaptationLogic);
>     this->buffer->attach(this->adaptationLogic);
> @@ -61,6 +62,7 @@ DASHManager::~DASHManager   ()
>  {
>     delete this->downloader;
>     delete this->buffer;
> +    delete this->conManager;
>     delete this->adaptationLogic;
>     delete this->mpdManager;
>  }
> diff --git a/modules/stream_filter/dash/DASHManager.h b/modules/stream_filter/dash/DASHManager.h
> index a09e98e..ab76a4a 100644
> --- a/modules/stream_filter/dash/DASHManager.h
> +++ b/modules/stream_filter/dash/DASHManager.h
> @@ -40,7 +40,7 @@ namespace dash
>     class DASHManager
>     {
>         public:
> -            DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
> +            DASHManager( mpd::MPD *mpd,
>                          logic::IAdaptationLogic::LogicType type, stream_t *stream);
>             virtual ~DASHManager    ();
>
> diff --git a/modules/stream_filter/dash/dash.cpp b/modules/stream_filter/dash/dash.cpp
> index d1ca988..ee42beb 100644
> --- a/modules/stream_filter/dash/dash.cpp
> +++ b/modules/stream_filter/dash/dash.cpp
> @@ -74,11 +74,10 @@ vlc_module_end ()
>  *****************************************************************************/
>  struct stream_sys_t
>  {
> -        dash::DASHManager                   *p_dashManager;
> -        dash::http::HTTPConnectionManager   *p_conManager;
> -        dash::mpd::MPD                      *p_mpd;
> -        int                                 position;
> -        bool                                isLive;
> +        dash::DASHManager   *p_dashManager;
> +        dash::mpd::MPD      *p_mpd;
> +        int                 position;
> +        bool                isLive;
>  };
>
>  static int  Read            (stream_t *p_stream, void *p_buffer, unsigned int i_len);
> @@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
>         return VLC_ENOMEM;
>
>     p_sys->p_mpd = mpd;
> -    dash::http::HTTPConnectionManager *p_conManager =
> -                              new dash::http::HTTPConnectionManager( p_stream );
> -    dash::DASHManager*p_dashManager =
> -            new dash::DASHManager( p_conManager, p_sys->p_mpd,
> -                                   dash::logic::IAdaptationLogic::RateBased, p_stream);
> +    dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
> +                                          dash::logic::IAdaptationLogic::RateBased,
> +                                          p_stream);
>
>     if ( p_dashManager->getMpdManager()           == NULL   ||
>          p_dashManager->getMpdManager()->getMPD() == NULL   ||
>          p_dashManager->getAdaptionLogic()        == NULL   ||
>          p_dashManager->start()                   == false)
>     {
> -        delete p_conManager;
>         delete p_dashManager;
>         free( p_sys );
>         return VLC_EGENERIC;
>     }
>     p_sys->p_dashManager    = p_dashManager;
> -    p_sys->p_conManager     = p_conManager;
>     p_sys->position         = 0;
>     p_sys->isLive           = p_dashManager->getMpdManager()->getMPD()->isLive();
>     p_stream->p_sys         = p_sys;
> @@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
>     stream_t                            *p_stream       = (stream_t*) p_obj;
>     stream_sys_t                        *p_sys          = (stream_sys_t *) p_stream->p_sys;
>     dash::DASHManager                   *p_dashManager  = p_sys->p_dashManager;
> -    dash::http::HTTPConnectionManager   *p_conManager   = p_sys->p_conManager;
>
> -    delete(p_conManager);
>     delete(p_dashManager);
>     free(p_sys);
>  }
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> index c0e18b2..2059d05 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> @@ -31,127 +31,136 @@
>  using namespace dash::http;
>  using namespace dash::logic;
>
> -HTTPConnectionManager::HTTPConnectionManager    (stream_t *stream)
> +const int HTTPConnectionManager::PIPELINE               = 80;
> +const int HTTPConnectionManager::PIPELINELENGTH         = 2;
> +const int HTTPConnectionManager::CHUNKDEFAULTBITRATE    = 1;
> +
> +HTTPConnectionManager::HTTPConnectionManager    (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
> +                       adaptationLogic          (adaptationLogic),
> +                       stream                   (stream),
> +                       chunkCount               (0),
> +                       bpsAvg                   (0),
> +                       bpsLastChunk             (0),
> +                       bpsCurrentChunk          (0),
> +                       bytesReadSession         (0),
> +                       bytesReadChunk           (0),
> +                       timeSession              (0),
> +                       timeChunk                (0)
>  {
> -    this->timeSecSession    = 0;
> -    this->bytesReadSession  = 0;
> -    this->timeSecChunk      = 0;
> -    this->bytesReadChunk    = 0;
> -    this->bpsAvg            = 0;
> -    this->bpsLastChunk      = 0;
> -    this->chunkCount        = 0;
> -    this->stream            = stream;
>  }
>  HTTPConnectionManager::~HTTPConnectionManager   ()
>  {
>     this->closeAllConnections();
>  }
>
> -bool                HTTPConnectionManager::closeConnection( IHTTPConnection *con )
> +void                                HTTPConnectionManager::closeAllConnections      ()
>  {
> -    for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
> -        it != this->chunkMap.end(); ++it)
> -    {
> -        if( it->second == con )
> -        {
> -            delete con;
> -            this->chunkMap.erase( it );
> -            return true;
> -        }
> -    }
> -    return false;
> +    vlc_delete_all(this->connectionPool);
> +    vlc_delete_all(this->downloadQueue);
>  }
> -
> -bool                HTTPConnectionManager::closeConnection( Chunk *chunk )
> -{
> -    HTTPConnection *con = this->chunkMap[chunk];
> -    bool ret = this->closeConnection(con);
> -    this->chunkMap.erase(chunk);
> -    delete(chunk);
> -    return ret;
> -}
> -
> -void                HTTPConnectionManager::closeAllConnections      ()
> +int                                 HTTPConnectionManager::read                     (block_t *block)
>  {
> -    std::map<Chunk *, HTTPConnection *>::iterator it;
> +    if(this->downloadQueue.size() == 0)
> +        if(!this->addChunk(this->adaptationLogic->getNextChunk()))
> +            return 0;
>
> -    for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
> -        delete(it->second);
> +    if(this->downloadQueue.front()->getPercentDownloaded() > this->PIPELINE &&
> +       this->downloadQueue.size() < this->PIPELINELENGTH)
> +        this->addChunk(this->adaptationLogic->getNextChunk());
>
> -    this->chunkMap.clear();
> -}
> -
> -int                 HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
> -{
> -    if(this->chunkMap.find(chunk) == this->chunkMap.end())
> -    {
> -        this->bytesReadChunk    = 0;
> -        this->timeSecChunk      = 0;
> -
> -        if ( this->initConnection( chunk ) == NULL )
> -            return -1;
> -    }
> +    int ret = 0;
>
>     mtime_t start = mdate();
> -    int ret = this->chunkMap[chunk]->read(p_buffer, len);
> +    ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
>     mtime_t end = mdate();
>
> -    if( ret <= 0 )
> -        this->closeConnection( chunk );
> -    else
> -    {
> -        double time = ((double)(end - start)) / 1000000;
> -
> -        this->bytesReadSession += ret;
> -        this->bytesReadChunk   += ret;
> -        this->timeSecSession   += time;
> -        this->timeSecChunk     += time;
> -
> +    block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
>
> -        if(this->timeSecSession > 0)
> -            this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
> +    double time = ((double)(end - start)) / 1000000;
>
> -        if(this->timeSecChunk > 0)
> -            this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
> -
> -        if(this->chunkCount < 2)
> -            this->bpsAvg = 0;
> +    if(ret <= 0)
> +    {
> +        this->bpsLastChunk   = this->bpsCurrentChunk;
> +        this->bytesReadChunk = 0;
> +        this->timeChunk      = 0;
>
> -        if(this->chunkCount < 2)
> -            this->bpsLastChunk = 0;
> +        delete(this->downloadQueue.front());
> +        this->downloadQueue.pop_front();
>
> -        this->notify();
> +        return this->read(block);
>     }
> -    return ret;
> -}
> -
> -int                 HTTPConnectionManager::peek                     (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
> -{
> -    if(this->chunkMap.find(chunk) == this->chunkMap.end())
> +    else
>     {
> -        if ( this->initConnection(chunk) == NULL )
> -            return -1;
> +        this->updateStatistics(ret, time);
>     }
> -    return this->chunkMap[chunk]->peek(pp_peek, i_peek);
> -}
>
> -IHTTPConnection*     HTTPConnectionManager::initConnection(Chunk *chunk)
> -{
> -    HTTPConnection *con = new HTTPConnection(this->stream);
> -    if ( con->init(chunk) == false )
> -        return NULL;
> -    this->chunkMap[chunk] = con;
> -    this->chunkCount++;
> -    return con;
> +    return ret;
>  }
> -void                HTTPConnectionManager::attach                   (IDownloadRateObserver *observer)
> +void                                HTTPConnectionManager::attach                   (IDownloadRateObserver *observer)
>  {
>     this->rateObservers.push_back(observer);
>  }
> -void                HTTPConnectionManager::notify                   ()
> +void                                HTTPConnectionManager::notify                   ()
>  {
>     if ( this->bpsAvg == 0 )
>         return ;
>     for(size_t i = 0; i < this->rateObservers.size(); i++)
>         this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
>  }
> +std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost    (const std::string &hostname)
> +{
> +    std::vector<PersistentConnection *> cons;
> +
> +    for(size_t i = 0; i < this->connectionPool.size(); i++)
> +        if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
> +            cons.push_back(this->connectionPool.at(i));
> +
> +    return cons;
> +}
> +void                                HTTPConnectionManager::updateStatistics         (int bytes, double time)
> +{
> +    this->bytesReadSession  += bytes;
> +    this->bytesReadChunk    += bytes;
> +    this->timeSession       += time;
> +    this->timeChunk         += time;
> +
> +    this->bpsAvg            = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
> +    this->bpsCurrentChunk   = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
> +
> +    if(this->bpsAvg < 0)
> +        this->bpsAvg = 0;
> +
> +    if(this->bpsCurrentChunk < 0)
> +        this->bpsCurrentChunk = 0;
> +
> +    this->notify();
> +}
> +bool                                HTTPConnectionManager::addChunk                 (Chunk *chunk)
> +{
> +    if(chunk == NULL)
> +        return false;
> +
> +    this->downloadQueue.push_back(chunk);
> +
> +    std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
> +
> +    if(cons.size() == 0)
> +    {
> +        PersistentConnection *con = new PersistentConnection(this->stream);
> +        this->connectionPool.push_back(con);
> +        cons.push_back(con);
> +    }
> +
> +    size_t pos = this->chunkCount % cons.size();
> +
> +    cons.at(pos)->addChunk(chunk);
> +
> +    chunk->setConnection(cons.at(pos));
> +
> +    this->chunkCount++;
> +
> +    if(chunk->getBitrate() <= 0)
> +        chunk->setBitrate(this->CHUNKDEFAULTBITRATE);
> +
> +    return true;
> +}
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> index 006ca12..c2f69cf 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> @@ -29,14 +29,13 @@
>
>  #include <string>
>  #include <vector>
> +#include <deque>
>  #include <iostream>
>  #include <ctime>
> -#include <map>
>  #include <limits.h>
>
> -#include "http/HTTPConnection.h"
> -#include "http/Chunk.h"
> -#include "adaptationlogic/IDownloadRateObserver.h"
> +#include "http/PersistentConnection.h"
> +#include "adaptationlogic/IAdaptationLogic.h"
>
>  namespace dash
>  {
> @@ -45,31 +44,36 @@ namespace dash
>         class HTTPConnectionManager
>         {
>             public:
> -                HTTPConnectionManager           (stream_t *stream);
> +                HTTPConnectionManager           (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
>                 virtual ~HTTPConnectionManager  ();
>
> -                void                closeAllConnections ();
> -                bool                closeConnection     (IHTTPConnection *con);
> -                int                 read                (Chunk *chunk, void *p_buffer, size_t len);
> -                int                 peek                (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
> -                void                attach              (dash::logic::IDownloadRateObserver *observer);
> -                void                notify              ();
> +                void    closeAllConnections ();
> +                bool    addChunk            (Chunk *chunk);
> +                int     read                (block_t *block);
> +                void    attach              (dash::logic::IDownloadRateObserver *observer);
> +                void    notify              ();
>
>             private:
> -                std::map<Chunk *, HTTPConnection *>                 chunkMap;
> -                std::map<std::string, HTTPConnection *>             urlMap;
>                 std::vector<dash::logic::IDownloadRateObserver *>   rateObservers;
> -                uint64_t                                            bpsAvg;
> -                uint64_t                                            bpsLastChunk;
> -                long                                                bytesReadSession;
> -                double                                              timeSecSession;
> -                long                                                bytesReadChunk;
> -                double                                              timeSecChunk;
> +                std::deque<Chunk *>                                 downloadQueue;
> +                std::vector<PersistentConnection *>                 connectionPool;
> +                logic::IAdaptationLogic                             *adaptationLogic;
>                 stream_t                                            *stream;
>                 int                                                 chunkCount;
> +                int64_t                                             bpsAvg;
> +                int64_t                                             bpsLastChunk;
> +                int64_t                                             bpsCurrentChunk;
> +                int64_t                                             bytesReadSession;
> +                int64_t                                             bytesReadChunk;
> +                double                                              timeSession;
> +                double                                              timeChunk;
>
> -                bool                closeConnection( Chunk *chunk );
> -                IHTTPConnection*    initConnection( Chunk *chunk );
> +                static const int PIPELINE;
> +                static const int PIPELINELENGTH;
> +                static const int CHUNKDEFAULTBITRATE;
> +
> +                std::vector<PersistentConnection *>     getConnectionsForHost   (const std::string &hostname);
> +                void                                    updateStatistics        (int bytes, double time);
>
>         };
>     }
> --
> 1.7.0.4
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> http://mailman.videolan.org/listinfo/vlc-devel

Please split this commit in multiple parts. Not every change are
relative to your commit message.

Looks like the Chunk are deleted from the HTTPConnectionManager DTOR,
so the leak is kinda fixed. Although, with a big stream, this will be
an issue.

Regards,

-- 
Hugo Beauzée-Luyssen


More information about the vlc-devel mailing list