[vlc-devel] [PATCH 1/6] dash: enabled persistent connections and pipelining

Hugo Beauzée-Luyssen beauze.h at gmail.com
Wed Apr 11 12:30:51 CEST 2012


On Tue, Apr 3, 2012 at 4:41 PM, Christopher Mueller <"Christopher
Mueller"@mailsrv.uni-klu.ac.at> wrote:
> From: Christopher Mueller <christopher.mueller at itec.aau.at>
>
> ---
>  modules/stream_filter/dash/DASHDownloader.cpp      |   40 ++---
>  modules/stream_filter/dash/DASHDownloader.h        |    3 +-
>  modules/stream_filter/dash/DASHManager.cpp         |    8 +-
>  modules/stream_filter/dash/DASHManager.h           |    2 +-
>  modules/stream_filter/dash/dash.cpp                |   17 +--
>  .../dash/http/HTTPConnectionManager.cpp            |  185 ++++++++++---------
>  .../dash/http/HTTPConnectionManager.h              |   46 +++---
>  7 files changed, 145 insertions(+), 156 deletions(-)
>
> diff --git a/modules/stream_filter/dash/DASHDownloader.cpp b/modules/stream_filter/dash/DASHDownloader.cpp
> index 0fd5c92..9388e34 100644
> --- a/modules/stream_filter/dash/DASHDownloader.cpp
> +++ b/modules/stream_filter/dash/DASHDownloader.cpp
> @@ -32,11 +32,11 @@ using namespace dash::http;
>  using namespace dash::logic;
>  using namespace dash::buffer;
>
> -DASHDownloader::DASHDownloader  (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
> +
> +DASHDownloader::DASHDownloader  (HTTPConnectionManager *conManager, BlockBuffer *buffer)
>  {
>     this->t_sys                     = (thread_sys_t *) malloc(sizeof(thread_sys_t));
>     this->t_sys->conManager         = conManager;
> -    this->t_sys->adaptationLogic    = adaptationLogic;
>     this->t_sys->buffer             = buffer;
>  }
>  DASHDownloader::~DASHDownloader ()
> @@ -57,42 +57,24 @@ void*       DASHDownloader::download    (void *thread_sys)
>  {
>     thread_sys_t            *t_sys              = (thread_sys_t *) thread_sys;
>     HTTPConnectionManager   *conManager         = t_sys->conManager;
> -    IAdaptationLogic        *adaptationLogic    = t_sys->adaptationLogic;
>     BlockBuffer             *buffer             = t_sys->buffer;
> -    Chunk                   *currentChunk       = NULL;
>     block_t                 *block              = block_Alloc(BLOCKSIZE);
> +    int                     ret                 = 0;
>
>     do
>     {
> -        if(currentChunk == NULL)
> -        {
> -            currentChunk  = adaptationLogic->getNextChunk();
> -            if(currentChunk == NULL)
> -            {
> -                buffer->setEOF(true);
> -            }
> -        }
> -        else
> +        ret = conManager->read(block);
> +        if(ret > 0)
>         {
> -            int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
> -            if(ret <= 0)
> -            {
> -                currentChunk = NULL;
> -            }
> -            else
> -            {
> -                block_t *bufBlock = block_Alloc(ret);
> -                memcpy(bufBlock->p_buffer, block->p_buffer, ret);
> -
> -                if(currentChunk->getBitrate() <= 0)
> -                    currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
> +            block_t *bufBlock = block_Alloc(ret);
> +            memcpy(bufBlock->p_buffer, block->p_buffer, ret);
>
> -                bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
> -                buffer->put(bufBlock);
> -            }
> +            bufBlock->i_length = block->i_length;
> +            buffer->put(bufBlock);
>         }
> -    }while(!buffer->getEOF());
> +    }while(ret && !buffer->getEOF());
>
> +    buffer->setEOF(true);
>     block_Release(block);
>
>     return NULL;
> diff --git a/modules/stream_filter/dash/DASHDownloader.h b/modules/stream_filter/dash/DASHDownloader.h
> index 1fcdf19..7807ec5 100644
> --- a/modules/stream_filter/dash/DASHDownloader.h
> +++ b/modules/stream_filter/dash/DASHDownloader.h
> @@ -39,14 +39,13 @@ namespace dash
>     struct thread_sys_t
>     {
>         dash::http::HTTPConnectionManager   *conManager;
> -        logic::IAdaptationLogic             *adaptationLogic;
>         buffer::BlockBuffer                 *buffer;
>     };
>
>     class DASHDownloader
>     {
>         public:
> -            DASHDownloader          (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
> +            DASHDownloader          (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
>             virtual ~DASHDownloader ();
>
>             bool            start       ();
> diff --git a/modules/stream_filter/dash/DASHManager.cpp b/modules/stream_filter/dash/DASHManager.cpp
> index 8483c9d..b897019 100644
> --- a/modules/stream_filter/dash/DASHManager.cpp
> +++ b/modules/stream_filter/dash/DASHManager.cpp
> @@ -34,9 +34,8 @@ using namespace dash::logic;
>  using namespace dash::mpd;
>  using namespace dash::buffer;
>
> -DASHManager::DASHManager    ( HTTPConnectionManager *conManager, MPD *mpd,
> +DASHManager::DASHManager    ( MPD *mpd,
>                               IAdaptationLogic::LogicType type, stream_t *stream) :
> -             conManager     ( conManager ),
>              currentChunk   ( NULL ),
>              adaptationLogic( NULL ),
>              logicType      ( type ),
> @@ -51,8 +50,10 @@ DASHManager::DASHManager    ( HTTPConnectionManager *conManager, MPD *mpd,
>     if ( this->adaptationLogic == NULL )
>         return ;
>
> +    this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
> +
>     this->buffer     = new BlockBuffer(this->stream);
> -    this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
> +    this->downloader = new DASHDownloader(this->conManager, this->buffer);
>
>     this->conManager->attach(this->adaptationLogic);
>     this->buffer->attach(this->adaptationLogic);
> @@ -61,6 +62,7 @@ DASHManager::~DASHManager   ()
>  {
>     delete this->downloader;
>     delete this->buffer;
> +    delete this->conManager;
>     delete this->adaptationLogic;
>     delete this->mpdManager;
>  }
> diff --git a/modules/stream_filter/dash/DASHManager.h b/modules/stream_filter/dash/DASHManager.h
> index fe42761..31ea7dd 100644
> --- a/modules/stream_filter/dash/DASHManager.h
> +++ b/modules/stream_filter/dash/DASHManager.h
> @@ -40,7 +40,7 @@ namespace dash
>     class DASHManager
>     {
>         public:
> -            DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
> +            DASHManager( mpd::MPD *mpd,
>                          logic::IAdaptationLogic::LogicType type, stream_t *stream);
>             virtual ~DASHManager    ();
>
> diff --git a/modules/stream_filter/dash/dash.cpp b/modules/stream_filter/dash/dash.cpp
> index 9003a9d..56eaa9f 100644
> --- a/modules/stream_filter/dash/dash.cpp
> +++ b/modules/stream_filter/dash/dash.cpp
> @@ -74,9 +74,8 @@ vlc_module_end ()
>  *****************************************************************************/
>  struct stream_sys_t
>  {
> -        dash::DASHManager                   *p_dashManager;
> -        dash::http::HTTPConnectionManager   *p_conManager;
> -        dash::mpd::MPD                      *p_mpd;
> +        dash::DASHManager   *p_dashManager;
> +        dash::mpd::MPD      *p_mpd;
>         uint64_t                            position;
>         bool                                isLive;
>  };
> @@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
>         return VLC_ENOMEM;
>
>     p_sys->p_mpd = mpd;
> -    dash::http::HTTPConnectionManager *p_conManager =
> -                              new dash::http::HTTPConnectionManager( p_stream );
> -    dash::DASHManager*p_dashManager =
> -            new dash::DASHManager( p_conManager, p_sys->p_mpd,
> -                                   dash::logic::IAdaptationLogic::RateBased, p_stream);
> +    dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
> +                                          dash::logic::IAdaptationLogic::RateBased,
> +                                          p_stream);
>
>     if ( p_dashManager->getMpdManager()           == NULL   ||
>          p_dashManager->getMpdManager()->getMPD() == NULL   ||
>          p_dashManager->getAdaptionLogic()        == NULL   ||
>          p_dashManager->start()                   == false)
>     {
> -        delete p_conManager;
>         delete p_dashManager;
>         free( p_sys );
>         return VLC_EGENERIC;
>     }
>     p_sys->p_dashManager    = p_dashManager;
> -    p_sys->p_conManager     = p_conManager;
>     p_sys->position         = 0;
>     p_sys->isLive           = p_dashManager->getMpdManager()->getMPD()->isLive();
>     p_stream->p_sys         = p_sys;
> @@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
>     stream_t                            *p_stream       = (stream_t*) p_obj;
>     stream_sys_t                        *p_sys          = (stream_sys_t *) p_stream->p_sys;
>     dash::DASHManager                   *p_dashManager  = p_sys->p_dashManager;
> -    dash::http::HTTPConnectionManager   *p_conManager   = p_sys->p_conManager;
>
> -    delete(p_conManager);
>     delete(p_dashManager);
>     free(p_sys);
>  }
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> index c0e18b2..cd4fade 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> @@ -31,118 +31,70 @@
>  using namespace dash::http;
>  using namespace dash::logic;
>
> -HTTPConnectionManager::HTTPConnectionManager    (stream_t *stream)
> +const size_t    HTTPConnectionManager::PIPELINE               = 80;
> +const size_t    HTTPConnectionManager::PIPELINELENGTH         = 2;
> +const uint64_t  HTTPConnectionManager::CHUNKDEFAULTBITRATE    = 1;
> +
> +HTTPConnectionManager::HTTPConnectionManager    (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
> +                       adaptationLogic          (adaptationLogic),
> +                       stream                   (stream),
> +                       chunkCount               (0),
> +                       bpsAvg                   (0),
> +                       bpsLastChunk             (0),
> +                       bpsCurrentChunk          (0),
> +                       bytesReadSession         (0),
> +                       bytesReadChunk           (0),
> +                       timeSession              (0),
> +                       timeChunk                (0)
>  {
> -    this->timeSecSession    = 0;
> -    this->bytesReadSession  = 0;
> -    this->timeSecChunk      = 0;
> -    this->bytesReadChunk    = 0;
> -    this->bpsAvg            = 0;
> -    this->bpsLastChunk      = 0;
> -    this->chunkCount        = 0;
> -    this->stream            = stream;
>  }
>  HTTPConnectionManager::~HTTPConnectionManager   ()
>  {
>     this->closeAllConnections();
>  }
>
> -bool                HTTPConnectionManager::closeConnection( IHTTPConnection *con )
> +void                                HTTPConnectionManager::closeAllConnections      ()
>  {
> -    for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
> -        it != this->chunkMap.end(); ++it)
> -    {
> -        if( it->second == con )
> -        {
> -            delete con;
> -            this->chunkMap.erase( it );
> -            return true;
> -        }
> -    }
> -    return false;
> +    vlc_delete_all(this->connectionPool);
> +    vlc_delete_all(this->downloadQueue);
>  }
> -
> -bool                HTTPConnectionManager::closeConnection( Chunk *chunk )
> -{
> -    HTTPConnection *con = this->chunkMap[chunk];
> -    bool ret = this->closeConnection(con);
> -    this->chunkMap.erase(chunk);
> -    delete(chunk);
> -    return ret;
> -}
> -
> -void                HTTPConnectionManager::closeAllConnections      ()
> +int                                 HTTPConnectionManager::read                     (block_t *block)
>  {
> -    std::map<Chunk *, HTTPConnection *>::iterator it;
> +    if(this->downloadQueue.size() == 0)
> +        if(!this->addChunk(this->adaptationLogic->getNextChunk()))
> +            return 0;
>
> -    for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
> -        delete(it->second);
> +    if(this->downloadQueue.front()->getPercentDownloaded() > this->PIPELINE &&
> +       this->downloadQueue.size() < this->PIPELINELENGTH)
You shouldn't access a static member through "this". Use
HTTPConnectionManager::PIPELINE and
HTTPConnectionManager::PIPELINELENGTH instead.

> +        this->addChunk(this->adaptationLogic->getNextChunk());
>
> -    this->chunkMap.clear();
> -}
> -
> -int                 HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
> -{
> -    if(this->chunkMap.find(chunk) == this->chunkMap.end())
> -    {
> -        this->bytesReadChunk    = 0;
> -        this->timeSecChunk      = 0;
> -
> -        if ( this->initConnection( chunk ) == NULL )
> -            return -1;
> -    }
> +    int ret = 0;
>
>     mtime_t start = mdate();
> -    int ret = this->chunkMap[chunk]->read(p_buffer, len);
> +    ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
>     mtime_t end = mdate();
>
> -    if( ret <= 0 )
> -        this->closeConnection( chunk );
> -    else
> -    {
> -        double time = ((double)(end - start)) / 1000000;
> -
> -        this->bytesReadSession += ret;
> -        this->bytesReadChunk   += ret;
> -        this->timeSecSession   += time;
> -        this->timeSecChunk     += time;
> -
> +    block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
>
> -        if(this->timeSecSession > 0)
> -            this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
> +    double time = ((double)(end - start)) / 1000000;
>
> -        if(this->timeSecChunk > 0)
> -            this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
> -
> -        if(this->chunkCount < 2)
> -            this->bpsAvg = 0;
> +    if(ret <= 0)
> +    {
> +        this->bpsLastChunk   = this->bpsCurrentChunk;
> +        this->bytesReadChunk = 0;
> +        this->timeChunk      = 0;
>
> -        if(this->chunkCount < 2)
> -            this->bpsLastChunk = 0;
> +        delete(this->downloadQueue.front());
> +        this->downloadQueue.pop_front();
>
> -        this->notify();
> +        return this->read(block);
>     }
> -    return ret;
> -}
> -
> -int                 HTTPConnectionManager::peek                     (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
> -{
> -    if(this->chunkMap.find(chunk) == this->chunkMap.end())
> +    else
>     {
> -        if ( this->initConnection(chunk) == NULL )
> -            return -1;
> +        this->updateStatistics(ret, time);
>     }
> -    return this->chunkMap[chunk]->peek(pp_peek, i_peek);
> -}
>
> -IHTTPConnection*     HTTPConnectionManager::initConnection(Chunk *chunk)
> -{
> -    HTTPConnection *con = new HTTPConnection(this->stream);
> -    if ( con->init(chunk) == false )
> -        return NULL;
> -    this->chunkMap[chunk] = con;
> -    this->chunkCount++;
> -    return con;
> +    return ret;
>  }
>  void                HTTPConnectionManager::attach                   (IDownloadRateObserver *observer)
>  {
> @@ -155,3 +107,60 @@ void                HTTPConnectionManager::notify                   ()
>     for(size_t i = 0; i < this->rateObservers.size(); i++)
>         this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
>  }
> +std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost    (const std::string &hostname)
> +{
> +    std::vector<PersistentConnection *> cons;
> +
> +    for(size_t i = 0; i < this->connectionPool.size(); i++)
> +        if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
> +            cons.push_back(this->connectionPool.at(i));
> +
> +    return cons;
> +}
> +void                                HTTPConnectionManager::updateStatistics         (int bytes, double time)
> +{
> +    this->bytesReadSession  += bytes;
> +    this->bytesReadChunk    += bytes;
> +    this->timeSession       += time;
> +    this->timeChunk         += time;
> +
> +    this->bpsAvg            = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
> +    this->bpsCurrentChunk   = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
> +
> +    if(this->bpsAvg < 0)
> +        this->bpsAvg = 0;
> +
> +    if(this->bpsCurrentChunk < 0)
> +        this->bpsCurrentChunk = 0;
> +
> +    this->notify();
> +}
> +bool                                HTTPConnectionManager::addChunk                 (Chunk *chunk)
> +{
> +    if(chunk == NULL)
> +        return false;
> +
> +    this->downloadQueue.push_back(chunk);
> +
> +    std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
> +
> +    if(cons.size() == 0)
> +    {
> +        PersistentConnection *con = new PersistentConnection(this->stream);
> +        this->connectionPool.push_back(con);
> +        cons.push_back(con);
> +    }
> +
> +    size_t pos = this->chunkCount % cons.size();
> +
> +    cons.at(pos)->addChunk(chunk);
> +
> +    chunk->setConnection(cons.at(pos));
> +
> +    this->chunkCount++;
> +
> +    if(chunk->getBitrate() <= 0)
> +        chunk->setBitrate(this->CHUNKDEFAULTBITRATE);
Same cosmetic/syntactic remark: use
HTTPConnectionManager::CHUNKDEFAULTBITRATE rather than
this->CHUNKDEFAULTBITRATE.

> +
> +    return true;
> +}
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> index 006ca12..b71b3c1 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> @@ -29,14 +29,13 @@
>
>  #include <string>
>  #include <vector>
> +#include <deque>
>  #include <iostream>
>  #include <ctime>
> -#include <map>
>  #include <limits.h>
>
> -#include "http/HTTPConnection.h"
> -#include "http/Chunk.h"
> -#include "adaptationlogic/IDownloadRateObserver.h"
> +#include "http/PersistentConnection.h"
> +#include "adaptationlogic/IAdaptationLogic.h"
>
>  namespace dash
>  {
> @@ -45,31 +44,36 @@ namespace dash
>         class HTTPConnectionManager
>         {
>             public:
> -                HTTPConnectionManager           (stream_t *stream);
> +                HTTPConnectionManager           (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
>                 virtual ~HTTPConnectionManager  ();
>
> -                void                closeAllConnections ();
> -                bool                closeConnection     (IHTTPConnection *con);
> -                int                 read                (Chunk *chunk, void *p_buffer, size_t len);
> -                int                 peek                (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
> -                void                attach              (dash::logic::IDownloadRateObserver *observer);
> -                void                notify              ();
> +                void    closeAllConnections ();
> +                bool    addChunk            (Chunk *chunk);
> +                int     read                (block_t *block);
> +                void    attach              (dash::logic::IDownloadRateObserver *observer);
> +                void    notify              ();
>
>             private:
> -                std::map<Chunk *, HTTPConnection *>                 chunkMap;
> -                std::map<std::string, HTTPConnection *>             urlMap;
>                 std::vector<dash::logic::IDownloadRateObserver *>   rateObservers;
> -                uint64_t                                            bpsAvg;
> -                uint64_t                                            bpsLastChunk;
> -                long                                                bytesReadSession;
> -                double                                              timeSecSession;
> -                long                                                bytesReadChunk;
> -                double                                              timeSecChunk;
> +                std::deque<Chunk *>                                 downloadQueue;
> +                std::vector<PersistentConnection *>                 connectionPool;
> +                logic::IAdaptationLogic                             *adaptationLogic;
>                 stream_t                                            *stream;
>                 int                                                 chunkCount;
> +                int64_t                                             bpsAvg;
> +                int64_t                                             bpsLastChunk;
> +                int64_t                                             bpsCurrentChunk;
> +                int64_t                                             bytesReadSession;
> +                int64_t                                             bytesReadChunk;
> +                double                                              timeSession;
> +                double                                              timeChunk;
>
> -                bool                closeConnection( Chunk *chunk );
> -                IHTTPConnection*    initConnection( Chunk *chunk );
> +                static const size_t     PIPELINE;
> +                static const size_t     PIPELINELENGTH;
> +                static const uint64_t   CHUNKDEFAULTBITRATE;
> +
> +                std::vector<PersistentConnection *>     getConnectionsForHost   (const std::string &hostname);
> +                void                                    updateStatistics        (int bytes, double time);
>
>         };
>     }
> --
> 1.7.0.4
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> http://mailman.videolan.org/listinfo/vlc-devel

Apart from the syntactic/cosmetic remarks, this looks good to me.

Regards,

-- 
Hugo Beauzée-Luyssen


More information about the vlc-devel mailing list