[vlma-devel] commit: DVBlast implementation. (Adrien Grand)

git version control git at videolan.org
Sun Dec 20 17:40:01 CET 2009


vlma | branch: master | Adrien Grand <jpountz at videolan.org> | Sun Dec 20 17:39:12 2009 +0100| [bad60cae9603353c778ee8be1840d5b6bd4e5c8e] | committer: Adrien Grand 

DVBlast implementation.

> http://git.videolan.org/gitweb.cgi/vlma.git/?a=commit;h=bad60cae9603353c778ee8be1840d5b6bd4e5c8e
---

 vlma-watchdog/src/conf.py             |    6 +-
 vlma-watchdog/src/streamer/api.py     |   28 ++++++++-----
 vlma-watchdog/src/streamer/dvblast.py |   67 +++++++++++++++++++++++++++++++-
 vlma-watchdog/src/web.py              |    4 +-
 4 files changed, 86 insertions(+), 19 deletions(-)

diff --git a/vlma-watchdog/src/conf.py b/vlma-watchdog/src/conf.py
index 9ba628a..d91d8e0 100644
--- a/vlma-watchdog/src/conf.py
+++ b/vlma-watchdog/src/conf.py
@@ -12,8 +12,8 @@ SERVER_LOGIN = "videolan"
 SERVER_PASSWORD = "admin"
 
 STREAMERS = [
-  streamer.vlc.VLC("vlc", "vlc", {streamer.vlc.VLCOption.TELNET_PORT: 4214}),
-  streamer.vlc.VLC("vlc2", "vlc", {streamer.vlc.VLCOption.TELNET_PORT: 4215}),
-#  streamer.dvblast.DVBlast("dvblast", "dvblast", {})
+#  streamer.vlc.VLC("vlc", "vlc", {streamer.vlc.VLCOption.TELNET_PORT: 4214}),
+#  streamer.vlc.VLC("vlc2", "vlc", {streamer.vlc.VLCOption.TELNET_PORT: 4215}),
+  streamer.dvblast.DVBlast("dvblast", "/home/jpountz/src/dvblast/dvblast")
 ]
 
diff --git a/vlma-watchdog/src/streamer/api.py b/vlma-watchdog/src/streamer/api.py
index c8b6937..d32ee9e 100644
--- a/vlma-watchdog/src/streamer/api.py
+++ b/vlma-watchdog/src/streamer/api.py
@@ -128,6 +128,7 @@ class StreamerRunner(threading.Thread):
     self.pid = 0
     self.__shouldRun = True
     self.start_time = 0
+    self.__last_started = 0
     self.__log_queue = queue
     self.__log_queue_lock = queue_lock
     self.__stdout_reader = stdout_reader
@@ -139,8 +140,13 @@ class StreamerRunner(threading.Thread):
       if not self.__shouldRun:
         self.__lock.release()
         break
-      self.__logger.info("Running %s", " ".join(self.__args))
+      if self.__last_started > 1000L * (time.time() - 5):
+        to_sleep = int(5 + self.__last_started / 1000 - time.time())
+        self.__logger.info("Waiting %d seconds before restarting...", to_sleep)
+        time.sleep(to_sleep)
       self.start_time = math.floor(1000L * time.time())
