diff --git a/.gitignore b/.gitignore
index c8717a0d..b578d461 100644
--- a/.gitignore
+++ b/.gitignore
@@ -136,6 +136,7 @@ publish/
 # NuGet Packages Directory
 ## TODO: If you have NuGet Package Restore enabled, uncomment the next line
 packages/
+*.nupkg
 # Windows Azure Build Output
 csx
diff --git a/src/TestHarness/TestHarness.csproj b/src/TestHarness/TestHarness.csproj
index f83bcb5a..4ce3dd5b 100644
--- a/src/TestHarness/TestHarness.csproj
+++ b/src/TestHarness/TestHarness.csproj
@@ -10,15 +10,20 @@
     TestHarness
     TestHarness
     v4.5
-    512
+    4096
     ..\
     true
+    False
+    False
+    False
+    False
+    obj\$(Configuration)\
-    AnyCPU
+    x86
     true
-    full
-    false
+    Full
+    False
     bin\Debug\
     DEBUG;TRACE
     prompt
@@ -33,6 +38,15 @@
     prompt
     4
+
+    False
+    obj\
+
+
+    4194304
+    False
+    Auto
+
diff --git a/src/kafka-net.nuspec b/src/kafka-net.nuspec
index 292fe471..b98b2c1f 100644
--- a/src/kafka-net.nuspec
+++ b/src/kafka-net.nuspec
@@ -16,6 +16,6 @@
     C# Apache Kafka
-
+
\ No newline at end of file
diff --git a/src/kafka-net.sln b/src/kafka-net.sln
index 7405a564..cb525736 100644
--- a/src/kafka-net.sln
+++ b/src/kafka-net.sln
@@ -1,14 +1,9 @@
 Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 2013
+# Visual Studio 2012
+# SharpDevelop 4.4
 VisualStudioVersion = 12.0.30723.0
 MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "TestHarness\TestHarness.csproj", "{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}"
-EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{106F20D4-E22F-4C73-9D48-7F38E2A77163}"
     ProjectSection(SolutionItems) = preProject
         .nuget\NuGet.Config = .nuget\NuGet.Config
@@ -23,6 +18,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
         ..\README.md = ..\README.md
     EndProjectSection
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "TestHarness\TestHarness.csproj", "{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}"
+EndProject
 Global
     GlobalSection(SolutionConfigurationPlatforms) = preSolution
         Debug|Any CPU = Debug|Any CPU
diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs
index 268740e0..35501594 100644
--- a/src/kafka-net/Common/Extensions.cs
+++ b/src/kafka-net/Common/Extensions.cs
@@ -116,5 +116,11 @@ public static async Task WithCancellation(this Task task, CancellationT
             return await task;
         }
+
+        public static void times(this int n, Action action){
+            for (int i = 0; i < n; i++) {
+                action.Invoke();
+            }
+        }
     }
 }
diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs
index 681fcd21..619b2ce9 100755
--- a/src/kafka-net/Consumer.cs
+++ b/src/kafka-net/Consumer.cs
@@ -18,17 +18,17 @@ namespace KafkaNet
     ///
     public class Consumer : IMetadataQueries, IDisposable
     {
-        private readonly ConsumerOptions _options;
-        private readonly BlockingCollection _fetchResponseQueue;
-        private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
-        private readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary();
-        private readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary();
-        private readonly IScheduledTimer _topicPartitionQueryTimer;
-        private readonly IMetadataQueries _metadataQueries;
-
-        private int _disposeCount;
-        private int _ensureOneThread;
-        private Topic _topic;
+        protected readonly ConsumerOptions _options;
+        protected readonly BlockingCollection _fetchResponseQueue;
+        protected readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
+        protected readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary();
+        protected readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary();
+        protected readonly IScheduledTimer _topicPartitionQueryTimer;
+        protected readonly IMetadataQueries _metadataQueries;
+
+        protected int _disposeCount;
+        protected int _ensureOneThread;
+        protected Topic _topic;
         public Consumer(ConsumerOptions options, params OffsetPosition[] positions)
         {
@@ -87,7 +87,7 @@ public List GetOffsetPosition()
             return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList();
         }
-        private void RefreshTopicPartitions()
+        protected void RefreshTopicPartitions()
         {
             try
             {
@@ -143,7 +143,8 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
                                     {
                                         Topic = topic,
                                         PartitionId = partitionId,
-                                        Offset = offset
+                                        Offset = offset,
+                                        MaxBytes = _options.MaxMessageSize
                                     }
                                 };
@@ -164,13 +165,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
                                     foreach (var message in response.Messages)
                                     {
                                         _fetchResponseQueue.Add(message, _disposeToken.Token);
-                                        if (_disposeToken.IsCancellationRequested) return;
                                     }
                                     var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1;
                                     _partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);
-                                    // sleep is not needed if responses were received
                                     continue;
                                 }
