Skip to content

Commit c4931a8

Browse files
committed
组包时需要指定包类型,区分实时、历史数据
1 parent b0b5ef6 commit c4931a8

File tree

5 files changed

+51
-17
lines changed

5 files changed

+51
-17
lines changed

src/JT1078.Protocol.Test/Extensions/JT1078PackageExtensionsTest.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using JT1078.Protocol.Extensions;
1+
using JT1078.Protocol.Enums;
2+
using JT1078.Protocol.Extensions;
23
using System;
34
using System.Collections.Generic;
45
using System.IO;
@@ -23,7 +24,7 @@ public void Test()
2324
var bytes = data[6].ToHexBytes();
2425
JT1078Package package = JT1078Serializer.Deserialize(bytes);
2526
mergeBodyLength += package.DataBodyLength;
26-
merge = JT1078Serializer.Merge(package);
27+
merge = JT1078Serializer.Merge(package,JT808ChannelType.Live);
2728
}
2829
var packages = merge.Bodies.ConvertVideo(merge.SIM, merge.LogicChannelNumber, merge.Label2.PT, merge.Label3.DataType,
2930
merge.Timestamp, merge.LastFrameInterval, merge.LastFrameInterval);

src/JT1078.Protocol.Test/H264/H264DecoderTest.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using JT1078.Protocol.H264;
1010
using JT808.Protocol.Extensions;
1111
using JT1078.Protocol.MessagePack;
12+
using JT1078.Protocol.Enums;
1213