+      self.__logger.info("Running %s", " ".join(self.__args))
+      self.__last_started = self.start_time
       try:
         process = subprocess.Popen(self.__args, bufsize=8192, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         self.__stdout_reader.set_input(process.stdout)
@@ -155,14 +161,9 @@ class StreamerRunner(threading.Thread):
         break
       finally:
         self.__lock.release()
-      # Restart streaming the orders
-      for order in self.__streamer.orders:
-        try:
-          self.__streamer.start_streaming(order)
-        except BaseException, e:
-          self.__logger.warn("Could not start streaming order %s: %s", order.id, str(e))
       exit_status = process.wait()
       self.__lock.acquire()
+      self.pid = None
       self.start_time = 0
       self.__lock.release()
       self.__logger.info("Streamer %s exited with return code %d", self.__streamer.id, exit_status)
@@ -276,9 +277,12 @@ class StreamerMonitor(threading.Thread):
       try:
         cmd = "ps -p %d -o %s=" %(runner.pid, property)
         result = os.popen(cmd).readline().strip(' ').strip('\n')
-        return float(result)
+        if result == "":
+          return 0
+        else:
+          return float(result)
       except Exception, e:
-        self.__logger.error(e)
+        self.__logger.warn(e)
     return 0.
 
   def getStreamerCpu(self):
@@ -301,14 +305,16 @@ class Order:
 
   def __init__(self, id):
     self.id = id
+    # In case of a DVB order, src is a SID. It is the URI of the resource to
+    # stream otherwise.
     self.programs = {} # src -> dest[]
     self.ttl = 12
     self.streamer = None
 
 
 class Dest:
-   """A destination. Describes what to do with the stream (how and where to
-   stream, transcoding, etc.)"""
+  """A destination. Describes what to do with the stream (how and where to
+  stream, transcoding, etc.)"""
 
   def __init__(self):
     self.streaming = Streaming()
diff --git a/vlma-watchdog/src/streamer/dvblast.py b/vlma-watchdog/src/streamer/dvblast.py
index 79331f0..77696b8 100644
--- a/vlma-watchdog/src/streamer/dvblast.py
+++ b/vlma-watchdog/src/streamer/dvblast.py
@@ -1,9 +1,70 @@
 #! /usr/bin/env python
 # -*- coding: utf-8 -*-
 
-from api import Streamer
+import constants, os.path, tempfile
+from api import *
+
+class DVBlastOption(constants.Enum):
+  def __init__(self, name):
+    constants.Enum.__init__(self, name)
+
+DVBlastOption.ADAPTER = DVBlastOption("ADAPTER")
+
+DEFAULT_DVBLAST_OPTIONS = {
+
+  DVBlastOption.ADAPTER: 0
+
+}
 
 class DVBlast(Streamer):
+  """A DVBlast streamer."""
+
+  def __init__(self, sid, cmd, options={}):
+    opts = DEFAULT_DVBLAST_OPTIONS.copy()
+    for k, v in options.items():
+      opts[k] = v
+    Streamer.__init__(self, sid, cmd, opts)
+    self.type = "DVBLAST"
+    self.filename = os.path.join(os.path.abspath(os.path.curdir), "dvblast-" + self.id + ".conf")
+
+  def can_stream(self, order):
+    # DVBlast can stream if the order is a DVB one and if there is no
+    # transcoding required
+    if isinstance(order, DVBOrder):
+      if order.adapter != self.options[DVBlastOption.ADAPTER]:
+        return False
+      for src, dests in order.programs.items():
+        for dest in dests:
+          if not dest.transcoding is None or not dest.streaming.type == "broadcast" or not dest.streaming.protocol in ["rtp", "udp"] or not dest.streaming.mux in ["ts", "raw"]:
+            break
+      return True
+    return False
+
+  def _write_conf_file(self, order):
+    conf_file = []
+    for src, dests in order.programs.items():
+      for dest in dests:
+        d = dest.streaming.ip + ":" + str(dest.streaming.port)
+        if dest.streaming.protocol == "udp":
+          d += "/rtp"
+        conf_file += [d, " ", "1", " ", str(src), "\n"]
+    f = open(self.filename, 'w')
+    f.writelines(conf_file)
+    f.close()
+
+  def start_streaming(self, order):
+    args = ["-f", str(order.frequency),
+            "-a", str(order.adapter),
+            "-c", self.filename,
+            "-t", str(order.ttl),
+            "-i", "1"]
+    if isinstance(order, DVBSOrder):
+      args += ["-s", str(order.srate),
+               "-v", str(order.voltage),
+               "-b", str(order.bandwidth)]
+    self._write_conf_file(order)
+    self.start(args)
+
+  def stop_streaming(self, order):
+    self.stop()
 
-  def __init__(self):
-    pass
diff --git a/vlma-watchdog/src/web.py b/vlma-watchdog/src/web.py
index 923c9e9..92f1bc3 100644
--- a/vlma-watchdog/src/web.py
+++ b/vlma-watchdog/src/web.py
@@ -209,14 +209,14 @@ class OrderAddResource(AuthenticationRequiredResource):
     for streamer in self.streamers:
       if streamer.can_stream(order):
         order.streamer = streamer.id
-        streamer.orders[order.id] = order
         try:
           streamer.start_streaming(order)
         except Exception, e:
-          error = Error(ErrorCode.STREAMER_FAILED, "Streamer %s could not stream the order: %s" %(str(streamer.id, str(e))))
+          error = Error(ErrorCode.STREAMER_FAILED, "Streamer %s could not stream the order: %s" %(str(streamer.id), str(e)))
           request.setResponseCode(error.code.http_status)
           serialization.error_to_xml(error, buf)
           return str(buf)
+        streamer.orders[order.id] = order
         serialization.order_to_xml(order, buf)
         return str(buf)
 



More information about the vlma-devel mailing list