[vlc-devel] [PATCH 08/12] dash: add persistentconnection to connectionmanager
Hugo Beauzée-Luyssen
beauze.h at gmail.com
Tue Mar 13 16:31:44 CET 2012
On Tue, Mar 13, 2012 at 3:18 PM, <Christopher at mailsrv.uni-klu.ac.at> wrote:
> From: Christopher Mueller <christopher.mueller at itec.aau.at>
>
> ---
> modules/stream_filter/dash/DASHDownloader.cpp | 40 ++---
> modules/stream_filter/dash/DASHDownloader.h | 3 +-
> modules/stream_filter/dash/DASHManager.cpp | 8 +-
> modules/stream_filter/dash/DASHManager.h | 2 +-
> modules/stream_filter/dash/dash.cpp | 21 +--
> .../dash/http/HTTPConnectionManager.cpp | 189 ++++++++++---------
> .../dash/http/HTTPConnectionManager.h | 46 +++---
> 7 files changed, 149 insertions(+), 160 deletions(-)
>
> diff --git a/modules/stream_filter/dash/DASHDownloader.cpp b/modules/stream_filter/dash/DASHDownloader.cpp
> index 0fd5c92..9388e34 100644
> --- a/modules/stream_filter/dash/DASHDownloader.cpp
> +++ b/modules/stream_filter/dash/DASHDownloader.cpp
> @@ -32,11 +32,11 @@ using namespace dash::http;
> using namespace dash::logic;
> using namespace dash::buffer;
>
> -DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
> +
> +DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, BlockBuffer *buffer)
> {
> this->t_sys = (thread_sys_t *) malloc(sizeof(thread_sys_t));
> this->t_sys->conManager = conManager;
> - this->t_sys->adaptationLogic = adaptationLogic;
> this->t_sys->buffer = buffer;
> }
> DASHDownloader::~DASHDownloader ()
> @@ -57,42 +57,24 @@ void* DASHDownloader::download (void *thread_sys)
> {
> thread_sys_t *t_sys = (thread_sys_t *) thread_sys;
> HTTPConnectionManager *conManager = t_sys->conManager;
> - IAdaptationLogic *adaptationLogic = t_sys->adaptationLogic;
> BlockBuffer *buffer = t_sys->buffer;
> - Chunk *currentChunk = NULL;
> block_t *block = block_Alloc(BLOCKSIZE);
> + int ret = 0;
>
> do
> {
> - if(currentChunk == NULL)
> - {
> - currentChunk = adaptationLogic->getNextChunk();
> - if(currentChunk == NULL)
> - {
> - buffer->setEOF(true);
> - }
> - }
> - else
> + ret = conManager->read(block);
> + if(ret > 0)
> {
> - int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
> - if(ret <= 0)
> - {
> - currentChunk = NULL;
> - }
> - else
> - {
> - block_t *bufBlock = block_Alloc(ret);
> - memcpy(bufBlock->p_buffer, block->p_buffer, ret);
> -
> - if(currentChunk->getBitrate() <= 0)
> - currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
> + block_t *bufBlock = block_Alloc(ret);
> + memcpy(bufBlock->p_buffer, block->p_buffer, ret);
>
> - bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
> - buffer->put(bufBlock);
> - }
> + bufBlock->i_length = block->i_length;
> + buffer->put(bufBlock);
> }
> - }while(!buffer->getEOF());
> + }while(ret && !buffer->getEOF());
>
> + buffer->setEOF(true);
> block_Release(block);
>
> return NULL;
> diff --git a/modules/stream_filter/dash/DASHDownloader.h b/modules/stream_filter/dash/DASHDownloader.h
> index 1fcdf19..7807ec5 100644
> --- a/modules/stream_filter/dash/DASHDownloader.h
> +++ b/modules/stream_filter/dash/DASHDownloader.h
> @@ -39,14 +39,13 @@ namespace dash
> struct thread_sys_t
> {
> dash::http::HTTPConnectionManager *conManager;
> - logic::IAdaptationLogic *adaptationLogic;
> buffer::BlockBuffer *buffer;
> };
>
> class DASHDownloader
> {
> public:
> - DASHDownloader (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
> + DASHDownloader (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
> virtual ~DASHDownloader ();
>
> bool start ();
> diff --git a/modules/stream_filter/dash/DASHManager.cpp b/modules/stream_filter/dash/DASHManager.cpp
> index f649015..bb2f3f0 100644
> --- a/modules/stream_filter/dash/DASHManager.cpp
> +++ b/modules/stream_filter/dash/DASHManager.cpp
> @@ -34,9 +34,8 @@ using namespace dash::logic;
> using namespace dash::mpd;
> using namespace dash::buffer;
>
> -DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
> +DASHManager::DASHManager ( MPD *mpd,
> IAdaptationLogic::LogicType type, stream_t *stream) :
> - conManager ( conManager ),
> currentChunk ( NULL ),
> adaptationLogic( NULL ),
> logicType ( type ),
> @@ -51,8 +50,10 @@ DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
> if ( this->adaptationLogic == NULL )
> return ;
>
> + this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
> +
> this->buffer = new BlockBuffer(this->stream);
> - this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
> + this->downloader = new DASHDownloader(this->conManager, this->buffer);
>
> this->conManager->attach(this->adaptationLogic);
> this->buffer->attach(this->adaptationLogic);
> @@ -61,6 +62,7 @@ DASHManager::~DASHManager ()
> {
> delete this->downloader;
> delete this->buffer;
> + delete this->conManager;
> delete this->adaptationLogic;
> delete this->mpdManager;
> }
> diff --git a/modules/stream_filter/dash/DASHManager.h b/modules/stream_filter/dash/DASHManager.h
> index a09e98e..ab76a4a 100644
> --- a/modules/stream_filter/dash/DASHManager.h
> +++ b/modules/stream_filter/dash/DASHManager.h
> @@ -40,7 +40,7 @@ namespace dash
> class DASHManager
> {
> public:
> - DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
> + DASHManager( mpd::MPD *mpd,
> logic::IAdaptationLogic::LogicType type, stream_t *stream);
> virtual ~DASHManager ();
>
> diff --git a/modules/stream_filter/dash/dash.cpp b/modules/stream_filter/dash/dash.cpp
> index d1ca988..ee42beb 100644
> --- a/modules/stream_filter/dash/dash.cpp
> +++ b/modules/stream_filter/dash/dash.cpp
> @@ -74,11 +74,10 @@ vlc_module_end ()
> *****************************************************************************/
> struct stream_sys_t
> {
> - dash::DASHManager *p_dashManager;
> - dash::http::HTTPConnectionManager *p_conManager;
> - dash::mpd::MPD *p_mpd;
> - int position;
> - bool isLive;
> + dash::DASHManager *p_dashManager;
> + dash::mpd::MPD *p_mpd;
> + int position;
> + bool isLive;
> };
>
> static int Read (stream_t *p_stream, void *p_buffer, unsigned int i_len);
> @@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
> return VLC_ENOMEM;
>
> p_sys->p_mpd = mpd;
> - dash::http::HTTPConnectionManager *p_conManager =
> - new dash::http::HTTPConnectionManager( p_stream );
> - dash::DASHManager*p_dashManager =
> - new dash::DASHManager( p_conManager, p_sys->p_mpd,
> - dash::logic::IAdaptationLogic::RateBased, p_stream);
> + dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
> + dash::logic::IAdaptationLogic::RateBased,
> + p_stream);
>
> if ( p_dashManager->getMpdManager() == NULL ||
> p_dashManager->getMpdManager()->getMPD() == NULL ||
> p_dashManager->getAdaptionLogic() == NULL ||
> p_dashManager->start() == false)
> {
> - delete p_conManager;
> delete p_dashManager;
> free( p_sys );
> return VLC_EGENERIC;
> }
> p_sys->p_dashManager = p_dashManager;
> - p_sys->p_conManager = p_conManager;
> p_sys->position = 0;
> p_sys->isLive = p_dashManager->getMpdManager()->getMPD()->isLive();
> p_stream->p_sys = p_sys;
> @@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
> stream_t *p_stream = (stream_t*) p_obj;
> stream_sys_t *p_sys = (stream_sys_t *) p_stream->p_sys;
> dash::DASHManager *p_dashManager = p_sys->p_dashManager;
> - dash::http::HTTPConnectionManager *p_conManager = p_sys->p_conManager;
>
> - delete(p_conManager);
> delete(p_dashManager);
> free(p_sys);
> }
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> index c0e18b2..2059d05 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
> @@ -31,127 +31,136 @@
> using namespace dash::http;
> using namespace dash::logic;
>
> -HTTPConnectionManager::HTTPConnectionManager (stream_t *stream)
> +const int HTTPConnectionManager::PIPELINE = 80;
> +const int HTTPConnectionManager::PIPELINELENGTH = 2;
> +const int HTTPConnectionManager::CHUNKDEFAULTBITRATE = 1;
> +
> +HTTPConnectionManager::HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
> + adaptationLogic (adaptationLogic),
> + stream (stream),
> + chunkCount (0),
> + bpsAvg (0),
> + bpsLastChunk (0),
> + bpsCurrentChunk (0),
> + bytesReadSession (0),
> + bytesReadChunk (0),
> + timeSession (0),
> + timeChunk (0)
> {
> - this->timeSecSession = 0;
> - this->bytesReadSession = 0;
> - this->timeSecChunk = 0;
> - this->bytesReadChunk = 0;
> - this->bpsAvg = 0;
> - this->bpsLastChunk = 0;
> - this->chunkCount = 0;
> - this->stream = stream;
> }
> HTTPConnectionManager::~HTTPConnectionManager ()
> {
> this->closeAllConnections();
> }
>
> -bool HTTPConnectionManager::closeConnection( IHTTPConnection *con )
> +void HTTPConnectionManager::closeAllConnections ()
> {
> - for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
> - it != this->chunkMap.end(); ++it)
> - {
> - if( it->second == con )
> - {
> - delete con;
> - this->chunkMap.erase( it );
> - return true;
> - }
> - }
> - return false;
> + vlc_delete_all(this->connectionPool);
> + vlc_delete_all(this->downloadQueue);
> }
> -
> -bool HTTPConnectionManager::closeConnection( Chunk *chunk )
> -{
> - HTTPConnection *con = this->chunkMap[chunk];
> - bool ret = this->closeConnection(con);
> - this->chunkMap.erase(chunk);
> - delete(chunk);
> - return ret;
> -}
> -
> -void HTTPConnectionManager::closeAllConnections ()
> +int HTTPConnectionManager::read (block_t *block)
> {
> - std::map<Chunk *, HTTPConnection *>::iterator it;
> + if(this->downloadQueue.size() == 0)
> + if(!this->addChunk(this->adaptationLogic->getNextChunk()))
> + return 0;
>
> - for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
> - delete(it->second);
> + if(this->downloadQueue.front()->getPercentDownloaded() > this->PIPELINE &&
> + this->downloadQueue.size() < this->PIPELINELENGTH)
> + this->addChunk(this->adaptationLogic->getNextChunk());
>
> - this->chunkMap.clear();
> -}
> -
> -int HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
> -{
> - if(this->chunkMap.find(chunk) == this->chunkMap.end())
> - {
> - this->bytesReadChunk = 0;
> - this->timeSecChunk = 0;
> -
> - if ( this->initConnection( chunk ) == NULL )
> - return -1;
> - }
> + int ret = 0;
>
> mtime_t start = mdate();
> - int ret = this->chunkMap[chunk]->read(p_buffer, len);
> + ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
> mtime_t end = mdate();
>
> - if( ret <= 0 )
> - this->closeConnection( chunk );
> - else
> - {
> - double time = ((double)(end - start)) / 1000000;
> -
> - this->bytesReadSession += ret;
> - this->bytesReadChunk += ret;
> - this->timeSecSession += time;
> - this->timeSecChunk += time;
> -
> + block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
>
> - if(this->timeSecSession > 0)
> - this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
> + double time = ((double)(end - start)) / 1000000;
>
> - if(this->timeSecChunk > 0)
> - this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
> -
> - if(this->chunkCount < 2)
> - this->bpsAvg = 0;
> + if(ret <= 0)
> + {
> + this->bpsLastChunk = this->bpsCurrentChunk;
> + this->bytesReadChunk = 0;
> + this->timeChunk = 0;
>
> - if(this->chunkCount < 2)
> - this->bpsLastChunk = 0;
> + delete(this->downloadQueue.front());
> + this->downloadQueue.pop_front();
>
> - this->notify();
> + return this->read(block);
> }
> - return ret;
> -}
> -
> -int HTTPConnectionManager::peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
> -{
> - if(this->chunkMap.find(chunk) == this->chunkMap.end())
> + else
> {
> - if ( this->initConnection(chunk) == NULL )
> - return -1;
> + this->updateStatistics(ret, time);
> }
> - return this->chunkMap[chunk]->peek(pp_peek, i_peek);
> -}
>
> -IHTTPConnection* HTTPConnectionManager::initConnection(Chunk *chunk)
> -{
> - HTTPConnection *con = new HTTPConnection(this->stream);
> - if ( con->init(chunk) == false )
> - return NULL;
> - this->chunkMap[chunk] = con;
> - this->chunkCount++;
> - return con;
> + return ret;
> }
> -void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
> +void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
> {
> this->rateObservers.push_back(observer);
> }
> -void HTTPConnectionManager::notify ()
> +void HTTPConnectionManager::notify ()
> {
> if ( this->bpsAvg == 0 )
> return ;
> for(size_t i = 0; i < this->rateObservers.size(); i++)
> this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
> }
> +std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost (const std::string &hostname)
> +{
> + std::vector<PersistentConnection *> cons;
> +
> + for(size_t i = 0; i < this->connectionPool.size(); i++)
> + if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
> + cons.push_back(this->connectionPool.at(i));
> +
> + return cons;
> +}
> +void HTTPConnectionManager::updateStatistics (int bytes, double time)
> +{
> + this->bytesReadSession += bytes;
> + this->bytesReadChunk += bytes;
> + this->timeSession += time;
> + this->timeChunk += time;
> +
> + this->bpsAvg = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
> + this->bpsCurrentChunk = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
> +
> + if(this->bpsAvg < 0)
> + this->bpsAvg = 0;
> +
> + if(this->bpsCurrentChunk < 0)
> + this->bpsCurrentChunk = 0;
> +
> + this->notify();
> +}
> +bool HTTPConnectionManager::addChunk (Chunk *chunk)
> +{
> + if(chunk == NULL)
> + return false;
> +
> + this->downloadQueue.push_back(chunk);
> +
> + std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
> +
> + if(cons.size() == 0)
> + {
> + PersistentConnection *con = new PersistentConnection(this->stream);
> + this->connectionPool.push_back(con);
> + cons.push_back(con);
> + }
> +
> + size_t pos = this->chunkCount % cons.size();
> +
> + cons.at(pos)->addChunk(chunk);
> +
> + chunk->setConnection(cons.at(pos));
> +
> + this->chunkCount++;
> +
> + if(chunk->getBitrate() <= 0)
> + chunk->setBitrate(this->CHUNKDEFAULTBITRATE);
> +
> + return true;
> +}
> diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> index 006ca12..c2f69cf 100644
> --- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
> +++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
> @@ -29,14 +29,13 @@
>
> #include <string>
> #include <vector>
> +#include <deque>
> #include <iostream>
> #include <ctime>
> -#include <map>
> #include <limits.h>
>
> -#include "http/HTTPConnection.h"
> -#include "http/Chunk.h"
> -#include "adaptationlogic/IDownloadRateObserver.h"
> +#include "http/PersistentConnection.h"
> +#include "adaptationlogic/IAdaptationLogic.h"
>
> namespace dash
> {
> @@ -45,31 +44,36 @@ namespace dash
> class HTTPConnectionManager
> {
> public:
> - HTTPConnectionManager (stream_t *stream);
> + HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
> virtual ~HTTPConnectionManager ();
>
> - void closeAllConnections ();
> - bool closeConnection (IHTTPConnection *con);
> - int read (Chunk *chunk, void *p_buffer, size_t len);
> - int peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
> - void attach (dash::logic::IDownloadRateObserver *observer);
> - void notify ();
> + void closeAllConnections ();
> + bool addChunk (Chunk *chunk);
> + int read (block_t *block);
> + void attach (dash::logic::IDownloadRateObserver *observer);
> + void notify ();
>
> private:
> - std::map<Chunk *, HTTPConnection *> chunkMap;
> - std::map<std::string, HTTPConnection *> urlMap;
> std::vector<dash::logic::IDownloadRateObserver *> rateObservers;
> - uint64_t bpsAvg;
> - uint64_t bpsLastChunk;
> - long bytesReadSession;
> - double timeSecSession;
> - long bytesReadChunk;
> - double timeSecChunk;
> + std::deque<Chunk *> downloadQueue;
> + std::vector<PersistentConnection *> connectionPool;
> + logic::IAdaptationLogic *adaptationLogic;
> stream_t *stream;
> int chunkCount;
> + int64_t bpsAvg;
> + int64_t bpsLastChunk;
> + int64_t bpsCurrentChunk;
> + int64_t bytesReadSession;
> + int64_t bytesReadChunk;
> + double timeSession;
> + double timeChunk;
>
> - bool closeConnection( Chunk *chunk );
> - IHTTPConnection* initConnection( Chunk *chunk );
> + static const int PIPELINE;
> + static const int PIPELINELENGTH;
> + static const int CHUNKDEFAULTBITRATE;
> +
> + std::vector<PersistentConnection *> getConnectionsForHost (const std::string &hostname);
> + void updateStatistics (int bytes, double time);
>
> };
> }
> --
> 1.7.0.4
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> http://mailman.videolan.org/listinfo/vlc-devel
Please split this commit in multiple parts. Not every change are
relative to your commit message.
Looks like the Chunk are deleted from the HTTPConnectionManager DTOR,
so the leak is kinda fixed. Although, with a big stream, this will be
an issue.
Regards,
--
Hugo Beauzée-Luyssen
More information about the vlc-devel
mailing list