diff --git a/src/kafka-net/Interfaces/IMetadataQueries.cs b/src/kafka-net/Interfaces/IMetadataQueries.cs
index c9443915..61b85d21 100644
--- a/src/kafka-net/Interfaces/IMetadataQueries.cs
+++ b/src/kafka-net/Interfaces/IMetadataQueries.cs
@@ -7,7 +7,7 @@ namespace KafkaNet
     ///
     /// Contains common metadata query commands that are used by both a consumer and producer.
     ///
-    interface IMetadataQueries : IDisposable
+    public interface IMetadataQueries : IDisposable
     {
         ///
         /// Get metadata on the given topic.
diff --git a/src/kafka-net/Model/ConsumerOptions.cs b/src/kafka-net/Model/ConsumerOptions.cs
index 3c55d8ba..b68b8495 100644
--- a/src/kafka-net/Model/ConsumerOptions.cs
+++ b/src/kafka-net/Model/ConsumerOptions.cs
@@ -7,7 +7,7 @@ public class ConsumerOptions
     {
         private const int DefaultMaxConsumerBufferSize = 50;
         private const int DefaultBackoffIntervalMS = 1000;
-
+        private const int DefaultMaxMsgSize = 4096 * 1024; // default to be 4 MB for max message size.
         ///
         /// The topic to consume messages from.
         ///
@@ -37,6 +37,8 @@ public class ConsumerOptions
         ///
         public TimeSpan BackoffInterval { get; set; }
+        public int MaxMessageSize { get; set; }
+
         public ConsumerOptions(string topic, IBrokerRouter router)
         {
             Topic = topic;
@@ -46,6 +48,7 @@ public ConsumerOptions(string topic, IBrokerRouter router)
             TopicPartitionQueryTimeMs = (int)TimeSpan.FromMinutes(15).TotalMilliseconds;
             ConsumerBufferSize = DefaultMaxConsumerBufferSize;
             BackoffInterval = TimeSpan.FromMilliseconds(DefaultBackoffIntervalMS);
+            MaxMessageSize = DefaultMaxMsgSize;
         }
     }
 }
