[vlc-devel] [PATCH] dash: enabled persistent connections and pipelining

Christopher Mueller "Christopher Mueller" at mailsrv.uni-klu.ac.at
Wed Apr 11 20:54:20 CEST 2012


From: Christopher Mueller <christopher.mueller at itec.aau.at>

---
 modules/stream_filter/dash/DASHDownloader.cpp      |   40 ++---
 modules/stream_filter/dash/DASHDownloader.h        |    3 +-
 modules/stream_filter/dash/DASHManager.cpp         |    8 +-
 modules/stream_filter/dash/DASHManager.h           |    2 +-
 modules/stream_filter/dash/dash.cpp                |   17 +--
 .../dash/http/HTTPConnectionManager.cpp            |  185 ++++++++++---------
 .../dash/http/HTTPConnectionManager.h              |   46 +++---
 7 files changed, 145 insertions(+), 156 deletions(-)

diff --git a/modules/stream_filter/dash/DASHDownloader.cpp b/modules/stream_filter/dash/DASHDownloader.cpp
index 0fd5c92..9388e34 100644
--- a/modules/stream_filter/dash/DASHDownloader.cpp
+++ b/modules/stream_filter/dash/DASHDownloader.cpp
@@ -32,11 +32,11 @@ using namespace dash::http;
 using namespace dash::logic;
 using namespace dash::buffer;
 
