import { assert, ConditionVariable } from '@fuman/utils' import { load } from 'cheerio' import { ffetch } from './fetch.ts' import { writeWebStreamToFile } from './fs.ts' interface SimpleMpd { codecs: string initUrl: string segmentUrls: string[] } export function parseSimpleMpd(xml: string): SimpleMpd { const $ = load(xml, { xml: true }) const period = $('Period') assert(period.length === 1, 'expected exactly one period') const adaptations = period.find('AdaptationSet') assert(adaptations.length === 1, 'expected exactly one adaptation set') const representation = adaptations.find('Representation') assert(representation.length === 1, 'expected exactly one representation') const segmentTemplate = representation.find('SegmentTemplate') assert(segmentTemplate.length === 1, 'expected exactly one segment template') const initUrl = segmentTemplate.attr('initialization') const templateUrl = segmentTemplate.attr('media') const startNum = segmentTemplate.attr('startNumber') assert(initUrl !== undefined, 'expected initialization url') assert(templateUrl !== undefined, 'expected template url') assert(!templateUrl.match(/\$(RepresentationID|Bandwidth|Time)\$/), 'unsupported template url') assert(startNum !== undefined, 'expected start number') const timeline = segmentTemplate.find('SegmentTimeline') assert(timeline.length === 1, 'expected exactly one segment timeline') const segments = timeline.find('S') assert(segments.length > 0, 'expected at least one segment') const segmentUrls: string[] = [] let segmentNum = Number(startNum) for (const segment of segments) { const duration = $(segment).attr('d') assert(duration !== undefined, 'expected duration') const r = $(segment).attr('r') const repeats = r ? Number.parseInt(r) + 1 : 1 for (let i = 0; i < repeats; i++) { segmentUrls.push(templateUrl.replace('$Number$', String(segmentNum))) segmentNum++ } } return { codecs: representation.attr('codecs')!, initUrl, segmentUrls, } } export function concatMpdSegments(options: { mpd: SimpleMpd fetch: (url: string) => Promise poolSize?: number }): ReadableStream { const { mpd, fetch, poolSize = 8 } = options let nextSegmentIdx = -1 let nextWorkerSegmentIdx = -1 const nextSegmentCv = new ConditionVariable() const buffer: Record = {} const downloadSegment = async (idx = nextWorkerSegmentIdx++) => { // console.log('downloading segment %s', idx) const url = idx === -1 ? mpd.initUrl : mpd.segmentUrls[idx] const chunk = await fetch(url) buffer[idx] = chunk if (idx === nextSegmentIdx) { nextSegmentCv.notify() } if (nextWorkerSegmentIdx < mpd.segmentUrls.length) { return downloadSegment() } } let error: unknown void Promise.all(Array.from({ length: Math.min(poolSize, mpd.segmentUrls.length), }, downloadSegment)) .catch((e) => { error = e nextSegmentCv.notify() }) return new ReadableStream({ async start(controller) { while (true) { await nextSegmentCv.wait() if (error) { controller.error(error) return } while (nextSegmentIdx in buffer) { const buf = buffer[nextSegmentIdx] delete buffer[nextSegmentIdx] nextSegmentIdx++ controller.enqueue(buf) } if (nextSegmentIdx >= mpd.segmentUrls.length) { controller.close() return } } }, }) }