1
- using System ;
1
+ using System ;
2
2
using System . Collections . Generic ;
3
3
using System . IO ;
4
4
using System . IO . Compression ;
15
15
namespace Nlog . RabbitMQ . Target
16
16
{
17
17
/// <summary>
18
- /// TODO
18
+ /// NLog target for writing to RabbitMQ Topic
19
19
/// </summary>
20
20
[ Target ( "RabbitMQ" ) ]
21
21
public class RabbitMQTarget : TargetWithContext
@@ -31,8 +31,8 @@ public enum CompressionTypes
31
31
private string _ModelExchange ;
32
32
private readonly Encoding _Encoding = Encoding . UTF8 ;
33
33
34
- private readonly Queue < Tuple < byte [ ] , IBasicProperties , string > > _UnsentMessages
35
- = new Queue < Tuple < byte [ ] , IBasicProperties , string > > ( 512 ) ;
34
+ private readonly Queue < Tuple < byte [ ] , IBasicProperties , Func < IBasicProperties , IBasicProperties > , string > > _UnsentMessages
35
+ = new Queue < Tuple < byte [ ] , IBasicProperties , Func < IBasicProperties , IBasicProperties > , string > > ( 512 ) ;
36
36
37
37
private readonly object _sync = new object ( ) ;
38
38
@@ -259,11 +259,10 @@ protected override void Write(LogEventInfo logEvent)
259
259
modelExchange = _ModelExchange ;
260
260
}
261
261
262
- var basicProperties = GetBasicProperties ( logEvent , model ) ;
263
-
262
+ var basicProperties = model != null ? GetBasicProperties ( logEvent , model ) : null ;
264
263
if ( model == null || ! model . IsOpen )
265
264
{
266
- if ( ! AddUnsent ( routingKey , basicProperties , message ) )
265
+ if ( ! AddUnsent ( logEvent , routingKey , basicProperties , message ) )
267
266
{
268
267
throw new InvalidOperationException ( "LogEvent discarded because RabbitMQ instance is offline and reached MaxBuffer" ) ;
269
268
}
@@ -281,13 +280,13 @@ protected override void Write(LogEventInfo logEvent)
281
280
catch ( IOException e )
282
281
{
283
282
InternalLogger . Error ( e , "RabbitMQTarget(Name={0}): Could not send to RabbitMQ instance: {1}" , Name , e . Message ) ;
284
- if ( ! AddUnsent ( routingKey , basicProperties , message ) )
283
+ if ( ! AddUnsent ( logEvent , routingKey , basicProperties , message ) )
285
284
throw ;
286
285
}
287
286
catch ( ObjectDisposedException e )
288
287
{
289
288
InternalLogger . Error ( e , "RabbitMQTarget(Name={0}): Could not send to RabbitMQ instance: {1}" , Name , e . Message ) ;
290
- if ( ! AddUnsent ( routingKey , basicProperties , message ) )
289
+ if ( ! AddUnsent ( logEvent , routingKey , basicProperties , message ) )
291
290
throw ;
292
291
}
293
292
catch ( Exception e )
@@ -305,11 +304,14 @@ protected override void Write(LogEventInfo logEvent)
305
304
}
306
305
}
307
306
308
- private bool AddUnsent ( string routingKey , IBasicProperties basicProperties , byte [ ] message )
307
+ private bool AddUnsent ( LogEventInfo logEvent , string routingKey , IBasicProperties basicProperties , byte [ ] message )
309
308
{
310
309
if ( _UnsentMessages . Count < MaxBuffer )
311
310
{
312
- _UnsentMessages . Enqueue ( Tuple . Create ( message , basicProperties , routingKey ) ) ;
311
+ Func < IBasicProperties , IBasicProperties > propertyResolver = ( props ) => props ;
312
+ if ( basicProperties == null )
313
+ propertyResolver = ( props ) => _Model != null ? GetBasicProperties ( logEvent , _Model ) : null ;
314
+ _UnsentMessages . Enqueue ( Tuple . Create ( message , basicProperties , propertyResolver , routingKey ) ) ;
313
315
return true ;
314
316
}
315
317
else
@@ -326,7 +328,8 @@ private void CheckUnsent(IModel model, string exchange)
326
328
{
327
329
var tuple = _UnsentMessages . Dequeue ( ) ;
328
330
InternalLogger . Info ( "RabbitMQTarget(Name={0}): Publishing unsent message: {1}." , Name , tuple ) ;
329
- Publish ( model , tuple . Item1 , tuple . Item2 , tuple . Item3 , exchange ) ;
331
+ var basicProperties = tuple . Item3 . Invoke ( tuple . Item2 ) ;
332
+ Publish ( model , tuple . Item1 , basicProperties , tuple . Item4 , exchange ) ;
330
333
}
331
334
}
332
335
@@ -497,7 +500,7 @@ private void StartConnection(IConnection oldConnection, int timeoutMilliseconds,
497
500
498
501
if ( shutdownConnection == null )
499
502
{
500
- InternalLogger . Error ( e , string . Format ( "RabbitMQTarget(Name={0}): Could not connect to Rabbit instance: {1}" , Name , e . Message ) ) ;
503
+ InternalLogger . Error ( e , string . Format ( "RabbitMQTarget(Name={0}): Could not connect to RabbitMQ instance: {1}" , Name , e . Message ) ) ;
501
504
}
502
505
else
503
506
{
@@ -530,11 +533,11 @@ private void StartConnection(IConnection oldConnection, int timeoutMilliseconds,
530
533
var completedTask = Task . WhenAny ( t , Task . Delay ( TimeSpan . FromMilliseconds ( timeoutMilliseconds ) ) ) . ConfigureAwait ( false ) . GetAwaiter ( ) . GetResult ( ) ;
531
534
if ( ! ReferenceEquals ( completedTask , t ) )
532
535
{
533
- InternalLogger . Warn ( "RabbitMQTarget(Name={0}): Starting connection-task timed out, continuing " , Name ) ;
536
+ InternalLogger . Warn ( "RabbitMQTarget(Name={0}): Connection timeout to RabbitMQ instance after {1}ms " , Name , timeoutMilliseconds ) ;
534
537
}
535
538
else if ( completedTask . Exception != null )
536
539
{
537
- InternalLogger . Error ( completedTask . Exception , "RabbitMQTarget(Name={0}): Starting connection-task failed: {0 }" , Name , completedTask . Exception . Message ) ;
540
+ InternalLogger . Error ( completedTask . Exception , "RabbitMQTarget(Name={0}): Connection attempt to RabbitMQ instance failed: {1 }" , Name , completedTask . Exception . Message ) ;
538
541
}
539
542
}
540
543
0 commit comments