from contextlib import closing
from io import BytesIO
import json
import xml.etree.ElementTree as ET
from pywb.rewrite.content_rewriter import BufferedRewriter
# ============================================================================
class RewriteDASH(BufferedRewriter):
    """Buffered rewriter for MPEG-DASH manifests (MPD XML).

    Each ``AdaptationSet`` of the manifest is reduced to at most one
    ``Representation``: the best one that fits within the resolution and
    bandwidth limits returned by ``self._get_adaptive_metadata(rwinfo)``.
    """

    def rewrite_stream(self, stream, rwinfo):
        """Rewrite the manifest read from *stream*.

        :param stream: file-like object containing the MPD XML
        :param rwinfo: rewrite info, passed through to ``rewrite_dash``
        :return: a ``BytesIO`` positioned at 0 holding the rewritten XML
        """
        rewritten, _ = self.rewrite_dash(stream, rwinfo)
        return rewritten

    def rewrite_dash(self, stream, rwinfo):
        """Keep only the best ``Representation`` of every ``AdaptationSet``.

        A representation with a non-zero ``width * height`` is ranked by
        resolution when a resolution limit is set; otherwise it is ranked
        by its ``bandwidth`` attribute. In both cases a candidate wins when
        it does not exceed the limit and is strictly larger than the
        current winner. All other representations are removed, so an
        adaptation set with no qualifying candidate ends up with none.

        :param stream: file-like object containing the MPD XML
        :param rwinfo: passed to ``self._get_adaptive_metadata`` to obtain
            ``(max_resolution, max_bandwidth)``
        :return: tuple ``(buffer, best_ids)`` where *buffer* is a
            ``BytesIO`` (rewound to 0) with the serialized manifest and
            *best_ids* lists the ``id`` attribute of each kept
            representation, in document order
        """
        max_resolution, max_bandwidth = self._get_adaptive_metadata(rwinfo)

        mpd_uri = 'urn:mpeg:dash:schema:mpd:2011'
        # Registering the empty prefix makes the serializer emit MPD
        # elements un-prefixed, in the default namespace.
        ET.register_namespace('', mpd_uri)
        ns = {'mpd': mpd_uri}

        tree = ET.ElementTree()
        tree.parse(stream)

        kept_ids = []

        for period in tree.getroot().findall('mpd:Period', ns):
            for adaptation in period.findall('mpd:AdaptationSet', ns):
                # findall() returns a list, so removing children from the
                # adaptation set while walking it below is safe.
                candidates = adaptation.findall('mpd:Representation', ns)

                chosen = None
                top_resolution = 0
                top_bandwidth = 0

                for candidate in candidates:
                    width = int(candidate.get('width', '0'))
                    height = int(candidate.get('height', '0'))
                    resolution = width * height
                    bandwidth = int(candidate.get('bandwidth', 0))

                    if resolution and max_resolution:
                        is_better = (resolution <= max_resolution and
                                     resolution > top_resolution)
                    else:
                        is_better = (bandwidth <= max_bandwidth and
                                     bandwidth > top_bandwidth)

                    if is_better:
                        chosen = candidate
                        top_resolution = resolution
                        top_bandwidth = bandwidth

                if chosen is not None:
                    kept_ids.append(chosen.get('id'))

                for candidate in candidates:
                    if candidate is not chosen:
                        adaptation.remove(candidate)

        output = BytesIO()
        tree.write(output, encoding='UTF-8', xml_declaration=True)
        output.seek(0)
        return output, kept_ids
# ============================================================================
def rewrite_fb_dash(string, *args):
    """Rewrite a DASH manifest embedded as an escaped JS/JSON string.

    The text before the first ``dash_prefetched_representation_ids``
    marker is treated as the escaped manifest. It is unescaped, reduced
    via ``RewriteDASH.rewrite_dash`` and re-escaped. The result is the
    re-escaped manifest, followed by the marker that was found and a JSON
    list of the kept representation ids. Anything that followed the
    marker in *string* is not carried over.

    :param string: text containing the escaped manifest and the marker
    :param args: ignored
    :return: the rewritten text, or ``None`` if no marker is present
    """
    DASH_SPLITS = [r'\n",dash_prefetched_representation_ids:', r'\n","dash_prefetched_representation_ids":']

    for split in DASH_SPLITS:
        inx = string.find(split)
        if inx >= 0:
            break
    else:
        # No known marker found: nothing to rewrite.
        return None

    escaped = string[:inx]

    # 'unicode-escape' decodes its input bytes as Latin-1, so encoding as
    # UTF-8 first would turn every literal non-ASCII character into
    # mojibake. Latin-1 with 'backslashreplace' keeps code points up to
    # U+00FF as single bytes and writes anything above as a backslash
    # escape, which the decoder then restores faithfully.
    manifest = escaped.encode('latin-1', 'backslashreplace').decode('unicode-escape')
    manifest = manifest.replace('\\/', '/')

    stream = BytesIO(manifest.encode('utf-8'))
    stream, best_ids = RewriteDASH().rewrite_dash(stream, None)
    rewritten = stream.read().decode('utf-8')

    # Re-escape as a JSON string body (surrounding quotes stripped) and
    # write '<' as \x3C, as done for the embedded form.
    result = json.dumps(rewritten)[1:-1].replace('<', r'\x3C')
    result += split
    result += json.dumps(best_ids)
    return result
def rewrite_tw_dash(string, *args):
    """Reduce the ``variants`` list of a JSON video description to one entry.

    Only variants whose ``content_type`` / ``type`` (when present) is
    ``video/mp4`` are considered. Variants that carry a ``bitrate`` are
    ranked by it: the highest bitrate not exceeding 5,000,000 wins.
    Variants without a bitrate are ranked by comparing their ``src``
    strings. If a best variant is found, ``variants`` is replaced by a
    one-element list holding it.

    This is best-effort: if the input cannot be parsed or does not have
    the expected structure, a warning is logged and *string* is returned
    unchanged.

    :param string: JSON text containing a ``variants`` list
    :param args: ignored
    :return: the re-serialized JSON text, or the original *string* on error
    """
    max_bitrate = 5000000

    try:
        data = json.loads(string)

        best_variant = None
        best_bitrate = 0
        best_src = ""

        for variant in data["variants"]:
            if (("content_type" in variant and variant["content_type"] != "video/mp4") or
                    ("type" in variant and variant["type"] != "video/mp4")):
                continue

            bitrate = variant.get("bitrate")
            src = variant.get("src")

            if bitrate:
                # A variant with a bitrate is ranked by bitrate only. It
                # must not fall through to the src comparison, otherwise
                # an over-limit or lower-bitrate variant could displace
                # the current best pick.
                if best_bitrate < bitrate <= max_bitrate:
                    best_variant = variant
                    best_bitrate = bitrate
            # No bitrate available: just compare src strings with dimensions
            elif src and src > best_src:
                best_variant = variant
                best_src = src

        if best_variant is not None:
            data["variants"] = [best_variant]

        string = json.dumps(data)

    except Exception as exc:
        # Deliberately broad: a malformed payload must never break the
        # response, so it is passed through untouched.
        logging.getLogger(__name__).warning("rewrite_tw_dash failed: %s", exc)

    return string