1314
namespace JT1078.Protocol.Test.H264
1415
{
@@ -41,7 +42,7 @@ public void ParseNALUTest2()
4142
var bytes = data[6].ToHexBytes();
4243
JT1078Package package = JT1078Serializer.Deserialize(bytes);
4344
mergeBodyLength += package.DataBodyLength;
44-
Package = JT1078Serializer.Merge(package);
45+
Package = JT1078Serializer.Merge(package,JT808ChannelType.Live);
4546
}
4647
H264Decoder decoder = new H264Decoder();
4748
var nalus = decoder.ParseNALU(Package);
@@ -128,7 +129,7 @@ public void ParseNALUTest5()
128129
{
129130
var bytes = line.ToHexBytes();
130131
JT1078Package package = JT1078Serializer.Deserialize(bytes);
131-
var packageMerge = JT1078Serializer.Merge(package);
132+
var packageMerge = JT1078Serializer.Merge(package,JT808ChannelType.Live);
132133
if (packageMerge != null)
133134
{
134135
var nalus = decoder.ParseNALU(packageMerge);

src/JT1078.Protocol.Test/JT1078SerializerTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ public void MergeTest()
345345
var bytes = data[6].ToHexBytes();
346346
JT1078Package package = JT1078Serializer.Deserialize(bytes);
347347
mergeBodyLength += package.DataBodyLength;
348-
merge = JT1078Serializer.Merge(package);
348+
merge = JT1078Serializer.Merge(package,JT808ChannelType.Live);
349349
}
350350
Assert.NotNull(merge);
351351
Assert.Equal(mergeBodyLength, merge.Bodies.Length);
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace JT1078.Protocol.Enums
2+
{
3+
/// <summary>
4+
/// data channel type
5+
/// </summary>
6+
public enum JT808ChannelType
7+
{
8+
/// <summary>
9+
/// live channel
10+
/// </summary>
11+
Live,
12+
/// <summary>
13+
/// history channel
14+
/// </summary>
15+
History
16+
}
17+
}

src/JT1078.Protocol/JT1078Serializer.cs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
using JT1078.Protocol.Enums;
2-
using JT1078.Protocol.Extensions;
3-
using JT1078.Protocol.MessagePack;
4-
using System;
1+
using System;
52
using System.Collections.Concurrent;
6-
using System.Collections.Generic;
73
using System.IO;
84
using System.Linq;
95
using System.Text;
106
using System.Text.Json;
7+
using JT1078.Protocol.Enums;
8+
using JT1078.Protocol.Extensions;
9+
using JT1078.Protocol.MessagePack;
1110

1211
namespace JT1078.Protocol
1312
{
1413
public static class JT1078Serializer
1514
{
16-
private readonly static ConcurrentDictionary<string, JT1078Package> JT1078PackageGroupDict = new ConcurrentDictionary<string, JT1078Package>(StringComparer.OrdinalIgnoreCase);
15+
private readonly static ConcurrentDictionary<string, JT1078Package> livePackageGroup = new(StringComparer.OrdinalIgnoreCase);
16+
private readonly static ConcurrentDictionary<string, JT1078Package> historyPackageGroup = new(StringComparer.OrdinalIgnoreCase);
1717
public static byte[] Serialize(JT1078Package package, int minBufferSize = 4096)
1818
{
1919
byte[] buffer = JT1078ArrayPool.Rent(minBufferSize);
@@ -72,18 +72,33 @@ public static JT1078Package Deserialize(ReadOnlySpan<byte> bytes)
7272
jT1078Package.Bodies = jT1078MessagePackReader.ReadRemainArray().ToArray();
7373
return jT1078Package;
7474
}
75-
public static JT1078Package Merge(JT1078Package jT1078Package)
75+
76+
/// <summary>
77+
/// merge package
78+
/// <para>due to the agreement cannot distinguish the Stream type, so need clear Stream type at the time of merger, avoid confusion</para>
79+
/// </summary>
80+
/// <param name="jT1078Package">package of jt1078</param>
81+
/// <param name="channelType">package type is live or history</param>
82+
/// <returns></returns>
83+
/// <exception cref="Exception"></exception>
84+
public static JT1078Package Merge(JT1078Package jT1078Package,JT808ChannelType channelType)
7685
{
7786
string cacheKey = jT1078Package.GetKey();
87+
var packageGroup = channelType switch
88+
{
89+
JT808ChannelType.Live=>livePackageGroup,
90+
JT808ChannelType.History=>historyPackageGroup,
91+
_=>throw new Exception("channel type error")
92+
};
7893
if (jT1078Package.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包)
7994
{
80-
JT1078PackageGroupDict.TryRemove(cacheKey, out _);
81-
JT1078PackageGroupDict.TryAdd(cacheKey, jT1078Package);
95+
packageGroup.TryRemove(cacheKey, out _);
96+
packageGroup.TryAdd(cacheKey, jT1078Package);
8297
return default;
8398
}
8499
else if (jT1078Package.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包)
85100
{
86-
if (JT1078PackageGroupDict.TryGetValue(cacheKey, out var tmpPackage))
101+
if (packageGroup.TryGetValue(cacheKey, out var tmpPackage))
87102
{
88103
var totalLength = tmpPackage.Bodies.Length + jT1078Package.Bodies.Length;
89104
byte[] poolBytes = JT1078ArrayPool.Rent(totalLength);
@@ -92,13 +107,13 @@ public static JT1078Package Merge(JT1078Package jT1078Package)
92107
jT1078Package.Bodies.CopyTo(tmpSpan.Slice(tmpPackage.Bodies.Length));
93108
tmpPackage.Bodies = tmpSpan.Slice(0, totalLength).ToArray();
94109
JT1078ArrayPool.Return(poolBytes);
95-
JT1078PackageGroupDict[cacheKey] = tmpPackage;
110+
packageGroup[cacheKey] = tmpPackage;
96111
}
97112
return default;
98113
}
99114
else if (jT1078Package.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包)
100115
{
101-
if (JT1078PackageGroupDict.TryRemove(cacheKey, out var tmpPackage))
116+
if (packageGroup.TryRemove(cacheKey, out var tmpPackage))
102117
{
103118
var totalLength = tmpPackage.Bodies.Length + jT1078Package.Bodies.Length;
104119
byte[] poolBytes = JT1078ArrayPool.Rent(totalLength);

0 commit comments

Comments
 (0)