-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathCosmosProcessor.cs
69 lines (59 loc) · 2.18 KB
/
CosmosProcessor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace cosmos.trigger
{
/// <summary>
/// Azure Function that reacts to batches of device telemetry delivered by a Cosmos DB
/// trigger and writes one <see cref="Summary"/> document per device found in each batch.
/// </summary>
public static class CosmosProcessor
{
    /// <summary>
    /// Groups the incoming batch by <see cref="DeviceTelemetry.DeviceId"/> and, for every
    /// device present, adds a <see cref="Summary"/> carrying the highest
    /// <see cref="DeviceTelemetry.Value"/> seen in this batch to the output collector.
    /// </summary>
    /// <remarks>
    /// Support for retries, see: https://learn.microsoft.com/azure/azure-functions/functions-bindings-error-pages#retry-strategies
    /// NOTE(review): every summary is created with a fresh GUID id, so a retried invocation
    /// would presumably write summaries again under new ids for groups that were already
    /// emitted by the failed attempt - confirm whether such duplicates are acceptable.
    /// </remarks>
    /// <param name="events">Telemetry documents handed over by the trigger for this invocation.</param>
    /// <param name="summary">Output binding collector that receives the generated summaries.</param>
    /// <param name="log">Logger used to report which device is being summarized.</param>
    /// <returns>A task that completes after every summary has been added to the collector.</returns>
    [FixedDelayRetry(3, "00:00:05")]
    [FunctionName("CosmosProcessor")]
    public static async Task Run(
        // Trigger to react to events; the %NAME% values are placeholders resolved from configuration.
        [CosmosDBTrigger(
            databaseName: "%DATABASE%",
            containerName: "%EVENTCONTAINER%",
            Connection = "cosmosConnection",
            LeaseContainerName = "leases",
            PreferredLocations = "%REGION%",
            MaxItemsPerInvocation = 1000,
            LeaseContainerPrefix = "EventProcessor")]IReadOnlyList<DeviceTelemetry> events,
        // Output binding to generate summary documents in the summary container.
        [CosmosDB(
            databaseName: "%DATABASE%",
            containerName: "%SUMMARYCONTAINER%",
            PreferredLocations = "%REGION%",
            Connection = "cosmosConnection")]IAsyncCollector<Summary> summary,
        ILogger log)
    {
        foreach (var group in events.GroupBy(singleEvent => singleEvent.DeviceId))
        {
            // Constant message template with a named placeholder (instead of an interpolated
            // string) so the device id is captured as a structured log property.
            log.LogInformation("Generating summary for device {DeviceId}...", group.Key);

            await summary.AddAsync(new Summary
            {
                Id = Guid.NewGuid().ToString(),
                DeviceId = group.Key,
                // "s" is the culture-independent sortable pattern (yyyy-MM-ddTHH:mm:ss).
                Time = DateTime.UtcNow.ToString("s"),
                // Maximum over this device's events in the current batch only.
                MaxValue = group.Max(item => item.Value)
            });
        }
    }

    /// <summary>
    /// A single telemetry reading as consumed by the trigger.
    /// </summary>
    public class DeviceTelemetry
    {
        /// <summary>Identifier of the device that produced the reading; used as the grouping key.</summary>
        public string DeviceId { get; set; }

        /// <summary>The reported reading.</summary>
        public double Value { get; set; }
    }

    /// <summary>
    /// Per-device summary document produced for each processed batch.
    /// </summary>
    public class Summary
    {
        /// <summary>Document id; set to a new GUID string for every summary.</summary>
        // NOTE(review): JsonPropertyName is the System.Text.Json attribute; it only maps this
        // property to "id" if the binding's serializer honors System.Text.Json - confirm.
        [JsonPropertyName("id")]
        public string Id { get; set; }

        /// <summary>Identifier of the device the summary belongs to.</summary>
        public string DeviceId { get; set; }

        /// <summary>UTC time at which the summary was generated, in sortable "s" format.</summary>
        public string Time { get; set; }

        /// <summary>Highest <see cref="DeviceTelemetry.Value"/> among the device's events in the batch.</summary>
        public double MaxValue { get; set; }
    }
}
}