\ No newline at end of file
diff --git a/src/kafka-net/NativeHLConsumer.cs b/src/kafka-net/NativeHLConsumer.cs
new file mode 100644
index 00000000..bde57150
--- /dev/null
+++ b/src/kafka-net/NativeHLConsumer.cs
@@ -0,0 +1,159 @@
+/*
+ * Created by SharpDevelop.
+ * User: peng.zang
+ * Date: 11/11/2014
+ * Time: 10:49 AM
+ *
+ * To change this template use Tools | Options | Coding | Edit Standard Headers.
+ */
+using System;
+using System.Linq;
+using System.Threading;
+using System.Collections.Generic;
+
+using KafkaNet.Protocol;
+using KafkaNet.Model;
+using KafkaNet.Common;
+
+namespace KafkaNet
+{
+    ///
+    /// A High level API with consumer group support. Automatic commits the offset for the group, and will return a non-blocking
+    /// message list to client.
+    /// TODO: Make sure offset tracking works in parallel (right now it will consume in a "at least once" manner)
+    ///
+    public class NativeHLConsumer : Consumer
+    {
+
+        protected string _consumerGroup;
+
+        public NativeHLConsumer(ConsumerOptions options, string consumerGroup, params OffsetPosition[] positions)
+            : base(options, positions)
+        {
+            if (_topic == null || _topic.Name != _options.Topic)
+                _topic = _metadataQueries.GetTopic(_options.Topic);
+            _consumerGroup = consumerGroup;
+            RefreshOffsets();
+        }
+
+        ///
+        /// Refresh offset by fetching the offset from kafka server for this._consumerGroup; also check if the offset is within the range of
+        /// min-max offset in current topic, if not, set to minimum offset.
+        ///
+        public void RefreshOffsets()
+        {
+            var actualOffsets = _metadataQueries.GetTopicOffsetAsync(_options.Topic).Result;
+            var maxminGroups = actualOffsets.Select(x => new { pid = x.PartitionId, min = x.Offsets.Min(), max = x.Offsets.Max() });
+
+            _topic.Partitions.ForEach(
+                partition =>
+                {
+                    _options.Router.SelectBrokerRoute(_topic.Name, partition.PartitionId).Connection
+                        .SendAsync(CreateOffsetFetchRequest(_consumerGroup, partition.PartitionId))
+                        .Result.ForEach(
+                            offsetResp =>
+                            {
+                                Console.WriteLine("fetch offset: " + offsetResp.ToString());
+
+                                if (actualOffsets.Any(x => x.PartitionId == partition.PartitionId))
+                                {
+                                    var actual = maxminGroups.First(x => x.pid == partition.PartitionId);
+                                    if (actual.min > offsetResp.Offset || actual.max < offsetResp.Offset)
+                                    {
+                                        offsetResp.Offset = actual.min;
+                                    }
+                                }
+                                _partitionOffsetIndex.AddOrUpdate(partition.PartitionId, i => offsetResp.Offset, (i, l) => offsetResp.Offset);
+                            });
+                }
+            );
+
+        }
+
+        ///
+        /// One time consuming certain num of messages specified, and stop consuming more at the end of call. It'll automatically increase
+        /// the offset by num and commit it. If fail to commit offset, it'll return null result.
+        ///
+        ///
+        ///
+        public IEnumerable Consume(int num, int timeout=1000)
+        {
+            List result = new List();
+
+            _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic);
+            _topicPartitionQueryTimer.Begin();
+
+            while (result.Count < num) {
+                Message temp = null;
+                if(!_fetchResponseQueue.TryTake(out temp, timeout)){
+                    return null;
+                }
+
+                if(temp != null){
+                    var conn = _options.Router.SelectBrokerRoute(_topic.Name, temp.Meta.PartitionId).Connection;
+                    var offsets = conn.SendAsync(CreateOffsetFetchRequest(_consumerGroup, temp.Meta.PartitionId )).Result;
+                    var x = offsets.FirstOrDefault();
+
+                    if(x != null && x.PartitionId == temp.Meta.PartitionId){
+                        if(x.Offset > temp.Meta.Offset)
+                            _options.Log.DebugFormat("GET Duplicated message");
+                        else {
+                            if(CommitOffset(conn, temp.Meta.PartitionId, temp.Meta.Offset+1))
+                                result.Add(temp);
+                        }
+                    }
+                }
+            }
+            return result;
+        }
+
+        protected bool CommitOffset(IKafkaConnection conn, int pid, long offset)
+        {
+            var resp = conn.SendAsync(CreateOffsetCommitRequest(_consumerGroup, pid, offset)).Result.FirstOrDefault();
+            if (resp != null && ((int)resp.Error) == (int)ErrorResponseCode.NoError)
+                return true;
+            else
+            {
+                return false;
+            }
+        }
+
+        protected OffsetFetchRequest CreateOffsetFetchRequest(string consumerGroup, int partitionId)
+        {
+            var request = new OffsetFetchRequest
+            {
+                ConsumerGroup = consumerGroup,
+                Topics = new List
+                {
+                    new OffsetFetch
+                    {
+                        PartitionId = partitionId,
+                        Topic = _options.Topic
+                    }
+                }
+            };
+
+            return request;
+        }
+
+        protected OffsetCommitRequest CreateOffsetCommitRequest(string consumerGroup, int partitionId, long offset, string metadata = null)
+        {
+            var commit = new OffsetCommitRequest
+            {
+                ConsumerGroup = consumerGroup,
+                OffsetCommits = new List
+                {
+                    new OffsetCommit
+                    {
+                        PartitionId = partitionId,
+                        Topic = _topic.Name,
+                        Offset = offset,
+                        Metadata = metadata
+                    }
+                }
+            };
+
+            return commit;
+        }
+    }
+}
diff --git a/src/kafka-net/Protocol/Message.cs b/src/kafka-net/Protocol/Message.cs
index f23cb92d..185c62bb 100644
--- a/src/kafka-net/Protocol/Message.cs
+++ b/src/kafka-net/Protocol/Message.cs
@@ -181,6 +181,7 @@ public static IEnumerable DecodeMessage(long offset, byte[] payload)
                     throw new NotSupportedException(string.Format("Codec type of {0} is not supported.", codec));
             }
         }
+
     }
     ///
@@ -200,5 +201,7 @@ public class MessageMetadata
         /// The partition id this offset is from.
         ///
         public int PartitionId { get; set; }
+
+
     }
 }
