[vlc-devel] [PATCH 1/6] dash: enabled persistent connections and pipelining
Hugo Beauzée-Luyssen
beauze.h at gmail.com
Wed Apr 11 12:30:51 CEST 2012
On Tue, Apr 3, 2012 at 4:41 PM, Christopher Mueller <"Christopher
Mueller"@mailsrv.uni-klu.ac.at> wrote:
> From: Christopher Mueller <christopher.mueller at itec.aau.at>
>
> ---
> modules/stream_filter/dash/DASHDownloader.cpp | 40 ++---
> modules/stream_filter/dash/DASHDownloader.h | 3 +-
> modules/stream_filter/dash/DASHManager.cpp | 8 +-
> modules/stream_filter/dash/DASHManager.h | 2 +-
> modules/stream_filter/dash/dash.cpp | 17 +--
> .../dash/http/HTTPConnectionManager.cpp | 185 ++++++++++---------
> .../dash/http/HTTPConnectionManager.h | 46 +++---
> 7 files changed, 145 insertions(+), 156 deletions(-)
>
> diff --git a/modules/stream_filter/dash/DASHDownloader.cpp b/modules/stream_filter/dash/DASHDownloader.cpp
> index 0fd5c92..9388e34 100644
> --- a/modules/stream_filter/dash/DASHDownloader.cpp
> +++ b/modules/stream_filter/dash/DASHDownloader.cpp
> @@ -32,11 +32,11 @@ using namespace dash::http;
> using namespace dash::logic;
> using namespace dash::buffer;
>
> -DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
> +
> +DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, BlockBuffer *buffer)
> {
> this->t_sys = (thread_sys_t *) malloc(sizeof(thread_sys_t));
> this->t_sys->conManager = conManager;
> - this->t_sys->adaptationLogic = adaptationLogic;
> this->t_sys->buffer = buffer;
> }
> DASHDownloader::~DASHDownloader ()
> @@ -57,42 +57,24 @@ void* DASHDownloader::download (void *thread_sys)
> {
> thread_sys_t *t_sys = (thread_sys_t *) thread_sys;
> HTTPConnectionManager *conManager = t_sys->conManager;
> - IAdaptationLogic *adaptationLogic = t_sys->adaptationLogic;
> BlockBuffer *buffer = t_sys->buffer;
> - Chunk *currentChunk = NULL;
> block_t *block = block_Alloc(BLOCKSIZE);
> + int ret = 0;
>
> do
> {
> - if(currentChunk == NULL)
> - {
> - currentChunk = adaptationLogic->getNextChunk();
> - if(currentChunk == NULL)
> - {
> - buffer->setEOF(true);
> - }
> - }
> - else
> + ret = conManager->read(block);
> + if(ret > 0)
> {
> - int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
> - if(ret <= 0)
> - {
> - currentChunk = NULL;
> - }
> - else
> - {
> - block_t *bufBlock = block_Alloc(ret);
> - memcpy(bufBlock->p_buffer, block->p_buffer, ret);
> -
> - if(currentChunk->getBitrate() <= 0)
> - currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
> + block_t *bufBlock = block_Alloc(ret);
> + memcpy(bufBlock->p_buffer, block->p_buffer, ret);
>
> - bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
> - buffer->put(bufBlock);
> - }
> + bufBlock->i_length = block->i_length;
> + buffer->put(bufBlock);
> }
> - }while(!buffer->getEOF());
> + }while(ret && !buffer->getEOF());
>
> + buffer->setEOF(true);
> block_Release(block);
>
> return NULL;
> diff --git a/modules/stream_filter/dash/DASHDownloader.h b/modules/stream_filter/dash/DASHDownloader.h
> index 1fcdf19..7807ec5 100644
> --- a/modules/stream_filter/dash/DASHDownloader.h
> +++ b/modules/stream_filter/dash/DASHDownloader.h
> @@ -39,14 +39,13 @@ namespace dash
> struct thread_sys_t
> {
> dash::http::HTTPConnectionManager *conManager;
> - logic::IAdaptationLogic *adaptationLogic;
> buffer::BlockBuffer *buffer;
> };
>
> class DASHDownloader
> {
> public:
> - DASHDownloader (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
> + DASHDownloader (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
> virtual ~DASHDownloader ();
>
> bool start ();
> diff --git a/modules/stream_filter/dash/DASHManager.cpp b/modules/stream_filter/dash/DASHManager.cpp
> index 8483c9d..b897019 100644
> --- a/modules/stream_filter/dash/DASHManager.cpp
> +++ b/modules/stream_filter/dash/DASHManager.cpp
> @@ -34,9 +34,8 @@ using namespace dash::logic;
> using namespace dash::mpd;
> using namespace dash::buffer;
>
> -DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
> +DASHManager::DASHManager ( MPD *mpd,
> IAdaptationLogic::LogicType type, stream_t *stream) :
> - conManager ( conManager ),
> currentChunk ( NULL ),
> adaptationLogic( NULL ),
> logicType ( type ),
> @@ -51,8 +50,10 @@ DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
> if ( this->adaptationLogic == NULL )
> return ;
>
> + this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
> +
> this->buffer = new BlockBuffer(this->stream);
> - this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
> + this->downloader = new DASHDownloader(this->conManager, this->buffer);
>
> this->conManager->attach(this->adaptationLogic);
> this->buffer->attach(this->adaptationLogic);
> @@ -61,6 +62,7 @@ DASHManager::~DASHManager ()
> {
> delete this->downloader;
> delete this->buffer;
> + delete this->conManager;
> delete this->adaptationLogic;
> delete this->mpdManager;
> }
> diff --git a/modules/stream_filter/dash/DASHManager.h b/modules/stream_filter/dash/DASHManager.h
> index fe42761..31ea7dd 100644
> --- a/modules/stream_filter/dash/DASHManager.h
> +++ b/modules/stream_filter/dash/DASHManager.h
> @@ -40,7 +40,7 @@ namespace dash
> class DASHManager
> {
> public:
> - DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
> + DASHManager( mpd::MPD *mpd,
> logic::IAdaptationLogic::LogicType type, stream_t *stream);
> virtual ~DASHManager ();
>
> diff --git a/modules/stream_filter/dash/dash.cpp b/modules/stream_filter/dash/dash.cpp
> index 9003a9d..56eaa9f 100644
> --- a/modules/stream_filter/dash/dash.cpp
> +++ b/modules/stream_filter/dash/dash.cpp
> @@ -74,9 +74,8 @@ vlc_module_end ()
> *****************************************************************************/
> struct stream_sys_t
> {
> - dash::DASHManager *p_dashManager;
> - dash::http::HTTPConnectionManager *p_conManager;
> - dash::mpd::MPD *p_mpd;
> + dash::DASHManager *p_dashManager;
> + dash::mpd::MPD *p_mpd;
> uint64_t position;
> bool isLive;
> };
> @@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
> return VLC_ENOMEM;
>
> p_sys->p_mpd = mpd;
> - dash::http::HTTPConnectionManager *p_conManager =
> - new dash::http::HTTPConnectionManager( p_stream );
> - dash::DASHManager*p_dashManager =
> - new dash::DASHManager( p_conManager, p_sys->p_mpd,
> - dash::logic::IAdaptationLogic::RateBased, p_stream);
> + dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
> + dash::logic::IAdaptationLogic::RateBased,
> + p_stream);
>
> if ( p_dashManager->getMpdManager() == NULL ||
> p_dashManager->getMpdManager()->getMPD() == NULL ||
> p_dashManager->getAdaptionLogic() == NULL ||
> p_dashManager->start() == false)
> {
> - delete p_conManager;
> delete p_dashManager;
> free( p_sys );
> return VLC_EGENERIC;
> }
> p_sys->p_dashManager = p_dashManager;
> - p_sys->p_conManager = p_conManager;
> p_sys->position = 0;
> p_sys->isLive = p_dashManager->getMpdManager()->getMPD()->isLive();
> p_stream->p_sys = p_sys;
> @@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
> stream_t *p_stream = (stream_t*) p_obj;
> stream_sys_t *p_sys = (stream_sys_t *) p_stream->p_sys;
> dash::DASHManager *p_dashManager = p_sys->p_dashManager;
> - dash::http::HTTPConnectionManager *p_conManager = p_sys->p_conManager;
>
> - delete(p_conManager);
> delete(p_dashManager);
> free(p_sys);
> }
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> index c0e18b2..cd4fade 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> @@ -31,118 +31,70 @@
> using namespace dash::http;
> using namespace dash::logic;
>
> -HTTPConnectionManager::HTTPConnectionManager (stream_t *stream)
> +const size_t HTTPConnectionManager::PIPELINE = 80;
> +const size_t HTTPConnectionManager::PIPELINELENGTH = 2;
> +const uint64_t HTTPConnectionManager::CHUNKDEFAULTBITRATE = 1;
> +
> +HTTPConnectionManager::HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
> + adaptationLogic (adaptationLogic),
> + stream (stream),
> + chunkCount (0),
> + bpsAvg (0),
> + bpsLastChunk (0),
> + bpsCurrentChunk (0),
> + bytesReadSession (0),
> + bytesReadChunk (0),
> + timeSession (0),
> + timeChunk (0)
> {
> - this->timeSecSession = 0;
> - this->bytesReadSession = 0;
> - this->timeSecChunk = 0;
> - this->bytesReadChunk = 0;
> - this->bpsAvg = 0;
> - this->bpsLastChunk = 0;
> - this->chunkCount = 0;
> - this->stream = stream;
> }
> HTTPConnectionManager::~HTTPConnectionManager ()
> {
> this->closeAllConnections();
> }
>
> -bool HTTPConnectionManager::closeConnection( IHTTPConnection *con )
> +void HTTPConnectionManager::closeAllConnections ()
> {
> - for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
> - it != this->chunkMap.end(); ++it)
> - {
> - if( it->second == con )
> - {
> - delete con;
> - this->chunkMap.erase( it );
> - return true;
> - }
> - }
> - return false;
> + vlc_delete_all(this->connectionPool);
> + vlc_delete_all(this->downloadQueue);
> }
> -
> -bool HTTPConnectionManager::closeConnection( Chunk *chunk )
> -{
> - HTTPConnection *con = this->chunkMap[chunk];
> - bool ret = this->closeConnection(con);
> - this->chunkMap.erase(chunk);
> - delete(chunk);
> - return ret;
> -}
> -
> -void HTTPConnectionManager::closeAllConnections ()
> +int HTTPConnectionManager::read (block_t *block)
> {
> - std::map<Chunk *, HTTPConnection *>::iterator it;
> + if(this->downloadQueue.size() == 0)
> + if(!this->addChunk(this->adaptationLogic->getNextChunk()))
> + return 0;
>
> - for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
> - delete(it->second);
> + if(this->downloadQueue.front()->getPercentDownloaded() > this->PIPELINE &&
> + this->downloadQueue.size() < this->PIPELINELENGTH)
You shouldn't access a static member using this. Use
HTTPConnectionManager:: instead.
> + this->addChunk(this->adaptationLogic->getNextChunk());
>
> - this->chunkMap.clear();
> -}
> -
> -int HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
> -{
> - if(this->chunkMap.find(chunk) == this->chunkMap.end())
> - {
> - this->bytesReadChunk = 0;
> - this->timeSecChunk = 0;
> -
> - if ( this->initConnection( chunk ) == NULL )
> - return -1;
> - }
> + int ret = 0;
>
> mtime_t start = mdate();
> - int ret = this->chunkMap[chunk]->read(p_buffer, len);
> + ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
> mtime_t end = mdate();
>
> - if( ret <= 0 )
> - this->closeConnection( chunk );
> - else
> - {
> - double time = ((double)(end - start)) / 1000000;
> -
> - this->bytesReadSession += ret;
> - this->bytesReadChunk += ret;
> - this->timeSecSession += time;
> - this->timeSecChunk += time;
> -
> + block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
>
> - if(this->timeSecSession > 0)
> - this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
> + double time = ((double)(end - start)) / 1000000;
>
> - if(this->timeSecChunk > 0)
> - this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
> -
> - if(this->chunkCount < 2)
> - this->bpsAvg = 0;
> + if(ret <= 0)
> + {
> + this->bpsLastChunk = this->bpsCurrentChunk;
> + this->bytesReadChunk = 0;
> + this->timeChunk = 0;
>
> - if(this->chunkCount < 2)
> - this->bpsLastChunk = 0;
> + delete(this->downloadQueue.front());
> + this->downloadQueue.pop_front();
>
> - this->notify();
> + return this->read(block);
> }
> - return ret;
> -}
> -
> -int HTTPConnectionManager::peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
> -{
> - if(this->chunkMap.find(chunk) == this->chunkMap.end())
> + else
> {
> - if ( this->initConnection(chunk) == NULL )
> - return -1;
> + this->updateStatistics(ret, time);
> }
> - return this->chunkMap[chunk]->peek(pp_peek, i_peek);
> -}
>
> -IHTTPConnection* HTTPConnectionManager::initConnection(Chunk *chunk)
> -{
> - HTTPConnection *con = new HTTPConnection(this->stream);
> - if ( con->init(chunk) == false )
> - return NULL;
> - this->chunkMap[chunk] = con;
> - this->chunkCount++;
> - return con;
> + return ret;
> }
> void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
> {
> @@ -155,3 +107,60 @@ void HTTPConnectionManager::notify ()
> for(size_t i = 0; i < this->rateObservers.size(); i++)
> this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
> }
> +std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost (const std::string &hostname)
> +{
> + std::vector<PersistentConnection *> cons;
> +
> + for(size_t i = 0; i < this->connectionPool.size(); i++)
> + if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
> + cons.push_back(this->connectionPool.at(i));
> +
> + return cons;
> +}
> +void HTTPConnectionManager::updateStatistics (int bytes, double time)
> +{
> + this->bytesReadSession += bytes;
> + this->bytesReadChunk += bytes;
> + this->timeSession += time;
> + this->timeChunk += time;
> +
> + this->bpsAvg = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
> + this->bpsCurrentChunk = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
> +
> + if(this->bpsAvg < 0)
> + this->bpsAvg = 0;
> +
> + if(this->bpsCurrentChunk < 0)
> + this->bpsCurrentChunk = 0;
> +
> + this->notify();
> +}
> +bool HTTPConnectionManager::addChunk (Chunk *chunk)
> +{
> + if(chunk == NULL)
> + return false;
> +
> + this->downloadQueue.push_back(chunk);
> +
> + std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
> +
> + if(cons.size() == 0)
> + {
> + PersistentConnection *con = new PersistentConnection(this->stream);
> + this->connectionPool.push_back(con);
> + cons.push_back(con);
> + }
> +
> + size_t pos = this->chunkCount % cons.size();
> +
> + cons.at(pos)->addChunk(chunk);
> +
> + chunk->setConnection(cons.at(pos));
> +
> + this->chunkCount++;
> +
> + if(chunk->getBitrate() <= 0)
> + chunk->setBitrate(this->CHUNKDEFAULTBITRATE);
Same cosmetic/syntaxic remark.
> +
> + return true;
> +}
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> index 006ca12..b71b3c1 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> @@ -29,14 +29,13 @@
>
> #include <string>
> #include <vector>
> +#include <deque>
> #include <iostream>
> #include <ctime>
> -#include <map>
> #include <limits.h>
>
> -#include "http/HTTPConnection.h"
> -#include "http/Chunk.h"
> -#include "adaptationlogic/IDownloadRateObserver.h"
> +#include "http/PersistentConnection.h"
> +#include "adaptationlogic/IAdaptationLogic.h"
>
> namespace dash
> {
> @@ -45,31 +44,36 @@ namespace dash
> class HTTPConnectionManager
> {
> public:
> - HTTPConnectionManager (stream_t *stream);
> + HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
> virtual ~HTTPConnectionManager ();
>
> - void closeAllConnections ();
> - bool closeConnection (IHTTPConnection *con);
> - int read (Chunk *chunk, void *p_buffer, size_t len);
> - int peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
> - void attach (dash::logic::IDownloadRateObserver *observer);
> - void notify ();
> + void closeAllConnections ();
> + bool addChunk (Chunk *chunk);
> + int read (block_t *block);
> + void attach (dash::logic::IDownloadRateObserver *observer);
> + void notify ();
>
> private:
> - std::map<Chunk *, HTTPConnection *> chunkMap;
> - std::map<std::string, HTTPConnection *> urlMap;
> std::vector<dash::logic::IDownloadRateObserver *> rateObservers;
> - uint64_t bpsAvg;
> - uint64_t bpsLastChunk;
> - long bytesReadSession;
> - double timeSecSession;
> - long bytesReadChunk;
> - double timeSecChunk;
> + std::deque<Chunk *> downloadQueue;
> + std::vector<PersistentConnection *> connectionPool;
> + logic::IAdaptationLogic *adaptationLogic;
> stream_t *stream;
> int chunkCount;
> + int64_t bpsAvg;
> + int64_t bpsLastChunk;
> + int64_t bpsCurrentChunk;
> + int64_t bytesReadSession;
> + int64_t bytesReadChunk;
> + double timeSession;
> + double timeChunk;
>
> - bool closeConnection( Chunk *chunk );
> - IHTTPConnection* initConnection( Chunk *chunk );
> + static const size_t PIPELINE;
> + static const size_t PIPELINELENGTH;
> + static const uint64_t CHUNKDEFAULTBITRATE;
> +
> + std::vector<PersistentConnection *> getConnectionsForHost (const std::string &hostname);
> + void updateStatistics (int bytes, double time);
>
> };
> }
> --
> 1.7.0.4
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> http://mailman.videolan.org/listinfo/vlc-devel
Despite the syntaxic/cosmetic remark, looks good to me.
Regards,
--
Hugo Beauzée-Luyssen
More information about the vlc-devel
mailing list