diff --git a/.gitignore b/.gitignore
index c8717a0d..b578d461 100644
--- a/.gitignore
+++ b/.gitignore
@@ -136,6 +136,7 @@ publish/
# NuGet Packages Directory
## TODO: If you have NuGet Package Restore enabled, uncomment the next line
packages/
+*.nupkg
# Windows Azure Build Output
csx
diff --git a/src/TestHarness/TestHarness.csproj b/src/TestHarness/TestHarness.csproj
index f83bcb5a..4ce3dd5b 100644
--- a/src/TestHarness/TestHarness.csproj
+++ b/src/TestHarness/TestHarness.csproj
@@ -10,15 +10,20 @@
TestHarness
TestHarness
v4.5
- 512
+ 4096
..\
true
+ False
+ False
+ False
+ False
+ obj\$(Configuration)\
- AnyCPU
+ x86
true
- full
- false
+ Full
+ False
bin\Debug\
DEBUG;TRACE
prompt
@@ -33,6 +38,15 @@
prompt
4
+
+ False
+ obj\
+
+
+ 4194304
+ False
+ Auto
+
diff --git a/src/kafka-net.nuspec b/src/kafka-net.nuspec
index 292fe471..b98b2c1f 100644
--- a/src/kafka-net.nuspec
+++ b/src/kafka-net.nuspec
@@ -16,6 +16,6 @@
C# Apache Kafka
-
+
\ No newline at end of file
diff --git a/src/kafka-net.sln b/src/kafka-net.sln
index 7405a564..cb525736 100644
--- a/src/kafka-net.sln
+++ b/src/kafka-net.sln
@@ -1,14 +1,9 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 2013
+# Visual Studio 2012
+# SharpDevelop 4.4
VisualStudioVersion = 12.0.30723.0
MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "TestHarness\TestHarness.csproj", "{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}"
-EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{106F20D4-E22F-4C73-9D48-7F38E2A77163}"
ProjectSection(SolutionItems) = preProject
.nuget\NuGet.Config = .nuget\NuGet.Config
@@ -23,6 +18,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\README.md = ..\README.md
EndProjectSection
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "TestHarness\TestHarness.csproj", "{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs
index 268740e0..35501594 100644
--- a/src/kafka-net/Common/Extensions.cs
+++ b/src/kafka-net/Common/Extensions.cs
@@ -116,5 +116,11 @@ public static async Task WithCancellation(this Task task, CancellationT
return await task;
}
+
+ public static void times(this int n, Action action){
+ for (int i = 0; i < n; i++) {
+ action.Invoke();
+ }
+ }
}
}
diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs
index 681fcd21..619b2ce9 100755
--- a/src/kafka-net/Consumer.cs
+++ b/src/kafka-net/Consumer.cs
@@ -18,17 +18,17 @@ namespace KafkaNet
///
public class Consumer : IMetadataQueries, IDisposable
{
- private readonly ConsumerOptions _options;
- private readonly BlockingCollection _fetchResponseQueue;
- private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
- private readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary();
- private readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary();
- private readonly IScheduledTimer _topicPartitionQueryTimer;
- private readonly IMetadataQueries _metadataQueries;
-
- private int _disposeCount;
- private int _ensureOneThread;
- private Topic _topic;
+ protected readonly ConsumerOptions _options;
+ protected readonly BlockingCollection _fetchResponseQueue;
+ protected readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
+ protected readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary();
+ protected readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary();
+ protected readonly IScheduledTimer _topicPartitionQueryTimer;
+ protected readonly IMetadataQueries _metadataQueries;
+
+ protected int _disposeCount;
+ protected int _ensureOneThread;
+ protected Topic _topic;
public Consumer(ConsumerOptions options, params OffsetPosition[] positions)
{
@@ -87,7 +87,7 @@ public List GetOffsetPosition()
return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList();
}
- private void RefreshTopicPartitions()
+ protected void RefreshTopicPartitions()
{
try
{
@@ -143,7 +143,8 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
{
Topic = topic,
PartitionId = partitionId,
- Offset = offset
+ Offset = offset,
+ MaxBytes = _options.MaxMessageSize
}
};
@@ -164,13 +165,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
foreach (var message in response.Messages)
{
_fetchResponseQueue.Add(message, _disposeToken.Token);
-
if (_disposeToken.IsCancellationRequested) return;
}
var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1;
_partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);
-
// sleep is not needed if responses were received
continue;
}
diff --git a/src/kafka-net/Interfaces/IMetadataQueries.cs b/src/kafka-net/Interfaces/IMetadataQueries.cs
index c9443915..61b85d21 100644
--- a/src/kafka-net/Interfaces/IMetadataQueries.cs
+++ b/src/kafka-net/Interfaces/IMetadataQueries.cs
@@ -7,7 +7,7 @@ namespace KafkaNet
///
/// Contains common metadata query commands that are used by both a consumer and producer.
///
- interface IMetadataQueries : IDisposable
+ public interface IMetadataQueries : IDisposable
{
///
/// Get metadata on the given topic.
diff --git a/src/kafka-net/Model/ConsumerOptions.cs b/src/kafka-net/Model/ConsumerOptions.cs
index 3c55d8ba..b68b8495 100644
--- a/src/kafka-net/Model/ConsumerOptions.cs
+++ b/src/kafka-net/Model/ConsumerOptions.cs
@@ -7,7 +7,7 @@ public class ConsumerOptions
{
private const int DefaultMaxConsumerBufferSize = 50;
private const int DefaultBackoffIntervalMS = 1000;
-
+ private const int DefaultMaxMsgSize = 4096 * 1024; // default to be 4 MB for max message size.
///
/// The topic to consume messages from.
///
@@ -37,6 +37,8 @@ public class ConsumerOptions
///
public TimeSpan BackoffInterval { get; set; }
+ public int MaxMessageSize { get; set; }
+
public ConsumerOptions(string topic, IBrokerRouter router)
{
Topic = topic;
@@ -46,6 +48,7 @@ public ConsumerOptions(string topic, IBrokerRouter router)
TopicPartitionQueryTimeMs = (int)TimeSpan.FromMinutes(15).TotalMilliseconds;
ConsumerBufferSize = DefaultMaxConsumerBufferSize;
BackoffInterval = TimeSpan.FromMilliseconds(DefaultBackoffIntervalMS);
+ MaxMessageSize = DefaultMaxMsgSize;
}
}
}
\ No newline at end of file
diff --git a/src/kafka-net/NativeHLConsumer.cs b/src/kafka-net/NativeHLConsumer.cs
new file mode 100644
index 00000000..bde57150
--- /dev/null
+++ b/src/kafka-net/NativeHLConsumer.cs
@@ -0,0 +1,159 @@
+/*
+ * Created by SharpDevelop.
+ * User: peng.zang
+ * Date: 11/11/2014
+ * Time: 10:49 AM
+ *
+ * To change this template use Tools | Options | Coding | Edit Standard Headers.
+ */
+using System;
+using System.Linq;
+using System.Threading;
+using System.Collections.Generic;
+
+using KafkaNet.Protocol;
+using KafkaNet.Model;
+using KafkaNet.Common;
+
+namespace KafkaNet
+{
+ ///
+ /// A High level API with consumer group support. Automatic commits the offset for the group, and will return a non-blocking
+ /// message list to client.
+ /// TODO: Make sure offset tracking works in parallel (right now it will consume in a "at least once" manner)
+ ///
+ public class NativeHLConsumer : Consumer
+ {
+
+ protected string _consumerGroup;
+
+ public NativeHLConsumer(ConsumerOptions options, string consumerGroup, params OffsetPosition[] positions)
+ : base(options, positions)
+ {
+ if (_topic == null || _topic.Name != _options.Topic)
+ _topic = _metadataQueries.GetTopic(_options.Topic);
+ _consumerGroup = consumerGroup;
+ RefreshOffsets();
+ }
+
+ ///
+ /// Refresh offset by fetching the offset from kafka server for this._consumerGroup; also check if the offset is within the range of
+ /// min-max offset in current topic, if not, set to minimum offset.
+ ///
+ public void RefreshOffsets()
+ {
+ var actualOffsets = _metadataQueries.GetTopicOffsetAsync(_options.Topic).Result;
+ var maxminGroups = actualOffsets.Select(x => new { pid = x.PartitionId, min = x.Offsets.Min(), max = x.Offsets.Max() });
+
+ _topic.Partitions.ForEach(
+ partition =>
+ {
+ _options.Router.SelectBrokerRoute(_topic.Name, partition.PartitionId).Connection
+ .SendAsync(CreateOffsetFetchRequest(_consumerGroup, partition.PartitionId))
+ .Result.ForEach(
+ offsetResp =>
+ {
+ Console.WriteLine("fetch offset: " + offsetResp.ToString());
+
+ if (actualOffsets.Any(x => x.PartitionId == partition.PartitionId))
+ {
+ var actual = maxminGroups.First(x => x.pid == partition.PartitionId);
+ if (actual.min > offsetResp.Offset || actual.max < offsetResp.Offset)
+ {
+ offsetResp.Offset = actual.min;
+ }
+ }
+ _partitionOffsetIndex.AddOrUpdate(partition.PartitionId, i => offsetResp.Offset, (i, l) => offsetResp.Offset);
+ });
+ }
+ );
+
+ }
+
+ ///
+ /// One time consuming certain num of messages specified, and stop consuming more at the end of call. It'll automatically increase
+ /// the offset by num and commit it. If fail to commit offset, it'll return null result.
+ ///
+ ///
+ ///
+ public IEnumerable Consume(int num, int timeout=1000)
+ {
+ List result = new List();
+
+ _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic);
+ _topicPartitionQueryTimer.Begin();
+
+ while (result.Count < num) {
+ Message temp = null;
+ if(!_fetchResponseQueue.TryTake(out temp, timeout)){
+ return null;
+ }
+
+ if(temp != null){
+ var conn = _options.Router.SelectBrokerRoute(_topic.Name, temp.Meta.PartitionId).Connection;
+ var offsets = conn.SendAsync(CreateOffsetFetchRequest(_consumerGroup, temp.Meta.PartitionId )).Result;
+ var x = offsets.FirstOrDefault();
+
+ if(x != null && x.PartitionId == temp.Meta.PartitionId){
+ if(x.Offset > temp.Meta.Offset)
+ _options.Log.DebugFormat("GET Duplicated message");
+ else {
+ if(CommitOffset(conn, temp.Meta.PartitionId, temp.Meta.Offset+1))
+ result.Add(temp);
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ protected bool CommitOffset(IKafkaConnection conn, int pid, long offset)
+ {
+ var resp = conn.SendAsync(CreateOffsetCommitRequest(_consumerGroup, pid, offset)).Result.FirstOrDefault();
+ if (resp != null && ((int)resp.Error) == (int)ErrorResponseCode.NoError)
+ return true;
+ else
+ {
+ return false;
+ }
+ }
+
+ protected OffsetFetchRequest CreateOffsetFetchRequest(string consumerGroup, int partitionId)
+ {
+ var request = new OffsetFetchRequest
+ {
+ ConsumerGroup = consumerGroup,
+ Topics = new List
+ {
+ new OffsetFetch
+ {
+ PartitionId = partitionId,
+ Topic = _options.Topic
+ }
+ }
+ };
+
+ return request;
+ }
+
+ protected OffsetCommitRequest CreateOffsetCommitRequest(string consumerGroup, int partitionId, long offset, string metadata = null)
+ {
+ var commit = new OffsetCommitRequest
+ {
+ ConsumerGroup = consumerGroup,
+ OffsetCommits = new List
+ {
+ new OffsetCommit
+ {
+ PartitionId = partitionId,
+ Topic = _topic.Name,
+ Offset = offset,
+ Metadata = metadata
+ }
+ }
+ };
+
+ return commit;
+ }
+ }
+}
diff --git a/src/kafka-net/Protocol/Message.cs b/src/kafka-net/Protocol/Message.cs
index f23cb92d..185c62bb 100644
--- a/src/kafka-net/Protocol/Message.cs
+++ b/src/kafka-net/Protocol/Message.cs
@@ -181,6 +181,7 @@ public static IEnumerable DecodeMessage(long offset, byte[] payload)
throw new NotSupportedException(string.Format("Codec type of {0} is not supported.", codec));
}
}
+
}
///
@@ -200,5 +201,7 @@ public class MessageMetadata
/// The partition id this offset is from.
///
public int PartitionId { get; set; }
+
+
}
}
diff --git a/src/kafka-net/ZookeeperHLConsumer.cs b/src/kafka-net/ZookeeperHLConsumer.cs
new file mode 100644
index 00000000..e7171d70
--- /dev/null
+++ b/src/kafka-net/ZookeeperHLConsumer.cs
@@ -0,0 +1,132 @@
+/*
+ * Created by SharpDevelop.
+ * User: peng.zang
+ * Date: 11/10/2014
+ * Time: 2:44 PM
+ *
+ * To change this template use Tools | Options | Coding | Edit Standard Headers.
+ */
+using System;
+using System.Text;
+using System.Linq;
+using System.Collections.Generic;
+
+using KafkaNet.Model;
+using KafkaNet.Protocol;
+using KafkaNet.Common;
+
+using ZooKeeperNet;
+
+namespace KafkaNet
+{
+ ///
+ /// High level consumer using zookeeper for coordination.
+ ///
+ public class ZookeeperHLConsumer
+ {
+ KafkaOptions _options;
+ BrokerRouter _router;
+ Consumer _consumer;
+ ZooKeeper _zookeeper;
+ string _topic;
+ IWatcher _watcher;
+
+ public ZookeeperHLConsumer(string topic, List brokerList, string zookps, TimeSpan? timeout = null, IWatcher watcher = null)
+ {
+ _options = new KafkaOptions();
+ _options.KafkaServerUri = brokerList.ConvertAll(x => new Uri(x));
+
+ _router = new BrokerRouter(_options);
+ _consumer = new KafkaNet.Consumer(new ConsumerOptions(topic, _router));
+
+ _topic = topic;
+
+ _zookeeper = new ZooKeeper(zookps, timeout.HasValue ? timeout.Value : new TimeSpan(7, 0, 0), watcher);
+ _watcher = watcher;
+ }
+
+ ///
+ /// consume topic specified in the constructor with groupID here. Only consume n messages.
+ ///
+ ///
+ ///
+ ///
+ public IEnumerable Consume(string groupID, int n)
+ {
+ var p = "/consumers/" + groupID + "/offsets/" + this._topic;
+ try
+ {
+ if (_zookeeper.Exists(p, _watcher) == null)
+ {
+ CreateZookeeperPath("/consumers", "/" + groupID, "/offsets", "/" + this._topic);
+ var common = new MetadataQueries(_router);
+ var offsets = common.GetTopicOffsetAsync(groupID).Result;
+ _consumer.SetOffsetPosition(offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Min())).ToArray());
+ offsets.ForEach(off =>
+ {
+ _zookeeper.Create(p + "/" + off.PartitionId.ToString(),
+ System.Text.Encoding.UTF8.GetBytes(off.Offsets.Min().ToString()),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PersistentSequential);
+ });
+ }
+ else
+ {
+ var children = _zookeeper.GetChildren(p, _watcher); //TODO: add watcher and stat support.
+ var offsets = new List();
+ children.ToList().ForEach(x =>
+ {
+ int partition;
+ if (int.TryParse(x, out partition))
+ {
+ var data = _zookeeper.GetData(p + "/" + partition, _watcher, null);
+ if (data != null && data.Length > 0)
+ {
+ long offset = 0;
+ if (long.TryParse(System.Text.Encoding.Default.GetString(data), out offset))
+ {
+ offsets.Add(new OffsetPosition(partition, offset));
+ }
+ }
+ }
+ });
+ _consumer.SetOffsetPosition(offsets.ToArray());
+ }
+ }
+ catch (Exception)
+ {
+ //TODO: Log the error, or handle it?
+ }
+
+ return _consumer.Consume().Take(n);
+ }
+
+ ///
+ /// create zookeeper path hierarchically.
+ ///
+ /// paths by level, the next path should be append to the previous one to form a valid path
+ ///
+ public bool CreateZookeeperPath(params string[] path)
+ {
+ var sb = new StringBuilder();
+ var success = true;
+ try
+ {
+ for (int i = 0; i < path.Length; i++)
+ {
+ sb.Append(path[i]);
+ if (_zookeeper.Exists(sb.ToString(), _watcher) == null)
+ {
+ _zookeeper.Create(sb.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent);
+ }
+ }
+ }
+ catch (Exception)
+ {
+ success = false;
+ //TODO:
+ }
+ return success;
+ }
+ }
+}
diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj
index 8e15f79d..2fe8a410 100644
--- a/src/kafka-net/kafka-net.csproj
+++ b/src/kafka-net/kafka-net.csproj
@@ -10,18 +10,27 @@
KafkaNet
kafka-net
v4.5
- 512
+ 4096
+ ..\
+ true
+ False
+ False
+ False
+ False
+ obj\$(Configuration)\
+ False
+ False
+ OnBuildSuccess
true
- full
- false
+ Full
+ False
bin\Debug\
DEBUG;TRACE
prompt
4
- false
pdbonly
@@ -32,7 +41,20 @@
4
false
+
+ False
+ obj\
+
+
+ 4194304
+ x86
+ False
+ Auto
+
+
+ ..\packages\log4net.2.0.3\lib\net40-full\log4net.dll
+
@@ -40,10 +62,14 @@
+
+ ..\packages\ZooKeeper.Net.3.4.6.0\lib\net40\ZooKeeperNet.dll
+
+
@@ -59,6 +85,7 @@
+
@@ -86,11 +113,13 @@
+
+