diff --git a/src/kafka-net/ZookeeperHLConsumer.cs b/src/kafka-net/ZookeeperHLConsumer.cs
new file mode 100644
index 00000000..e7171d70
--- /dev/null
+++ b/src/kafka-net/ZookeeperHLConsumer.cs
@@ -0,0 +1,132 @@
+/*
+ * Created by SharpDevelop.
+ * User: peng.zang
+ * Date: 11/10/2014
+ * Time: 2:44 PM
+ *
+ * To change this template use Tools | Options | Coding | Edit Standard Headers.
+ */
+using System;
+using System.Text;
+using System.Linq;
+using System.Collections.Generic;
+
+using KafkaNet.Model;
+using KafkaNet.Protocol;
+using KafkaNet.Common;
+
+using ZooKeeperNet;
+
+namespace KafkaNet
+{
+    ///
+    /// High level consumer using zookeeper for coordination.
+    ///
+    public class ZookeeperHLConsumer
+    {
+        KafkaOptions _options;
+        BrokerRouter _router;
+        Consumer _consumer;
+        ZooKeeper _zookeeper;
+        string _topic;
+        IWatcher _watcher;
+
+        public ZookeeperHLConsumer(string topic, List brokerList, string zookps, TimeSpan? timeout = null, IWatcher watcher = null)
+        {
+            _options = new KafkaOptions();
+            _options.KafkaServerUri = brokerList.ConvertAll(x => new Uri(x));
+
+            _router = new BrokerRouter(_options);
+            _consumer = new KafkaNet.Consumer(new ConsumerOptions(topic, _router));
+
+            _topic = topic;
+
+            _zookeeper = new ZooKeeper(zookps, timeout.HasValue ? timeout.Value : new TimeSpan(7, 0, 0), watcher);
+            _watcher = watcher;
+        }
+
+        ///
+        /// consume topic specified in the constructor with groupID here. Only consume n messages.
+        ///
+        ///
+        ///
+        ///
+        public IEnumerable Consume(string groupID, int n)
+        {
+            var p = "/consumers/" + groupID + "/offsets/" + this._topic;
+            try
+            {
+                if (_zookeeper.Exists(p, _watcher) == null)
+                {
+                    CreateZookeeperPath("/consumers", "/" + groupID, "/offsets", "/" + this._topic);
+                    var common = new MetadataQueries(_router);
+                    var offsets = common.GetTopicOffsetAsync(groupID).Result;
+                    _consumer.SetOffsetPosition(offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Min())).ToArray());
+                    offsets.ForEach(off =>
+                    {
+                        _zookeeper.Create(p + "/" + off.PartitionId.ToString(),
+                            System.Text.Encoding.UTF8.GetBytes(off.Offsets.Min().ToString()),
+                            Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PersistentSequential);
+                    });
+                }
+                else
+                {
+                    var children = _zookeeper.GetChildren(p, _watcher); //TODO: add watcher and stat support.
+                    var offsets = new List();
+                    children.ToList().ForEach(x =>
+                    {
+                        int partition;
+                        if (int.TryParse(x, out partition))
+                        {
+                            var data = _zookeeper.GetData(p + "/" + partition, _watcher, null);
+                            if (data != null && data.Length > 0)
+                            {
+                                long offset = 0;
+                                if (long.TryParse(System.Text.Encoding.Default.GetString(data), out offset))
+                                {
+                                    offsets.Add(new OffsetPosition(partition, offset));
+                                }
+                            }
+                        }
+                    });
+                    _consumer.SetOffsetPosition(offsets.ToArray());
+                }
+            }
+            catch (Exception)
+            {
+                //TODO: Log the error, or handle it?
+            }
+
+            return _consumer.Consume().Take(n);
+        }
+
+        ///
+        /// create zookeeper path hierarchically.
+        ///
+        /// paths by level, the next path should be append to the previous one to form a valid path
+        ///
+        public bool CreateZookeeperPath(params string[] path)
+        {
+            var sb = new StringBuilder();
+            var success = true;
+            try
+            {
+                for (int i = 0; i < path.Length; i++)
+                {
+                    sb.Append(path[i]);
+                    if (_zookeeper.Exists(sb.ToString(), _watcher) == null)
+                    {
+                        _zookeeper.Create(sb.ToString(), new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.Persistent);
+                    }
+                }
+            }
+            catch (Exception)
+            {
+                success = false;
+                //TODO:
+            }
+            return success;
+        }
+    }
+}
diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj
index 8e15f79d..2fe8a410 100644
--- a/src/kafka-net/kafka-net.csproj
+++ b/src/kafka-net/kafka-net.csproj
@@ -10,18 +10,27 @@
     KafkaNet
     kafka-net
     v4.5
-    512
+    4096
+    ..\
+    true
+    False
+    False
+    False
+    False
+    obj\$(Configuration)\
+    False
+    False
+    OnBuildSuccess
     true
-    full
-    false
+    Full
+    False
     bin\Debug\
     DEBUG;TRACE
     prompt
     4
-    false
     pdbonly
@@ -32,7 +41,20 @@
     4
     false
+
+    False
+    obj\
+
+
+    4194304
+    x86
+    False
+    Auto
+
+
+    ..\packages\log4net.2.0.3\lib\net40-full\log4net.dll
+
@@ -40,10 +62,14 @@
+
+    ..\packages\ZooKeeper.Net.3.4.6.0\lib\net40\ZooKeeperNet.dll
+
+
@@ -59,6 +85,7 @@
+
@@ -86,11 +113,13 @@
+
+