[vlc-devel] [PATCH 8/9] dash: add persistentconnection to connectionmanager
Christopher at mailsrv.uni-klu.ac.at
Christopher at mailsrv.uni-klu.ac.at
Fri Mar 9 19:05:26 CET 2012
From: Christopher Mueller <christopher.mueller at itec.aau.at>
---
modules/stream_filter/dash/DASHDownloader.cpp | 40 ++---
modules/stream_filter/dash/DASHDownloader.h | 3 +-
modules/stream_filter/dash/DASHManager.cpp | 8 +-
modules/stream_filter/dash/DASHManager.h | 2 +-
modules/stream_filter/dash/dash.cpp | 21 +--
.../dash/http/HTTPConnectionManager.cpp | 184 ++++++++++----------
.../dash/http/HTTPConnectionManager.h | 47 +++---
7 files changed, 147 insertions(+), 158 deletions(-)
diff --git a/modules/stream_filter/dash/DASHDownloader.cpp b/modules/stream_filter/dash/DASHDownloader.cpp
index 0fd5c92..9388e34 100644
--- a/modules/stream_filter/dash/DASHDownloader.cpp
+++ b/modules/stream_filter/dash/DASHDownloader.cpp
@@ -32,11 +32,11 @@ using namespace dash::http;
using namespace dash::logic;
using namespace dash::buffer;
-DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, IAdaptationLogic *adaptationLogic, BlockBuffer *buffer)
+
+DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager, BlockBuffer *buffer)
{
this->t_sys = (thread_sys_t *) malloc(sizeof(thread_sys_t));
this->t_sys->conManager = conManager;
- this->t_sys->adaptationLogic = adaptationLogic;
this->t_sys->buffer = buffer;
}
DASHDownloader::~DASHDownloader ()
@@ -57,42 +57,24 @@ void* DASHDownloader::download (void *thread_sys)
{
thread_sys_t *t_sys = (thread_sys_t *) thread_sys;
HTTPConnectionManager *conManager = t_sys->conManager;
- IAdaptationLogic *adaptationLogic = t_sys->adaptationLogic;
BlockBuffer *buffer = t_sys->buffer;
- Chunk *currentChunk = NULL;
block_t *block = block_Alloc(BLOCKSIZE);
+ int ret = 0;
do
{
- if(currentChunk == NULL)
- {
- currentChunk = adaptationLogic->getNextChunk();
- if(currentChunk == NULL)
- {
- buffer->setEOF(true);
- }
- }
- else
+ ret = conManager->read(block);
+ if(ret > 0)
{
- int ret = conManager->read(currentChunk, block->p_buffer, block->i_buffer);
- if(ret <= 0)
- {
- currentChunk = NULL;
- }
- else
- {
- block_t *bufBlock = block_Alloc(ret);
- memcpy(bufBlock->p_buffer, block->p_buffer, ret);
-
- if(currentChunk->getBitrate() <= 0)
- currentChunk->setBitrate(CHUNKDEFAULTBITRATE);
+ block_t *bufBlock = block_Alloc(ret);
+ memcpy(bufBlock->p_buffer, block->p_buffer, ret);
- bufBlock->i_length = (mtime_t)((ret * 8) / ((float)currentChunk->getBitrate() / 1000000));
- buffer->put(bufBlock);
- }
+ bufBlock->i_length = block->i_length;
+ buffer->put(bufBlock);
}
- }while(!buffer->getEOF());
+ }while(ret && !buffer->getEOF());
+ buffer->setEOF(true);
block_Release(block);
return NULL;
diff --git a/modules/stream_filter/dash/DASHDownloader.h b/modules/stream_filter/dash/DASHDownloader.h
index 1fcdf19..7807ec5 100644
--- a/modules/stream_filter/dash/DASHDownloader.h
+++ b/modules/stream_filter/dash/DASHDownloader.h
@@ -39,14 +39,13 @@ namespace dash
struct thread_sys_t
{
dash::http::HTTPConnectionManager *conManager;
- logic::IAdaptationLogic *adaptationLogic;
buffer::BlockBuffer *buffer;
};
class DASHDownloader
{
public:
- DASHDownloader (http::HTTPConnectionManager *conManager, logic::IAdaptationLogic *adaptationLogic, buffer::BlockBuffer *buffer);
+ DASHDownloader (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
virtual ~DASHDownloader ();
bool start ();
diff --git a/modules/stream_filter/dash/DASHManager.cpp b/modules/stream_filter/dash/DASHManager.cpp
index f649015..bb2f3f0 100644
--- a/modules/stream_filter/dash/DASHManager.cpp
+++ b/modules/stream_filter/dash/DASHManager.cpp
@@ -34,9 +34,8 @@ using namespace dash::logic;
using namespace dash::mpd;
using namespace dash::buffer;
-DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
+DASHManager::DASHManager ( MPD *mpd,
IAdaptationLogic::LogicType type, stream_t *stream) :
- conManager ( conManager ),
currentChunk ( NULL ),
adaptationLogic( NULL ),
logicType ( type ),
@@ -51,8 +50,10 @@ DASHManager::DASHManager ( HTTPConnectionManager *conManager, MPD *mpd,
if ( this->adaptationLogic == NULL )
return ;
+ this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
+
this->buffer = new BlockBuffer(this->stream);
- this->downloader = new DASHDownloader(this->conManager, this->adaptationLogic, this->buffer);
+ this->downloader = new DASHDownloader(this->conManager, this->buffer);
this->conManager->attach(this->adaptationLogic);
this->buffer->attach(this->adaptationLogic);
@@ -61,6 +62,7 @@ DASHManager::~DASHManager ()
{
delete this->downloader;
delete this->buffer;
+ delete this->conManager;
delete this->adaptationLogic;
delete this->mpdManager;
}
diff --git a/modules/stream_filter/dash/DASHManager.h b/modules/stream_filter/dash/DASHManager.h
index a09e98e..ab76a4a 100644
--- a/modules/stream_filter/dash/DASHManager.h
+++ b/modules/stream_filter/dash/DASHManager.h
@@ -40,7 +40,7 @@ namespace dash
class DASHManager
{
public:
- DASHManager( http::HTTPConnectionManager *conManager, mpd::MPD *mpd,
+ DASHManager( mpd::MPD *mpd,
logic::IAdaptationLogic::LogicType type, stream_t *stream);
virtual ~DASHManager ();
diff --git a/modules/stream_filter/dash/dash.cpp b/modules/stream_filter/dash/dash.cpp
index d1ca988..ee42beb 100644
--- a/modules/stream_filter/dash/dash.cpp
+++ b/modules/stream_filter/dash/dash.cpp
@@ -74,11 +74,10 @@ vlc_module_end ()
*****************************************************************************/
struct stream_sys_t
{
- dash::DASHManager *p_dashManager;
- dash::http::HTTPConnectionManager *p_conManager;
- dash::mpd::MPD *p_mpd;
- int position;
- bool isLive;
+ dash::DASHManager *p_dashManager;
+ dash::mpd::MPD *p_mpd;
+ int position;
+ bool isLive;
};
static int Read (stream_t *p_stream, void *p_buffer, unsigned int i_len);
@@ -114,24 +113,20 @@ static int Open(vlc_object_t *p_obj)
return VLC_ENOMEM;
p_sys->p_mpd = mpd;
- dash::http::HTTPConnectionManager *p_conManager =
- new dash::http::HTTPConnectionManager( p_stream );
- dash::DASHManager*p_dashManager =
- new dash::DASHManager( p_conManager, p_sys->p_mpd,
- dash::logic::IAdaptationLogic::RateBased, p_stream);
+ dash::DASHManager*p_dashManager = new dash::DASHManager(p_sys->p_mpd,
+ dash::logic::IAdaptationLogic::RateBased,
+ p_stream);
if ( p_dashManager->getMpdManager() == NULL ||
p_dashManager->getMpdManager()->getMPD() == NULL ||
p_dashManager->getAdaptionLogic() == NULL ||
p_dashManager->start() == false)
{
- delete p_conManager;
delete p_dashManager;
free( p_sys );
return VLC_EGENERIC;
}
p_sys->p_dashManager = p_dashManager;
- p_sys->p_conManager = p_conManager;
p_sys->position = 0;
p_sys->isLive = p_dashManager->getMpdManager()->getMPD()->isLive();
p_stream->p_sys = p_sys;
@@ -151,9 +146,7 @@ static void Close(vlc_object_t *p_obj)
stream_t *p_stream = (stream_t*) p_obj;
stream_sys_t *p_sys = (stream_sys_t *) p_stream->p_sys;
dash::DASHManager *p_dashManager = p_sys->p_dashManager;
- dash::http::HTTPConnectionManager *p_conManager = p_sys->p_conManager;
- delete(p_conManager);
delete(p_dashManager);
free(p_sys);
}
diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
index c0e18b2..2bb9561 100644
--- a/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
+++ b/modules/stream_filter/dash/http/HTTPConnectionManager.cpp
@@ -31,127 +31,135 @@
using namespace dash::http;
using namespace dash::logic;
-HTTPConnectionManager::HTTPConnectionManager (stream_t *stream)
+HTTPConnectionManager::HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
+ adaptationLogic (adaptationLogic),
+ stream (stream),
+ chunkCount (0),
+ bpsAvg (0),
+ bpsLastChunk (0),
+ bpsCurrentChunk (0),
+ bytesReadSession (0),
+ bytesReadChunk (0),
+ timeSession (0),
+ timeChunk (0)
{
- this->timeSecSession = 0;
- this->bytesReadSession = 0;
- this->timeSecChunk = 0;
- this->bytesReadChunk = 0;
- this->bpsAvg = 0;
- this->bpsLastChunk = 0;
- this->chunkCount = 0;
- this->stream = stream;
}
HTTPConnectionManager::~HTTPConnectionManager ()
{
this->closeAllConnections();
}
-bool HTTPConnectionManager::closeConnection( IHTTPConnection *con )
+void HTTPConnectionManager::closeAllConnections ()
{
- for(std::map<Chunk*, HTTPConnection *>::iterator it = this->chunkMap.begin();
- it != this->chunkMap.end(); ++it)
+ for(size_t i = 0; i < this->connectionPool.size(); i++)
{
- if( it->second == con )
- {
- delete con;
- this->chunkMap.erase( it );
- return true;
- }
+ this->connectionPool.at(i)->closeSocket();
+ delete(this->connectionPool.at(i));
}
- return false;
}
-
-bool HTTPConnectionManager::closeConnection( Chunk *chunk )
+int HTTPConnectionManager::read (block_t *block)
{
- HTTPConnection *con = this->chunkMap[chunk];
- bool ret = this->closeConnection(con);
- this->chunkMap.erase(chunk);
- delete(chunk);
- return ret;
-}
+ if(this->downloadQueue.size() == 0)
+ if(!this->addChunk(this->adaptationLogic->getNextChunk()))
+ return 0;
-void HTTPConnectionManager::closeAllConnections ()
-{
- std::map<Chunk *, HTTPConnection *>::iterator it;
+ if(this->downloadQueue.front()->getPercentDownloaded() > PIPELINE &&
+ this->downloadQueue.size() < PIPELINELENGTH)
+ this->addChunk(this->adaptationLogic->getNextChunk());
- for(it = this->chunkMap.begin(); it != this->chunkMap.end(); ++it)
- delete(it->second);
-
- this->chunkMap.clear();
-}
-
-int HTTPConnectionManager::read( Chunk *chunk, void *p_buffer, size_t len )
-{
- if(this->chunkMap.find(chunk) == this->chunkMap.end())
- {
- this->bytesReadChunk = 0;
- this->timeSecChunk = 0;
-
- if ( this->initConnection( chunk ) == NULL )
- return -1;
- }
+ int ret = 0;
mtime_t start = mdate();
- int ret = this->chunkMap[chunk]->read(p_buffer, len);
+ ret = this->downloadQueue.front()->getConnection()->read(block->p_buffer, block->i_buffer);
mtime_t end = mdate();
- if( ret <= 0 )
- this->closeConnection( chunk );
- else
- {
- double time = ((double)(end - start)) / 1000000;
+ block->i_length = (mtime_t)((ret * 8) / ((float)this->downloadQueue.front()->getBitrate() / 1000000));
- this->bytesReadSession += ret;
- this->bytesReadChunk += ret;
- this->timeSecSession += time;
- this->timeSecChunk += time;
+ double time = ((double)(end - start)) / 1000000;
+ if(ret <= 0)
+ {
+ this->bpsLastChunk = this->bpsCurrentChunk;
+ this->bytesReadChunk = 0;
+ this->timeChunk = 0;
- if(this->timeSecSession > 0)
- this->bpsAvg = (this->bytesReadSession / this->timeSecSession) * 8;
-
- if(this->timeSecChunk > 0)
- this->bpsLastChunk = (this->bytesReadChunk / this->timeSecChunk) * 8;
-
- if(this->chunkCount < 2)
- this->bpsAvg = 0;
-
- if(this->chunkCount < 2)
- this->bpsLastChunk = 0;
+ delete(this->downloadQueue.front());
+ this->downloadQueue.pop();
- this->notify();
+ return this->read(block);
}
- return ret;
-}
-
-int HTTPConnectionManager::peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek)
-{
- if(this->chunkMap.find(chunk) == this->chunkMap.end())
+ else
{
- if ( this->initConnection(chunk) == NULL )
- return -1;
+ this->updateStatistics(ret, time);
}
- return this->chunkMap[chunk]->peek(pp_peek, i_peek);
-}
-IHTTPConnection* HTTPConnectionManager::initConnection(Chunk *chunk)
-{
- HTTPConnection *con = new HTTPConnection(this->stream);
- if ( con->init(chunk) == false )
- return NULL;
- this->chunkMap[chunk] = con;
- this->chunkCount++;
- return con;
+ return ret;
}
-void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
+void HTTPConnectionManager::attach (IDownloadRateObserver *observer)
{
this->rateObservers.push_back(observer);
}
-void HTTPConnectionManager::notify ()
+void HTTPConnectionManager::notify ()
{
if ( this->bpsAvg == 0 )
return ;
for(size_t i = 0; i < this->rateObservers.size(); i++)
this->rateObservers.at(i)->downloadRateChanged(this->bpsAvg, this->bpsLastChunk);
}
+std::vector<PersistentConnection *> HTTPConnectionManager::getConnectionsForHost (const std::string &hostname)
+{
+ std::vector<PersistentConnection *> cons;
+
+ for(size_t i = 0; i < this->connectionPool.size(); i++)
+ if(!this->connectionPool.at(i)->getHostname().compare(hostname) || !this->connectionPool.at(i)->isConnected())
+ cons.push_back(this->connectionPool.at(i));
+
+ return cons;
+}
+void HTTPConnectionManager::updateStatistics (int bytes, double time)
+{
+ this->bytesReadSession += bytes;
+ this->bytesReadChunk += bytes;
+ this->timeSession += time;
+ this->timeChunk += time;
+
+ this->bpsAvg = (int64_t) ((this->bytesReadSession * 8) / this->timeSession);
+ this->bpsCurrentChunk = (int64_t) ((this->bytesReadChunk * 8) / this->timeChunk);
+
+ if(this->bpsAvg < 0)
+ this->bpsAvg = 0;
+
+ if(this->bpsCurrentChunk < 0)
+ this->bpsCurrentChunk = 0;
+
+ this->notify();
+}
+bool HTTPConnectionManager::addChunk (Chunk *chunk)
+{
+ if(chunk == NULL)
+ return false;
+
+ this->downloadQueue.push(chunk);
+
+ std::vector<PersistentConnection *> cons = this->getConnectionsForHost(chunk->getHostname());
+
+ if(cons.size() == 0)
+ {
+ PersistentConnection *con = new PersistentConnection(this->stream);
+ this->connectionPool.push_back(con);
+ cons.push_back(con);
+ }
+
+ size_t pos = this->chunkCount % cons.size();
+
+ cons.at(pos)->addChunk(chunk);
+
+ chunk->setConnection(cons.at(pos));
+
+ this->chunkCount++;
+
+ if(chunk->getBitrate() <= 0)
+ chunk->setBitrate(CHUNKDEFAULTBITRATE);
+
+ return true;
+}
diff --git a/modules/stream_filter/dash/http/HTTPConnectionManager.h b/modules/stream_filter/dash/http/HTTPConnectionManager.h
index 006ca12..f090e63 100644
--- a/modules/stream_filter/dash/http/HTTPConnectionManager.h
+++ b/modules/stream_filter/dash/http/HTTPConnectionManager.h
@@ -29,14 +29,18 @@
#include <string>
#include <vector>
+#include <queue>
#include <iostream>
#include <ctime>
-#include <map>
#include <limits.h>
-#include "http/HTTPConnection.h"
-#include "http/Chunk.h"
-#include "adaptationlogic/IDownloadRateObserver.h"
+#include "http/PersistentConnection.h"
+#include "adaptationlogic/IAdaptationLogic.h"
+
+#define CONNECTIONS 1
+#define PIPELINE 80
+#define PIPELINELENGTH 3
+#define CHUNKDEFAULTBITRATE 1
namespace dash
{
@@ -45,31 +49,32 @@ namespace dash
class HTTPConnectionManager
{
public:
- HTTPConnectionManager (stream_t *stream);
+ HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
virtual ~HTTPConnectionManager ();
- void closeAllConnections ();
- bool closeConnection (IHTTPConnection *con);
- int read (Chunk *chunk, void *p_buffer, size_t len);
- int peek (Chunk *chunk, const uint8_t **pp_peek, size_t i_peek);
- void attach (dash::logic::IDownloadRateObserver *observer);
- void notify ();
+ void closeAllConnections ();
+ bool addChunk (Chunk *chunk);
+ int read (block_t *block);
+ void attach (dash::logic::IDownloadRateObserver *observer);
+ void notify ();
private:
- std::map<Chunk *, HTTPConnection *> chunkMap;
- std::map<std::string, HTTPConnection *> urlMap;
std::vector<dash::logic::IDownloadRateObserver *> rateObservers;
- uint64_t bpsAvg;
- uint64_t bpsLastChunk;
- long bytesReadSession;
- double timeSecSession;
- long bytesReadChunk;
- double timeSecChunk;
+ std::queue<Chunk *> downloadQueue;
+ std::vector<PersistentConnection *> connectionPool;
+ logic::IAdaptationLogic *adaptationLogic;
stream_t *stream;
int chunkCount;
+ int64_t bpsAvg;
+ int64_t bpsLastChunk;
+ int64_t bpsCurrentChunk;
+ int64_t bytesReadSession;
+ int64_t bytesReadChunk;
+ double timeSession;
+ double timeChunk;
- bool closeConnection( Chunk *chunk );
- IHTTPConnection* initConnection( Chunk *chunk );
+ std::vector<PersistentConnection *> getConnectionsForHost (const std::string &hostname);
+ void updateStatistics (int bytes, double time);
};
}
--
1.7.0.4
More information about the vlc-devel
mailing list