Skip to content

added several errors and error stream emission #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 165 additions & 97 deletions lib/m2ts/m2ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ var Stream = require('../utils/stream.js'),
StreamTypes = require('./stream-types');

var Stream = require('../utils/stream.js');
var m2tsStreamTypes = require('./stream-types.js');

// object types
var
Expand All @@ -33,6 +32,7 @@ var
TransportPacketStream = function() {
var
buffer = new Uint8Array(MP2T_PACKET_LENGTH),
sinceFlushDataEmitted = 0,
bytesInBuffer = 0;

TransportPacketStream.prototype.init.call(this);
Expand All @@ -41,11 +41,17 @@ TransportPacketStream = function() {

this.push = function(bytes) {
var
i = 0,
startIndex = 0,
endIndex = MP2T_PACKET_LENGTH,
everything;

// if bytes is not a valid stream
if (typeof bytes !== 'object' ||
typeof bytes.byteLength === 'undefined' ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We gotta find a better way to test for undefined...

bytes.byteLength === 0) {
return;
}

// If there are bytes remaining from the last segment, prepend them to the
// bytes that were pushed in
if (bytesInBuffer) {
Expand All @@ -63,7 +69,7 @@ TransportPacketStream = function() {
if (everything[startIndex] === SYNC_BYTE && everything[endIndex] === SYNC_BYTE) {
// We found a packet so emit it and jump one whole packet forward in
// the stream
this.trigger('data', everything.subarray(startIndex, endIndex));
this.triggerData_(everything.subarray(startIndex, endIndex));
startIndex += MP2T_PACKET_LENGTH;
endIndex += MP2T_PACKET_LENGTH;
continue;
Expand All @@ -84,14 +90,23 @@ TransportPacketStream = function() {
}
};

this.triggerData_ = function(data) {
sinceFlushDataEmitted++;
this.trigger('data', data);
};

this.flush = function () {
// If the buffer contains a whole packet when we are being flushed, emit it
// and empty the buffer. Otherwise hold onto the data because it may be
// important for decoding the next segment
if (bytesInBuffer === MP2T_PACKET_LENGTH && buffer[0] === SYNC_BYTE) {
this.trigger('data', buffer);
this.triggerData_(buffer);
bytesInBuffer = 0;
}
if (sinceFlushDataEmitted === 0) {
this.trigger('error', 'No data has been emitted since last flush in TransportPacketStream');
}
sinceFlushDataEmitted = 0;
this.trigger('done');
};
};
Expand All @@ -102,7 +117,14 @@ TransportPacketStream.prototype = new Stream();
* forms of the individual transport stream packets.
*/
TransportParseStream = function() {
var parsePsi, parsePat, parsePmt, parsePes, self;
var
parsePsi,
parsePat,
parsePmt,
parsePes,
self,
sinceFlushDataEmitted = 0;

TransportParseStream.prototype.init.call(this);
self = this;

Expand Down Expand Up @@ -197,6 +219,14 @@ TransportParseStream = function() {
result = {},
offset = 4;

// if bytes is not a valid stream
if (typeof packet !== 'object' ||
typeof packet.byteLength === 'undefined' ||
packet.byteLength === 0) {
this.trigger('error', 'invalid packet was passed to TransportParseStream');
return;
}

result.payloadUnitStartIndicator = !!(packet[1] & 0x40);

// pid is a 13-bit field starting at the last bit of packet[1]
Expand All @@ -217,11 +247,11 @@ TransportParseStream = function() {
if (result.pid === 0) {
result.type = 'pat';
parsePsi(packet.subarray(offset), result);
this.trigger('data', result);
this.triggerData_(result);
} else if (result.pid === this.pmtPid) {
result.type = 'pmt';
parsePsi(packet.subarray(offset), result);
this.trigger('data', result);
this.triggerData_(result);
} else if (this.programMapTable === undefined) {
// When we have not seen a PMT yet, defer further processing of
// PES packets until one has been parsed
Expand All @@ -231,20 +261,29 @@ TransportParseStream = function() {
}
};

this.triggerData_ = function(data) {
sinceFlushDataEmitted++;
this.trigger('data', data);
};

this.flush = function() {
if (sinceFlushDataEmitted === 0) {
this.trigger('error', 'flush called before data was emitted in TransportParseStream');
}
sinceFlushDataEmitted = 0;
this.trigger('done');
};

this.processPes_ = function (packet, offset, result) {
result.streamType = this.programMapTable[result.pid];
result.type = 'pes';
result.data = packet.subarray(offset);

this.trigger('data', result);
this.triggerData_(result);
};

};
TransportParseStream.prototype = new Stream();
TransportParseStream.STREAM_TYPES = {
h264: 0x1b,
adts: 0x0f
};


/**
* Reconsistutes program elementary stream (PES) packets from parsed
Expand All @@ -255,7 +294,9 @@ TransportParseStream.STREAM_TYPES = {
* container.
*/
ElementaryStream = function() {
ElementaryStream.prototype.init.call(this);
var
sinceFlushDataEmitted = 0,
// PES packet fragments
video = {
data: [],
Expand All @@ -268,8 +309,82 @@ ElementaryStream = function() {
timedMetadata = {
data: [],
size: 0
},
parsePes = function(payload, pes) {
};
this.pes_ = function(data) {
var stream, streamType;
switch (data.streamType) {
case StreamTypes.H264_STREAM_TYPE:
stream = video;
streamType = 'video';
break;
case StreamTypes.ADTS_STREAM_TYPE:
stream = audio;
streamType = 'audio';
break;
case StreamTypes.METADATA_STREAM_TYPE:
stream = timedMetadata;
streamType = 'timed-metadata';
break;
case undefined:
this.trigger('error', 'Found an undefined pes stream type');
return;
default:
this.trigger('error', 'Found an unknown pes stream type ' + data.streamType.toString(16));
return;
}

// if a new packet is starting, we can flush the completed
// packet
if (data.payloadUnitStartIndicator) {
this.flushStream_(stream, streamType);
}

// buffer this fragment until we are sure we've received the
// complete payload
stream.data.push(data);
stream.size += data.data.byteLength;
};
this.pmt_ = function(data) {
var event = {
type: 'metadata',
tracks: []
},
programMapTable = data.programMapTable,
k,
track;

// translate streams to tracks
for (k in programMapTable) {
if (programMapTable.hasOwnProperty(k)) {
track = {
timelineStartInfo: {
baseMediaDecodeTime: 0
}
};
track.id = +k;
if (programMapTable[k] === StreamTypes.H264_STREAM_TYPE) {
track.codec = 'avc';
track.type = 'video';
} else if (programMapTable[k] === StreamTypes.ADTS_STREAM_TYPE) {
track.codec = 'adts';
track.type = 'audio';
} else {
this.trigger('error', 'got some unknown kind ElementaryStream stream type: 0x' + programMapTable[k].toString(16));
continue;
}
event.tracks.push(track);
}
}

this.triggerData_(event);
};
this.pat_ = function(data) {
// we have to wait for the PMT to arrive as well before we
// have any meaningful metadata
return;
};

this.parsePes_ = function(payload, pes) {
var ptsDtsFlags;

// find out if this packets starts a new keyframe
Expand Down Expand Up @@ -314,8 +429,8 @@ ElementaryStream = function() {
// pes_header_data_length specifies the number of header bytes
// that follow the last byte of the field.
pes.data = payload.subarray(9 + payload[8]);
},
flushStream = function(stream, type) {
};
this.flushStream_ = function(stream, type) {
var
packetData = new Uint8Array(stream.size),
event = {
Expand All @@ -339,88 +454,37 @@ ElementaryStream = function() {
}

// parse assembled packet's PES header
parsePes(packetData, event);

this.parsePes_(packetData, event);
stream.size = 0;
this.triggerData_(event);
};

self.trigger('data', event);
},
self;

ElementaryStream.prototype.init.call(this);
self = this;
this.triggerData_ = function(data) {
sinceFlushDataEmitted++;
this.trigger('data', data);
};

this.push = function(data) {
({
pat: function() {
// we have to wait for the PMT to arrive as well before we
// have any meaningful metadata
},
pes: function() {
var stream, streamType;

switch (data.streamType) {
case StreamTypes.H264_STREAM_TYPE:
case m2tsStreamTypes.H264_STREAM_TYPE:
stream = video;
streamType = 'video';
break;
case StreamTypes.ADTS_STREAM_TYPE:
stream = audio;
streamType = 'audio';
break;
case StreamTypes.METADATA_STREAM_TYPE:
stream = timedMetadata;
streamType = 'timed-metadata';
break;
default:
// ignore unknown stream types
return;
}

// if a new packet is starting, we can flush the completed
// packet
if (data.payloadUnitStartIndicator) {
flushStream(stream, streamType);
}

// buffer this fragment until we are sure we've received the
// complete payload
stream.data.push(data);
stream.size += data.data.byteLength;
},
pmt: function() {
var
event = {
type: 'metadata',
tracks: []
},
programMapTable = data.programMapTable,
k,
track;

// translate streams to tracks
for (k in programMapTable) {
if (programMapTable.hasOwnProperty(k)) {
track = {
timelineStartInfo: {
baseMediaDecodeTime: 0
}
};
track.id = +k;
if (programMapTable[k] === m2tsStreamTypes.H264_STREAM_TYPE) {
track.codec = 'avc';
track.type = 'video';
} else if (programMapTable[k] === m2tsStreamTypes.ADTS_STREAM_TYPE) {
track.codec = 'adts';
track.type = 'audio';
}
event.tracks.push(track);
}
}
self.trigger('data', event);
}
})[data.type]();
if (typeof data !== 'object' ||
!data.type) {
this.trigger('error', 'Data with no type passed to ElementaryStream');
return;
}
switch (data.type) {
case 'pmt':
this.pmt_(data);
break;
case 'pat':
this.pat_(data);
break;
case 'pes':
this.pes_(data);
break;
default:
this.trigger('error', 'Unknown packet type!' + data.type);
break;
}
};

/**
Expand All @@ -435,9 +499,13 @@ ElementaryStream = function() {
this.flush = function() {
// !!THIS ORDER IS IMPORTANT!!
// video first then audio
flushStream(video, 'video');
flushStream(audio, 'audio');
flushStream(timedMetadata, 'timed-metadata');
this.flushStream_(video, 'video');
this.flushStream_(audio, 'audio');
this.flushStream_(timedMetadata, 'timed-metadata');
if (sinceFlushDataEmitted === 0) {
this.trigger('error', 'No data emitted since flush in ElementaryStream');
}
sinceFlushDataEmitted = 0;
this.trigger('done');
};
};
Expand Down
Loading