-DASHDownloader::DASHDownloader  (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
+
+DASHDownloader::DASHDownloader  (HTTPConnectionManager *conManager, BlockBuffer *buffer)
 {
     this->t_sys                     = (thread_sys_t *) malloc(sizeof(thread_sys_t));
     this->t_sys->conManager         = conManager;
-    this->t_sys->adaptationLogic    = adaptationLogic;
     this->t_sys->buffer             = buffer;
 }
 DASHDownloader::~DASHDownloader ()
@@ -57,42 +57,24 @@ void*       DASHDownloader::download    (void *thread_sys)
 {
     thread_sys_t            *t_sys              = (thread_sys_t *) thread_sys;
     HTTPConnectionManager   *conManager         = t_sys->conManager;
-    IAdaptationLogic        *adaptationLogic    = t_sys->adaptationLogic;
     BlockBuffer             *buffer             = t_sys->buffer;
-    Chunk                   *currentChunk       = NULL;
     block_t                 *block              = block_Alloc(BLOCKSIZE);
+    int                     ret                 = 0;
 
     do
     {
-        if(currentChunk == NULL)
-        {
-            currentChunk  = adaptationLogic->getNextChunk();
-            if(currentChunk == NULL)
-            {
-                buffer->setEOF(true);
-            }
-        }
-        else
+        ret = conManager->read(block);
+        if(ret > 0)
         {
-            int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
-            if(ret <= 0)
-            {
-                currentChunk = NULL;
-            }
-            else
-            {
-                block_t *bufBlock = block_Alloc(ret);
-                memcpy(bufBlock->p_buffer, block->p_buffer, ret);
-
-                if(currentChunk->getBitrate() <= 0)
-                    currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
+            block_t *bufBlock = block_Alloc(ret);
+            memcpy(bufBlock->p_buffer, block->p_buffer, ret);
 
-                bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
-                buffer->put(bufBlock);
-            }
+            bufBlock->i_length = block->i_length;
+            buffer->put(bufBlock);
         }
-    }while(!buffer->getEOF());
+    }while(ret && !buffer->getEOF());
 
+    buffer->setEOF(true);
     block_Release(block);
 
     return NULL;
diff --git a/modules/stream_filter/dash/DASHDownloader.h b/modules/stream_filter/dash/DASHDownloader.h
index 1fcdf19..7807ec5 100644
--- a/modules/stream_filter/dash/DASHDownloader.h
+++ b/modules/stream_filter/dash/DASHDownloader.h
@@ -39,14 +39,13 @@ namespace dash
     struct thread_sys_t
     {
         dash::http::HTTPConnectionManager   *conManager;
-        logic::IAdaptationLogic             *adaptationLogic;
         buffer::BlockBuffer                 *buffer;
     };
 
     class DASHDownloader
     {
         public:
-            DASHDownloader          (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
+            DASHDownloader          (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
             virtual ~DASHDownloader ();
 
             bool            start       ();
diff --git a/modules/stream_filter/dash/DASHManager.cpp b/modules/stream_filter/dash/DASHManager.cpp
index 8483c9d..b897019 100644
--- a/modules/stream_filter/dash/DASHManager.cpp
+++ b/modules/stream_filter/dash/DASHManager.cpp
@@ -34,9 +34,8 @@ using namespace dash::logic;
 using namespace dash::mpd;
 using namespace dash::buffer;
 
-DASHManager::DASHManager    ( HTTPConnectionManager *conManager, MPD *mpd,
+DASHManager::DASHManager    ( MPD *mpd,
                               IAdaptationLogic::LogicType type, stream_t *stream) :
-             conManager     ( conManager ),
              currentChunk   ( NULL ),
              adaptationLogic( NULL ),
              logicType      ( type ),
@@ -51,8 +50,10 @@ DASHManager::DASHManager    ( HTTPConnectionManager *conManager, MPD *mpd,
     if ( this->adaptationLogic == NULL )
         return ;
 
+    this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
+
     this->buffer     = new BlockBuffer(this->stream);
-    this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
+    this->downloader = new DASHDownloader(this->conManager, this->buffer);
 
     this->conManager->attach(this->adaptationLogic);
     this->buffer->attach(this->adaptationLogic);
@@ -61,6 +62,7 @@ DASHManager::~DASHManager   ()
 {
     delete this->downloader;
     delete this->buffer;
+    delete this->conManager;
     delete this->adaptationLogic;
     delete this->mpdManager;
 }
diff --git a/modules/stream_filter/dash/DASHManager.h b/modules/stream_filter/dash/DASHManager.h
index fe42761..31ea7dd 100644
--- a/modules/stream_filter/dash/DASHManager.h
+++ b/modules/stream_filter/dash/DASHManager.h
@@ -40,7 +40,7 @@ namespace dash
     class DASHManager
     {
         public:
-            DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
+            DASHManager( mpd::MPD *mpd,
                          logic::IAdaptationLogic::LogicType type, stream_t *stream);
             virtual ~DASHManager    ();
 
diff --git a/modules/stream_filter/dash/dash.cpp b/modules/stream_filter/dash/dash.cpp
index 9003a9d..56eaa9f 100644
--- a/modules/stream_filter/dash/dash.cpp
+++ b/modules/stream_filter/dash/dash.cpp
@@ -74,9 +74,8 @@ vlc_module_end ()
  *****************************************************************************/
 struct stream_sys_t
 {
-        dash::DASHManager                   *p_dashManager;
-        dash::http::HTTPConnectionManager   *p_conManager;
-        dash::mpd::MPD                      *p_mpd;
+        dash::DASHManager   *p_dashManager;
+        dash::mpd::MPD      *p_mpd;
         uint64_t                            position;
         bool                                isLive;
 };
@@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
         return VLC_ENOMEM;
 
     p_sys->p_mpd = mpd;
-    dash::http::HTTPConnectionManager *p_conManager =
-                              new dash::http::HTTPConnectionManager( p_stream );
-    dash::DASHManager*p_dashManager =
-            new dash::DASHManager( p_conManager, p_sys->p_mpd,
-                                   dash::logic::IAdaptationLogic::RateBased, p_stream);
+    dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
+                                          dash::logic::IAdaptationLogic::RateBased,
+                                          p_stream);
 
     if ( p_dashManager->getMpdManager()           == NULL   ||
          p_dashManager->getMpdManager()->getMPD() == NULL   ||
          p_dashManager->getAdaptionLogic()        == NULL   ||
          p_dashManager->start()                   == false)
     {
-        delete p_conManager;
         delete p_dashManager;
         free( p_sys );
         return VLC_EGENERIC;
     }
     p_sys->p_dashManager    = p_dashManager;
-    p_sys->p_conManager     = p_conManager;
     p_sys->position         = 0;
     p_sys->isLive           = p_dashManager->getMpdManager()->getMPD()->isLive();
     p_stream->p_sys         = p_sys;
@@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
     stream_t                            *p_stream       = (stream_t*) p_obj;
     stream_sys_t                        *p_sys          = (stream_sys_t *) p_stream->p_sys;
     dash::DASHManager                   *p_dashManager  = p_sys->p_dashManager;
-    dash::http::HTTPConnectionManager   *p_conManager   = p_sys->p_conManager;
 
-    delete(p_conManager);
     delete(p_dashManager);
     free(p_sys);
 }
diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
index c0e18b2..6db8956 100644
--- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
+++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
@@ -31,118 +31,70 @@
 using namespace dash::http;
 using namespace dash::logic;
 
-HTTPConnectionManager::HTTPConnectionManager    (stream_t *stream)
+const size_t    HTTPConnectionManager::PIPELINE               = 80;
+const size_t    HTTPConnectionManager::PIPELINELENGTH         = 2;
+const uint64_t  HTTPConnectionManager::CHUNKDEFAULTBITRATE    = 1;
+
+HTTPConnectionManager::HTTPConnectionManager    (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
+                       adaptationLogic          (adaptationLogic),
+                       stream                   (stream),
+                       chunkCount               (0),
+                       bpsAvg                   (0),
+                       bpsLastChunk             (0),
+                       bpsCurrentChunk          (0),
+                       bytesReadSession         (0),
+                       bytesReadChunk           (0),
+                       timeSession              (0),
+                       timeChunk                (0)
 {
-    this->timeSecSession    = 0;
-    this->bytesReadSession  = 0;
-    this->timeSecChunk      = 0;
-    this->bytesReadChunk    = 0;
-    this->bpsAvg            = 0;
-    this->bpsLastChunk      = 0;
-    this->chunkCount        = 0;
-    this->stream            = stream;
 }
 HTTPConnectionManager::~HTTPConnectionManager   ()
 {
     this->closeAllConnections();
 }
 
-bool                HTTPConnectionManager::closeConnection( IHTTPConnection *con )
+void                                HTTPConnectionManager::closeAllConnections      ()
 {
-    for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
-        it != this->chunkMap.end(); ++it)
-    {
-        if( it->second == con )
-        {
-            delete con;
-            this->chunkMap.erase( it );
-            return true;
-        }
-    }
-    return false;
+    vlc_delete_all(this->connectionPool);
+    vlc_delete_all(this->downloadQueue);
 }
-
-bool                HTTPConnectionManager::closeConnection( Chunk *chunk )
-{
-    HTTPConnection *con = this->chunkMap[chunk];
-    bool ret = this->closeConnection(con);
-    this->chunkMap.erase(chunk);
-    delete(chunk);
-    return ret;
-}
-
-void                HTTPConnectionManager::closeAllConnections      ()
+int                                 HTTPConnectionManager::read                     (block_t *block)
 {
-    std::map<Chunk *, HTTPConnection *>::iterator it;
+    if(this->downloadQueue.size() == 0)
+        if(!this->addChunk(this->adaptationLogic->getNextChunk()))
+            return 0;
 
-    for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
-        delete(it->second);
+    if(this->downloadQueue.front()->getPercentDownloaded() > HTTPConnectionManager::PIPELINE &&
+       this->downloadQueue.size() < HTTPConnectionManager::PIPELINELENGTH)
+        this->addChunk(this->adaptationLogic->getNextChunk());
 
-    this->chunkMap.clear();
-}
-
-int                 HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
-{
-    if(this->chunkMap.find(chunk) == this->chunkMap.end())
-    {
-        this->bytesReadChunk    = 0;
-        this->timeSecChunk      = 0;
-
-        if ( this->initConnection( chunk ) == NULL )
-            return -1;
-    }
+    int ret = 0;
 
     mtime_t start = mdate();
-    int ret = this->chunkMap[chunk]->read(p_buffer, len);
+    ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
     mtime_t end = mdate();
 
-    if( ret <= 0 )
-        this->closeConnection( chunk );
-    else
-    {
-        double time = ((double)(end - start)) / 1000000;
-
-        this->bytesReadSession += ret;
-        this->bytesReadChunk   += ret;
-        this->timeSecSession   += time;
-        this->timeSecChunk     += time;
-
+    block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
 
-        if(this->timeSecSession > 0)
-            this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
+    double time = ((double)(end - start)) / 1000000;
 
-        if(this->timeSecChunk > 0)
-            this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
-
-        if(this->chunkCount < 2)
-            this->bpsAvg = 0;
+    if(ret <= 0)
+    {
+        this->bpsLastChunk   = this->bpsCurrentChunk;
+        this->bytesReadChunk = 0;
+        this->timeChunk      = 0;
 
-        if(this->chunkCount < 2)
-            this->bpsLastChunk = 0;
+        delete(this->downloadQueue.front());
+        this->downloadQueue.pop_front();
 
-        this->notify();
+        return this->read(block);
     }
-    return ret;
-}
-
-int                 HTTPConnectionManager::peek                     (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
-{
-    if(this->chunkMap.find(chunk) == this->chunkMap.end())
+    else
     {
-        if ( this->initConnection(chunk) == NULL )
-            return -1;
+        this->updateStatistics(ret, time);
     }
-    return this->chunkMap[chunk]->peek(pp_peek, i_peek);
-}
 
-IHTTPConnection*     HTTPConnectionManager::initConnection(Chunk *chunk)
-{
-    HTTPConnection *con = new HTTPConnection(this->stream);
-    if ( con->init(chunk) == false )
-        return NULL;
-    this->chunkMap[chunk] = con;
-    this->chunkCount++;
-    return con;
+    return ret;
 }
 void                HTTPConnectionManager::attach                   (IDownloadRateObserver *observer)
 {
@@ -155,3 +107,60 @@ void                HTTPConnectionManager::notify                   ()
     for(size_t i = 0; i < this->rateObservers.size(); i++)
         this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
 }
+std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost    (const std::string &hostname)
+{
+    /* Returns the pooled connections whose hostname equals hostname or for which isConnected() is false. */
+    std::vector<PersistentConnection *> usable;
+    std::vector<PersistentConnection *>::iterator it;
+    for(it = this->connectionPool.begin(); it != this->connectionPool.end(); ++it)
+        if((*it)->getHostname().compare(hostname) == 0 || !(*it)->isConnected())
+            usable.push_back(*it);
+    return usable;
+}
+void                                HTTPConnectionManager::updateStatistics         (int bytes, double time)
+{
+    /* Adds one read (bytes, elapsed time) to the session and chunk totals, refreshes both rates, then notifies. */
+    this->bytesReadSession  += bytes;
+    this->bytesReadChunk    += bytes;
+    this->timeSession       += time;
+    this->timeChunk         += time;
+
+    /* The accumulated time may still be zero (e.g. a read measured as taking no time); dividing by it
+     * would give inf/NaN, whose conversion to int64_t is undefined, so the previous rate is kept. */
+    if(this->timeSession > 0)
+        this->bpsAvg = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
+    if(this->timeChunk > 0)
+        this->bpsCurrentChunk = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
+    this->bpsAvg          = this->bpsAvg < 0 ? 0 : this->bpsAvg;
+    this->bpsCurrentChunk = this->bpsCurrentChunk < 0 ? 0 : this->bpsCurrentChunk;
+    this->notify();
+}
+bool                                HTTPConnectionManager::addChunk                 (Chunk *chunk)
+{
+    /* Queues chunk for download and hands it to one pooled connection; returns false when chunk is NULL. */
+    if(chunk == NULL)
+        return false;
+
+    this->downloadQueue.push_back(chunk);
+
+    std::vector<PersistentConnection *> candidates = this->getConnectionsForHost(chunk->getHostname());
+
+    /* No candidate came back from the pool: create a connection and register it in the pool. */
+    if(candidates.empty())
+    {
+        PersistentConnection *fresh = new PersistentConnection(this->stream);
+        this->connectionPool.push_back(fresh);
+        candidates.push_back(fresh);
+    }
+
+    /* Pick the candidate by the running chunk counter modulo the number of candidates. */
+    PersistentConnection *target = candidates.at(this->chunkCount % candidates.size());
+    target->addChunk(chunk);
+    chunk->setConnection(target);
+    this->chunkCount++;
+
+    /* A chunk reporting a non-positive bitrate receives CHUNKDEFAULTBITRATE. */
+    if(chunk->getBitrate() <= 0)
+        chunk->setBitrate(HTTPConnectionManager::CHUNKDEFAULTBITRATE);
+    return true;
+}
diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h b/modules/stream_filter/dash/http/HTTPConnectionManager.h
index 006ca12..b71b3c1 100644
--- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
+++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
@@ -29,14 +29,13 @@
 
 #include <string>
 #include <vector>
+#include <deque>
 #include <iostream>
 #include <ctime>
-#include <map>
 #include <limits.h>
 
-#include "http/HTTPConnection.h"
-#include "http/Chunk.h"
-#include "adaptationlogic/IDownloadRateObserver.h"
+#include "http/PersistentConnection.h"
+#include "adaptationlogic/IAdaptationLogic.h"
 
 namespace dash
 {
@@ -45,31 +44,36 @@ namespace dash
         class HTTPConnectionManager
         {
             public:
-                HTTPConnectionManager           (stream_t *stream);
+                HTTPConnectionManager           (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
                 virtual ~HTTPConnectionManager  ();
 
-                void                closeAllConnections ();
-                bool                closeConnection     (IHTTPConnection *con);
-                int                 read                (Chunk *chunk, void *p_buffer, size_t len);
-                int                 peek                (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
-                void                attach              (dash::logic::IDownloadRateObserver *observer);
-                void                notify              ();
+                void    closeAllConnections ();
+                bool    addChunk            (Chunk *chunk);
+                int     read                (block_t *block);
+                void    attach              (dash::logic::IDownloadRateObserver *observer);
+                void    notify              ();
 
             private:
-                std::map<Chunk *, HTTPConnection *>                 chunkMap;
-                std::map<std::string, HTTPConnection *>             urlMap;
                 std::vector<dash::logic::IDownloadRateObserver *>   rateObservers;
-                uint64_t                                            bpsAvg;
-                uint64_t                                            bpsLastChunk;
-                long                                                bytesReadSession;
-                double                                              timeSecSession;
-                long                                                bytesReadChunk;
-                double                                              timeSecChunk;
+                std::deque<Chunk *>                                 downloadQueue;
+                std::vector<PersistentConnection *>                 connectionPool;
+                logic::IAdaptationLogic                             *adaptationLogic;
                 stream_t                                            *stream;
                 int                                                 chunkCount;
+                int64_t                                             bpsAvg;
+                int64_t                                             bpsLastChunk;
+                int64_t                                             bpsCurrentChunk;
+                int64_t                                             bytesReadSession;
+                int64_t                                             bytesReadChunk;
+                double                                              timeSession;
+                double                                              timeChunk;
 
-                bool                closeConnection( Chunk *chunk );
-                IHTTPConnection*    initConnection( Chunk *chunk );
+                static const size_t     PIPELINE;
+                static const size_t     PIPELINELENGTH;
+                static const uint64_t   CHUNKDEFAULTBITRATE;
+
+                std::vector<PersistentConnection *>     getConnectionsForHost   (const std::string &hostname);
+                void                                    updateStatistics        (int bytes, double time);
 
         };
     }
-- 
1.7.0.4




More information about the vlc-devel mailing list