diff --git a/README.md b/README.md
index 804244aa..51ff1bf5 100644
--- a/README.md
+++ b/README.md
@@ -83,7 +83,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/_bootstrap/README.md b/_bootstrap/README.md
index f39f5895..008e6cc5 100644
--- a/_bootstrap/README.md
+++ b/_bootstrap/README.md
@@ -101,7 +101,7 @@ Each client generates up to 2000 msgs/sec. Each generated message is close to 1K
     "moreData22": 12.12345678901234
   },
   "value": 49.02278128887753,
-  "deviceId": "contoso://device-id-1554",
+  "deviceId": "contoso-device-id-000554",
   "deviceSequenceNumber": 0,
   "type": "CO2",
   "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/akskafka-databricks-cosmosdb/README.md b/akskafka-databricks-cosmosdb/README.md
index 57b87776..9fb236c5 100644
--- a/akskafka-databricks-cosmosdb/README.md
+++ b/akskafka-databricks-cosmosdb/README.md
@@ -105,7 +105,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java
index 2c0d992c..f5ed479c 100644
--- a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java
+++ b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java
@@ -84,7 +84,7 @@ public void testProcessingLogicState() throws Exception {
     sampleRecord1.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord1.value = 80.80967678165356d;
     sampleRecord1.type = "CO2";
-    sampleRecord1.deviceId = "contoso://device-id-428";
+    sampleRecord1.deviceId = "contoso-device-id-428";
     sampleRecord1.deviceSequenceNumber = 3L;
     sampleRecord1.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord1.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
@@ -97,7 +97,7 @@ public void testProcessingLogicState() throws Exception {
     sampleRecord2.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord2.value = 70.80967678165356d;
     sampleRecord2.type = "TEMP";
-    sampleRecord2.deviceId = "contoso://device-id-428";
+    sampleRecord2.deviceId = "contoso-device-id-428";
     sampleRecord2.deviceSequenceNumber = 3L;
     sampleRecord2.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord2.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
diff --git a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleData.java b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleData.java
index b82b7223..2bdd367c 100644
--- a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleData.java
+++ b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleData.java
@@ -10,7 +10,7 @@ public static SampleRecord record() {
     sampleRecord.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord.value = 45.80967678165356d;
     sampleRecord.type = "CO2";
-    sampleRecord.deviceId = "contoso://device-id-428";
+    sampleRecord.deviceId = "contoso-device-id-428";
     sampleRecord.deviceSequenceNumber = 3L;
     sampleRecord.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
diff --git a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleRecordTest.java b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleRecordTest.java
index 097b9386..2d1bd1d7 100755
--- a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleRecordTest.java
+++ b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleRecordTest.java
@@ -18,7 +18,7 @@ public class SampleRecordTest {
     private SampleRecord sampleRecord;
-    private String serialized = "{\"eventId\":\"4fa25e6c-50d3-4189-9613-d486b71412df\",\"complexData\":null,\"value\":45.80967678165356,\"type\":\"CO2\",\"deviceId\":\"contoso://device-id-428\",\"deviceSequenceNumber\":3,\"createdAt\":\"2019-10-15T12:43:27.748Z\",\"enqueuedAt\":\"2019-10-16T12:43:27.748Z\",\"processedAt\":\"2019-10-17T12:43:27.748Z\"}";
+    private String serialized = "{\"eventId\":\"4fa25e6c-50d3-4189-9613-d486b71412df\",\"complexData\":null,\"value\":45.80967678165356,\"type\":\"CO2\",\"deviceId\":\"contoso-device-id-428\",\"deviceSequenceNumber\":3,\"createdAt\":\"2019-10-15T12:43:27.748Z\",\"enqueuedAt\":\"2019-10-16T12:43:27.748Z\",\"processedAt\":\"2019-10-17T12:43:27.748Z\"}";
     private JsonMapperSchema mapper = new JsonMapperSchema<>(SampleRecord.class);
@@ -32,7 +32,7 @@ public void setUp() {
     sampleRecord.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord.value = 45.80967678165356d;
     sampleRecord.type = "CO2";
-    sampleRecord.deviceId = "contoso://device-id-428";
+    sampleRecord.deviceId = "contoso-device-id-428";
     sampleRecord.deviceSequenceNumber = 3L;
     sampleRecord.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
diff --git a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleStateTest.java b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleStateTest.java
index 1031ee73..e2989b36 100755
--- a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleStateTest.java
+++ b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/data/SampleStateTest.java
@@ -71,7 +71,7 @@ public void getLastRecord01() {
     sampleRecord1.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord1.value = 45.80967678165356d;
     sampleRecord1.type = "CO2";
-    sampleRecord1.deviceId = "contoso://device-id-428";
+    sampleRecord1.deviceId = "contoso-device-id-428";
     sampleRecord1.deviceSequenceNumber = 3L;
     sampleRecord1.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord1.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
@@ -81,7 +81,7 @@ public void getLastRecord01() {
     sampleRecord2.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord2.value = 45.80967678165356d;
     sampleRecord2.type = "CO2";
-    sampleRecord2.deviceId = "contoso://device-id-428";
+    sampleRecord2.deviceId = "contoso-device-id-428";
     sampleRecord2.deviceSequenceNumber = 3L;
     sampleRecord2.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord2.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
@@ -100,7 +100,7 @@ public void recordsSize01() {
     sampleRecord1.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord1.value = 45.80967678165356d;
     sampleRecord1.type = "CO2";
-    sampleRecord1.deviceId = "contoso://device-id-428";
+    sampleRecord1.deviceId = "contoso-device-id-428";
     sampleRecord1.deviceSequenceNumber = 3L;
     sampleRecord1.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord1.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
@@ -110,7 +110,7 @@ public void recordsSize01() {
     sampleRecord2.eventId = "4fa25e6c-50d3-4189-9613-d486b71412df";
     sampleRecord2.value = 45.80967678165356d;
     sampleRecord2.type = "CO2";
-    sampleRecord2.deviceId = "contoso://device-id-428";
+    sampleRecord2.deviceId = "contoso-device-id-428";
     sampleRecord2.deviceSequenceNumber = 3L;
     sampleRecord2.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord2.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
@@ -133,7 +133,7 @@ public void addRecord01() {
     sampleRecord.eventId = Integer.toString(i);
     sampleRecord.value = 45.80967678165356d;
     sampleRecord.type = "CO2";
-    sampleRecord.deviceId = "contoso://device-id-428";
+    sampleRecord.deviceId = "contoso-device-id-428";
     sampleRecord.deviceSequenceNumber = 3L;
     sampleRecord.createdAt = Instant.parse("2019-10-15T12:43:27.748Z");
     sampleRecord.enqueuedAt = Instant.parse("2019-10-16T12:43:27.748Z");
diff --git a/components/azure-cosmosdb/create-cosmosdb.sh b/components/azure-cosmosdb/create-cosmosdb.sh
index 415e96dc..20f7fa5c 100755
--- a/components/azure-cosmosdb/create-cosmosdb.sh
+++ b/components/azure-cosmosdb/create-cosmosdb.sh
@@ -9,7 +9,7 @@ echo ". account name: $COSMOSDB_SERVER_NAME"
 echo ". database name: $COSMOSDB_DATABASE_NAME"
 echo ". collection name: $COSMOSDB_COLLECTION_NAME"
-az group deployment create \
+az deployment group create \
   --resource-group $RESOURCE_GROUP \
   --template-file ../components/azure-cosmosdb/cosmosdb-arm-template.json \
   --parameters \
diff --git a/components/azure-databricks/create-databricks.sh b/components/azure-databricks/create-databricks.sh
index 8f24d7bd..edde0553 100755
--- a/components/azure-databricks/create-databricks.sh
+++ b/components/azure-databricks/create-databricks.sh
@@ -16,7 +16,7 @@ else
 if ! az resource show -g $RESOURCE_GROUP --resource-type Microsoft.Databricks/workspaces -n $ADB_WORKSPACE -o none 2>/dev/null; then
 echo 'creating databricks workspace'
 echo ". name: $ADB_WORKSPACE"
-az group deployment create \
+az deployment group create \
   --name $ADB_WORKSPACE \
   --resource-group $RESOURCE_GROUP \
   --template-file ../components/azure-databricks/databricks-arm-template.json \
diff --git a/components/azure-dataexplorer/create-dataexplorer.sh b/components/azure-dataexplorer/create-dataexplorer.sh
index 6d4769c9..ff542db9 100755
--- a/components/azure-dataexplorer/create-dataexplorer.sh
+++ b/components/azure-dataexplorer/create-dataexplorer.sh
@@ -61,7 +61,7 @@ done
 DATAEXPLORER_CONNECTION="eventhub"
 echo 'creating Data Explorer Event Hub connection'
 echo ". name: $DATAEXPLORER_CONNECTION"
-az group deployment create \
+az deployment group create \
   --resource-group $RESOURCE_GROUP \
   --template-file ../components/azure-dataexplorer/eventhub-connection-arm-template.json \
   --parameters \
diff --git a/components/azure-event-hubs/get-eventhubs-kafka-brokers.sh b/components/azure-event-hubs/get-eventhubs-kafka-brokers.sh
index d0def81c..7a7690e6 100755
--- a/components/azure-event-hubs/get-eventhubs-kafka-brokers.sh
+++ b/components/azure-event-hubs/get-eventhubs-kafka-brokers.sh
@@ -12,10 +12,12 @@ eh_resource=$(az resource show -g $RESOURCE_GROUP --resource-type Microsoft.Even
 export KAFKA_BROKERS="$namespace.servicebus.windows.net:9093"
 export KAFKA_SECURITY_PROTOCOL=SASL_SSL
 export KAFKA_SASL_MECHANISM=PLAIN
+export KAFKA_SASL_USERNAME='$ConnectionString'
+export KAFKA_SASL_PASSWORD="$EVENTHUB_CS"
 # For running outside of Databricks: org.apache.kafka.common.security.plain.PlainLoginModule
 # For running within Databricks: kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
 loginModule="org.apache.kafka.common.security.plain.PlainLoginModule"
 loginModuleDatabricks="kafkashaded.$loginModule"
-export KAFKA_SASL_JAAS_CONFIG="$loginModule required username=\"\$ConnectionString\" password=\"$EVENTHUB_CS\";"
-export KAFKA_SASL_JAAS_CONFIG_DATABRICKS="$loginModuleDatabricks required username=\"\$ConnectionString\" password=\"$EVENTHUB_CS\";"
+export KAFKA_SASL_JAAS_CONFIG="$loginModule required username=\"$KAFKA_SASL_USERNAME\" password=\"$KAFKA_SASL_PASSWORD\";"
+export KAFKA_SASL_JAAS_CONFIG_DATABRICKS="$loginModuleDatabricks required username=\"$KAFKA_SASL_USERNAME\" password=\"$KAFKA_SASL_PASSWORD\";"
diff --git a/components/azure-hdinsight/get-hdinsight-kafka-brokers.sh b/components/azure-hdinsight/get-hdinsight-kafka-brokers.sh
index 9bb7a3e4..fff6d8c8 100644
--- a/components/azure-hdinsight/get-hdinsight-kafka-brokers.sh
+++ b/components/azure-hdinsight/get-hdinsight-kafka-brokers.sh
@@ -25,5 +25,7 @@ fi
 export KAFKA_BROKERS=${kafka_brokers:1}
 export KAFKA_SECURITY_PROTOCOL=PLAINTEXT
 export KAFKA_SASL_MECHANISM=
+export KAFKA_SASL_USERNAME=
+export KAFKA_SASL_PASSWORD=
 export KAFKA_SASL_JAAS_CONFIG=
 export KAFKA_SASL_JAAS_CONFIG_DATABRICKS=
diff --git a/components/azure-kubernetes-service/get-aks-kafka-brokers.sh b/components/azure-kubernetes-service/get-aks-kafka-brokers.sh
index 9c0e86d8..d816cbff 100644
--- a/components/azure-kubernetes-service/get-aks-kafka-brokers.sh
+++ b/components/azure-kubernetes-service/get-aks-kafka-brokers.sh
@@ -26,6 +26,8 @@ fi
 export KAFKA_BROKERS="$kafka_ip:9094"
 export KAFKA_SECURITY_PROTOCOL=PLAINTEXT
 export KAFKA_SASL_MECHANISM=
+export KAFKA_SASL_USERNAME=
+export KAFKA_SASL_PASSWORD=
 export KAFKA_SASL_JAAS_CONFIG=
 export KAFKA_SASL_JAAS_CONFIG_DATABRICKS=
diff --git a/components/azure-monitor/create-log-analytics.sh b/components/azure-monitor/create-log-analytics.sh
index d07db5d0..eb116a69 100755
--- a/components/azure-monitor/create-log-analytics.sh
+++ b/components/azure-monitor/create-log-analytics.sh
@@ -5,7 +5,7 @@ set -euo pipefail
 if ! az resource show -g $RESOURCE_GROUP -n $LOG_ANALYTICS_WORKSPACE --resource-type Microsoft.OperationalInsights/workspaces -o none 2>/dev/null; then
 echo 'creating Log Analytics workspace'
 echo ". name: $LOG_ANALYTICS_WORKSPACE"
-  az group deployment create \
+  az deployment group create \
     --resource-group $RESOURCE_GROUP \
     --template-file "../components/azure-monitor/template.json" \
     --parameters \
diff --git a/components/azure-sql/provision/provision.sh b/components/azure-sql/provision/provision.sh
index a1fa199c..438a8bf1 100644
--- a/components/azure-sql/provision/provision.sh
+++ b/components/azure-sql/provision/provision.sh
@@ -6,7 +6,7 @@ function run_sqlcmd {
   /opt/mssql-tools/bin/sqlcmd -b -h-1 -I -S "$SQL_SERVER_NAME.database.windows.net" -d "$SQL_DATABASE_NAME" -U serveradmin -P "$SQL_ADMIN_PASS" "$@"
 }
-run_sqlcmd -e -Q "SET NOCOUNT ON; IF OBJECT_ID(N'sqlprovision', N'U') IS NULL CREATE TABLE sqlprovision(script nvarchar(max))"
+run_sqlcmd -e -Q "SET NOCOUNT ON; IF OBJECT_ID(N'sqlprovision', N'U') IS NULL CREATE TABLE sqlprovision(script nvarchar(4000))"
 for provisioning_script in /sqlprovision/*.sql; do
   echo -n "[$provisioning_script] "
   script_name=$(basename $provisioning_script)
diff --git a/components/azure-storage/create-storage-hfs.sh b/components/azure-storage/create-storage-hfs.sh
index 4669e789..48b0ef4c 100755
--- a/components/azure-storage/create-storage-hfs.sh
+++ b/components/azure-storage/create-storage-hfs.sh
@@ -5,7 +5,7 @@ set -euo pipefail
 echo 'creating ADLS Gen2 storage account'
 echo ". name: $AZURE_STORAGE_ACCOUNT_GEN2"
-az group deployment create \
+az deployment group create \
  --resource-group $RESOURCE_GROUP \
  --template-file ../components/azure-storage/storage-hfs-arm-template.json \
  --parameters \
diff --git a/eventhubs-databricks-azuresql/README.md b/eventhubs-databricks-azuresql/README.md
index 35f6f843..79fde6bb 100644
--- a/eventhubs-databricks-azuresql/README.md
+++ b/eventhubs-databricks-azuresql/README.md
@@ -101,7 +101,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-databricks-cosmosdb/README.md b/eventhubs-databricks-cosmosdb/README.md
index 61005768..33d2f0fd 100644
--- a/eventhubs-databricks-cosmosdb/README.md
+++ b/eventhubs-databricks-cosmosdb/README.md
@@ -101,7 +101,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-databricks-delta/README.md b/eventhubs-databricks-delta/README.md
index 28f952bd..d20b6b6b 100644
--- a/eventhubs-databricks-delta/README.md
+++ b/eventhubs-databricks-delta/README.md
@@ -100,7 +100,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-dataexplorer/README.md b/eventhubs-dataexplorer/README.md
index 00782a3c..b3fbc9c5 100644
--- a/eventhubs-dataexplorer/README.md
+++ b/eventhubs-dataexplorer/README.md
@@ -95,7 +95,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-functions-azuresql/README.md b/eventhubs-functions-azuresql/README.md
index 19cbee28..064ad11f 100644
--- a/eventhubs-functions-azuresql/README.md
+++ b/eventhubs-functions-azuresql/README.md
@@ -106,7 +106,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-functions-azuresql/create-solution.sh b/eventhubs-functions-azuresql/create-solution.sh
index 616990ef..9d04714b 100755
--- a/eventhubs-functions-azuresql/create-solution.sh
+++ b/eventhubs-functions-azuresql/create-solution.sh
@@ -235,7 +235,7 @@ echo
 echo "***** [T] Starting up TEST clients"
-  export SIMULATOR_DUPLICATE_EVERY_N_EVENTS=-1
+  export SIMULATOR_DUPLICATE_EVERY_N_EVENTS=0
 RUN=`echo $STEPS | grep T -o || true`
 if [ ! -z "$RUN" ]; then
diff --git a/eventhubs-functions-cosmosdb/README.md b/eventhubs-functions-cosmosdb/README.md
index 6eb5a98e..9e7e23db 100644
--- a/eventhubs-functions-cosmosdb/README.md
+++ b/eventhubs-functions-cosmosdb/README.md
@@ -108,7 +108,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-streamanalytics-azuresql/README.md b/eventhubs-streamanalytics-azuresql/README.md
index a7cd42df..5a7f6881 100644
--- a/eventhubs-streamanalytics-azuresql/README.md
+++ b/eventhubs-streamanalytics-azuresql/README.md
@@ -114,7 +114,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-streamanalytics-azuresql/create-solution.sh b/eventhubs-streamanalytics-azuresql/create-solution.sh
index b59ae2ab..072f0dbb 100755
--- a/eventhubs-streamanalytics-azuresql/create-solution.sh
+++ b/eventhubs-streamanalytics-azuresql/create-solution.sh
@@ -204,7 +204,7 @@ echo
 echo "***** [T] Starting up TEST clients"
-  export SIMULATOR_DUPLICATE_EVERY_N_EVENTS=-1
+  export SIMULATOR_DUPLICATE_EVERY_N_EVENTS=0
 RUN=`echo $STEPS | grep T -o || true`
 if [ ! -z "$RUN" ]; then
diff --git a/eventhubs-streamanalytics-azuresql/create-stream-analytics.sh b/eventhubs-streamanalytics-azuresql/create-stream-analytics.sh
index 9274bec7..b27ab779 100755
--- a/eventhubs-streamanalytics-azuresql/create-stream-analytics.sh
+++ b/eventhubs-streamanalytics-azuresql/create-stream-analytics.sh
@@ -7,7 +7,7 @@ EVENTHUB_KEY=`az eventhubs namespace authorization-rule keys list -g $RESOURCE_G
 echo 'creating stream analytics job'
 echo ". name: $PROC_JOB_NAME"
-az group deployment create \
+az deployment group create \
   --name $PROC_JOB_NAME \
   --resource-group $RESOURCE_GROUP \
   --template-file "stream-analytics-job-arm-template.json" \
diff --git a/eventhubs-streamanalytics-azuresql/time-series-query-sample.sql b/eventhubs-streamanalytics-azuresql/time-series-query-sample.sql
index 03442d45..040036d9 100644
--- a/eventhubs-streamanalytics-azuresql/time-series-query-sample.sql
+++ b/eventhubs-streamanalytics-azuresql/time-series-query-sample.sql
@@ -28,7 +28,7 @@ select top (100)
 from
     dbo.[rawdata2] r
 where
-    [r].[DeviceId] = 'contoso://device-id-154'
+    [r].[DeviceId] = 'contoso-device-id-000154'
     and r.[PartitionId] = 5
 order by
diff --git a/eventhubs-streamanalytics-cosmosdb/README.md b/eventhubs-streamanalytics-cosmosdb/README.md
index 63e2ea90..b8c5b506 100644
--- a/eventhubs-streamanalytics-cosmosdb/README.md
+++ b/eventhubs-streamanalytics-cosmosdb/README.md
@@ -96,7 +96,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-streamanalytics-cosmosdb/create-stream-analytics.sh b/eventhubs-streamanalytics-cosmosdb/create-stream-analytics.sh
index 943348bb..7d4303f4 100755
--- a/eventhubs-streamanalytics-cosmosdb/create-stream-analytics.sh
+++ b/eventhubs-streamanalytics-cosmosdb/create-stream-analytics.sh
@@ -10,7 +10,7 @@ COSMOSDB_MASTER_KEY=`az cosmosdb keys list -g $RESOURCE_GROUP -n $COSMOSDB_SERVE
 echo 'creating stream analytics job'
 echo ". name: $PROC_JOB_NAME"
-az group deployment create \
+az deployment group create \
   --name $PROC_JOB_NAME \
   --resource-group $RESOURCE_GROUP \
   --template-file "stream-analytics-job-arm-template.json" \
diff --git a/eventhubs-streamanalytics-eventhubs/README.md b/eventhubs-streamanalytics-eventhubs/README.md
index 70ab5e80..ba03cf0a 100644
--- a/eventhubs-streamanalytics-eventhubs/README.md
+++ b/eventhubs-streamanalytics-eventhubs/README.md
@@ -96,7 +96,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubs-streamanalytics-eventhubs/create-stream-analytics.sh b/eventhubs-streamanalytics-eventhubs/create-stream-analytics.sh
index 9358807a..d3a86ca2 100755
--- a/eventhubs-streamanalytics-eventhubs/create-stream-analytics.sh
+++ b/eventhubs-streamanalytics-eventhubs/create-stream-analytics.sh
@@ -8,7 +8,7 @@ EVENTHUB_KEY_OUT=`az eventhubs namespace authorization-rule keys list -g $RESOUR
 echo 'creating stream analytics job'
 echo ". name: $PROC_JOB_NAME"
-az group deployment create \
+az deployment group create \
   --name $PROC_JOB_NAME \
   --resource-group $RESOURCE_GROUP \
   --template-file stream-analytics-job-$STREAM_ANALYTICS_JOBTYPE-arm-template.json \
diff --git a/eventhubs-timeseriesinsights/README.md b/eventhubs-timeseriesinsights/README.md
index cf93d7cc..76b426cd 100644
--- a/eventhubs-timeseriesinsights/README.md
+++ b/eventhubs-timeseriesinsights/README.md
@@ -98,7 +98,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubskafka-databricks-cosmosdb/README.md b/eventhubskafka-databricks-cosmosdb/README.md
index be5f3e5d..c8f9baa8 100644
--- a/eventhubskafka-databricks-cosmosdb/README.md
+++ b/eventhubskafka-databricks-cosmosdb/README.md
@@ -101,7 +101,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubskafka-flink-eventhubskafka/README.md b/eventhubskafka-flink-eventhubskafka/README.md
index 2409958c..2d706add 100644
--- a/eventhubskafka-flink-eventhubskafka/README.md
+++ b/eventhubskafka-flink-eventhubskafka/README.md
@@ -104,7 +104,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/eventhubskafka-functions-cosmosdb/README.md b/eventhubskafka-functions-cosmosdb/README.md
index 92208440..2d55e7e3 100644
--- a/eventhubskafka-functions-cosmosdb/README.md
+++ b/eventhubskafka-functions-cosmosdb/README.md
@@ -108,7 +108,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/hdinsightkafka-databricks-sqldw/README.md b/hdinsightkafka-databricks-sqldw/README.md
index 923a6264..bad540d3 100644
--- a/hdinsightkafka-databricks-sqldw/README.md
+++ b/hdinsightkafka-databricks-sqldw/README.md
@@ -102,7 +102,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/hdinsightkafka-flink-hdinsightkafka/README.md b/hdinsightkafka-flink-hdinsightkafka/README.md
index 03235a72..99e8e717 100644
--- a/hdinsightkafka-flink-hdinsightkafka/README.md
+++ b/hdinsightkafka-flink-hdinsightkafka/README.md
@@ -101,7 +101,7 @@ Streamed data simulates an IoT device sending the following JSON data:
 [...]
     },
     "value": 49.02278128887753,
-    "deviceId": "contoso://device-id-154",
+    "deviceId": "contoso-device-id-000154",
     "deviceSequenceNumber": 0,
     "type": "CO2",
     "createdAt": "2019-05-16T17:16:40.000003Z"
diff --git a/simulator/README.md b/simulator/README.md
deleted file mode 100644
index 96f95bf4..00000000
--- a/simulator/README.md
+++ /dev/null
@@ -1,87 +0,0 @@
-# Simulator: Standalone Setup with Event Hub with Kafka Head + JSON producer
-
-The simulator is written using PySpark library that uses pre-compiled Apache Spark JARS deployed into Python virtual environments.
-
-The simulator can only be used with `create-solution.sh` bash integration script under every folder in this repo.
-
-To run the simulator in a development environment, perform the following steps.
-
-## Prerequisites
-
-1. [Apache Maven 3.6.2](https://maven.apache.org/)
-2. [Python 3.6 and above](https://www.python.org/downloads/)
-3. Event Hub and Event Hub Namespace in Azure
-4. Azure Container Registry
-5. If using anaconda, install conda and follow steps [here](https://docs.conda.io/projects/conda/en/latest/commands/install.html)
-
-## Steps to install development setup
-
-1. Create a python virtual environment or use conda environment
-
-`python3 -m venv env`
-
-2. Activate the environment
-
-`source env/bin/activate`
-
-3. Pip install libraries from simulator/generator/requirements.txt
-
-`pip install -r requirements.txt`
-
-4. Navigate to the simulator/generator folder and install maven artifacts
-
-`mvn -f ./dependencies dependency:copy-dependencies`
-
-This will generate the following jar files in the target/dependency folder
-
-```
-azure-eventhubs-2.3.2.jar proton-j-0.31.0.jar slf4j-api-1.7.25.jar unused-1.0.0.jar
-azure-eventhubs-spark_2.11-2.3.13.jar qpid-proton-j-extensions-1.2.0.jar snappy-java-1.1.7.1.jar
-kafka-clients-2.0.0.jar scala-java8-compat_2.11-0.9.0.jar spark-sql-kafka-0-10_2.11-2.4.3.jar
-lz4-java-1.4.1.jar scala-library-2.11.12.jar spark-tags_2.11-2.4.3.jar
-```
-
-5. Copy these jar files to env/lib/python3.6/site-packages/pyspark/jars folder
-
-If using conda, navigate to the conda environment.
-
-6. Set the following environment variables using a bash script
-
-```
-export EVENTHUBNS="Event Hub Namespace"
-export RESOURCE_GROUP="Resource Group in Azure where Event Hub is deployed"
-export EVENTHUB_CS="Event hub connection string"
-export CONTAINER_REGISTRY="Azure Container Registry"
-export KAFKA_TOPIC="streaming"
-
-export KAFKA_BROKERS=
-$EVENTHUBNS.servicebus.windows.net:9093
-export KAFKA_SECURITY_PROTOCOL=SASL_SSL
-export KAFKA_SASL_MECHANISM=PLAIN
-export SIMULATOR_INSTANCES=1
-export loginModule="org.apache.kafka.common.security.plain.PlainLoginModule"
-export KAFKA_SASL_JAAS_CONFIG="$loginModule required username=\"\$ConnectionString\" password=\"$EVENTHUB_CS\";"
-export OUTPUT_OPTIONS=$(cat <> log.txt
-REGISTRY_LOGIN_SERVER=$(az acr show -n $CONTAINER_REGISTRY --query loginServer -o tsv)
-REGISTRY_LOGIN_PASS=$(az acr credential show -n $CONTAINER_REGISTRY --query passwords[0].value -o tsv)
-
-if ! az acr repository show --name $CONTAINER_REGISTRY --image generator:latest -o none 2>/dev/null; then
-  echo "building generator container..."
-  az acr build --registry $CONTAINER_REGISTRY --image generator:latest ../simulator/generator \
-    -o tsv >> log.txt
-fi
-
-if [ -n "${VNET_NAME:-}" ]; then
-  vnet_options="--vnet $VNET_NAME --subnet producers-subnet"
-else
-  vnet_options=""
-fi
-
-echo "creating generator container instances..."
-echo ". number of instances: $SIMULATOR_INSTANCES"
-echo ". events/second per instance: $EVENTS_PER_SECOND"
-for i in $(seq 1 $SIMULATOR_INSTANCES); do
-  name="data-generator-$i"
-  az container delete -g $RESOURCE_GROUP -n "$name" --yes \
-    -o tsv >> log.txt 2>/dev/null
-  az container create -g $RESOURCE_GROUP -n "$name" \
-    --image $REGISTRY_LOGIN_SERVER/generator:latest \
-    $vnet_options \
-    --registry-login-server $REGISTRY_LOGIN_SERVER \
-    --registry-username $CONTAINER_REGISTRY --registry-password "$REGISTRY_LOGIN_PASS" \
-    -e \
-      EXECUTORS=2 \
-      OUTPUT_FORMAT="$OUTPUT_FORMAT" \
-      OUTPUT_OPTIONS="$OUTPUT_OPTIONS" \
-      EVENTS_PER_SECOND="$EVENTS_PER_SECOND" \
-      DUPLICATE_EVERY_N_EVENTS="${SIMULATOR_DUPLICATE_EVERY_N_EVENTS:-1000}" \
-      COMPLEX_DATA_COUNT=${SIMULATOR_COMPLEX_DATA_COUNT:-} \
-    --secure-environment-variables SECURE_OUTPUT_OPTIONS="$SECURE_OUTPUT_OPTIONS" \
-    --cpu 4 --memory 4 \
-    --no-wait \
-    -o tsv >> log.txt
-done
diff --git a/simulator/generator/Dockerfile b/simulator/generator/Dockerfile
deleted file mode 100644
index bfae7a7c..00000000
--- a/simulator/generator/Dockerfile
+++ /dev/null
@@ -1,41 +0,0 @@
-FROM mcr.microsoft.com/java/maven:8u192-zulu-debian9 AS maven
-
-ADD dependencies /dep
-
-RUN mvn -f /dep dependency:copy-dependencies
-
-FROM mcr.microsoft.com/java/jre:8u192-zulu-alpine AS build
-
-# Install Python
-RUN apk add --no-cache python3-dev
-
-RUN python3 -m venv /venv
-
-# install requirements separately to prevent pip from downloading and
-# installing pypi dependencies every time a file in your project changes
-ADD ./requirements.txt /project/
-RUN /venv/bin/pip install -r /project/requirements.txt
-
-# install the project, basically copying its code, into the virtualenv.
-# this assumes the project has a functional setup.py
-ADD . /project
-RUN /venv/bin/pip install /project
-
-# this won't have any effect on our production image, is only meant for
-# if we want to run commands like pytest in the build image
-WORKDIR /project
-
-# the second, production stage can be much more lightweight:
-FROM mcr.microsoft.com/java/jre:8u192-zulu-alpine AS production
-
-# Install Python
-RUN apk add --no-cache python3 bash
-
-COPY --from=build /venv /venv
-
-COPY --from=maven /dep/target/dependency /venv/lib/python3.6/site-packages/pyspark/jars
-
-ENV PYSPARK_PYTHON /venv/bin/python3
-
-# remember to run python from the virtualenv
-CMD ["/venv/bin/python3", "-m", "generator"]
diff --git a/simulator/generator/dependencies/pom.xml b/simulator/generator/dependencies/pom.xml
deleted file mode 100644
index 80f5c605..00000000
--- a/simulator/generator/dependencies/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-
-  4.0.0
-  com.microsoft.azure-samples.streaming-at-scale
-  spark-dependency-gatherer
-  0.0.1
-
-
-    com.microsoft.azure
-    azure-eventhubs-spark_2.11
-    2.3.13
-
-
-    org.apache.spark
-    spark-sql-kafka-0-10_2.11
-    2.4.3
-
-
-
diff --git a/simulator/generator/generator/__init__.py b/simulator/generator/generator/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/simulator/generator/generator/__main__.py b/simulator/generator/generator/__main__.py
deleted file mode 100644
index 3025cbf0..00000000
--- a/simulator/generator/generator/__main__.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import os
-import time
-import datetime
-import uuid
-import json
-
-from pyspark.sql import SparkSession
-import pyspark.sql.functions as F
-from pyspark.sql.types import StringType
-
-executors = int(os.environ.get('EXECUTORS') or 1)
-rowsPerSecond = int(os.environ.get('EVENTS_PER_SECOND') or 1000)
-numberOfDevices = int(os.environ.get('NUMBER_OF_DEVICES') or 1000)
-complexDataCount = int(os.environ.get("COMPLEX_DATA_COUNT") or 23)
-duplicateEveryNEvents = int(os.environ.get("DUPLICATE_EVERY_N_EVENTS") or 0)
-
-outputFormat = os.environ.get('OUTPUT_FORMAT') or "kafka"
-outputOptions = json.loads(os.environ.get('OUTPUT_OPTIONS') or "{}")
-secureOutputOptions = json.loads(os.environ.get('SECURE_OUTPUT_OPTIONS') or "{}")
-
-generate_uuid = F.udf(lambda : str(uuid.uuid4()), StringType())
-
-spark = (SparkSession
-         .builder
-         .master("local[%d]" % executors)
-         .appName("DataGenerator")
-         .getOrCreate()
-         )
-
-stream = (spark
-          .readStream
-          .format("rate")
-          .option("rowsPerSecond", rowsPerSecond)
-          .load()
-          )
-# Rate stream has columns "timestamp" and "value"
-
-stream = (stream
-          .withColumn("deviceId", F.concat(F.lit("contoso://device-id-"), F.expr("mod(value, %d)" % numberOfDevices)))
-          .withColumn("deviceSequenceNumber", F.expr("value div %d" % numberOfDevices))
-          .withColumn("type", F.expr("CASE WHEN rand()<0.5 THEN 'TEMP' ELSE 'CO2' END"))
-          .withColumn("partitionKey", F.col("deviceId"))
-          .withColumn("eventId", generate_uuid())
-          # current_timestamp is later than rate stream timestamp, therefore more accurate to measure end-to-end latency
-          .withColumn("createdAt", F.current_timestamp())
-          .withColumn("value", F.rand() * 90 + 10)
-          )
-
-for i in range(complexDataCount):
-    stream = stream.withColumn("moreData{}".format(i), F.rand() * 90 + 10)
-
-stream = stream.withColumn("complexData", F.struct([F.col("moreData{}".format(i)) for i in range(complexDataCount)]))
-
-if duplicateEveryNEvents > 0:
-    stream = stream.withColumn("repeated", F.expr("CASE WHEN rand() < {} THEN array(1,2) ELSE array(1) END".format(1/duplicateEveryNEvents)))
-    stream = stream.withColumn("repeated", F.explode("repeated"))
-
-if outputFormat == "eventhubs":
-    bodyColumn = "body"
-else: #Kafka format
-    bodyColumn = "value"
-
-query = (stream
-         .selectExpr("to_json(struct(eventId, type, deviceId, deviceSequenceNumber, createdAt, value, complexData)) AS %s" % bodyColumn, "partitionKey")
-         .writeStream
-         .partitionBy("partitionKey")
-         .format(outputFormat)
-         .options(**outputOptions)
-         .options(**secureOutputOptions)
-         .option("checkpointLocation", "/tmp/checkpoint")
-         .start()
-         )
-
-lastTimestamp = ""
-nextPrintedTimestamp = time.monotonic()
-lastPrintedTimestamp = 0
-lastPrintedTimestampRows = 0
-totalRows = 0
-while (query.isActive):
-    now = time.monotonic()
-    for rp in query.recentProgress:
-        if rp['timestamp'] > lastTimestamp:
-            lastTimestamp = rp['timestamp']
-            totalRows += rp['numInputRows']
-    rps = (totalRows - lastPrintedTimestampRows) / (now - lastPrintedTimestamp)
-    lastPrintedTimestamp = now
-    nextPrintedTimestamp += 60
-    if lastPrintedTimestamp > 0:
-        print("%s %10.1f events/s" % (datetime.datetime.now().isoformat(), rps))
-    lastPrintedTimestampRows = totalRows
-    time.sleep(nextPrintedTimestamp - now)
-
-print(query.exception())
diff --git a/simulator/generator/requirements.txt b/simulator/generator/requirements.txt
deleted file mode 100644
index 5814598b..00000000
--- a/simulator/generator/requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-pyspark==2.4.3
diff --git a/simulator/generator/setup.py b/simulator/generator/setup.py
deleted file mode 100755
index 2fd02c2d..00000000
--- a/simulator/generator/setup.py
+++ /dev/null
@@ -1,6 +0,0 @@
-import setuptools
-
-setuptools.setup(
-    name='generator',
-    packages=setuptools.find_packages(),
-)
diff --git a/simulator/run-generator-eventhubs.sh b/simulator/run-generator-eventhubs.sh
index 449bce45..800cedd9 100644
--- a/simulator/run-generator-eventhubs.sh
+++ b/simulator/run-generator-eventhubs.sh
@@ -19,8 +19,7 @@ fi
 echo 'getting Event Hub connection string'
 source ../components/azure-event-hubs/get-eventhubs-connection-string.sh "$EVENTHUB_NAMESPACE" "Send"
-OUTPUT_FORMAT="eventhubs"
-OUTPUT_OPTIONS="{}"
-SECURE_OUTPUT_OPTIONS="{\"eventhubs.connectionstring\": \"$EVENTHUB_CS;EntityPath=$EVENTHUB_NAME\"}"
+SIMULATOR_VARIABLES=""
+SIMULATOR_CONNECTION_SETTING="EventHubConnectionString=$EVENTHUB_CS;EntityPath=$EVENTHUB_NAME"
-source ../simulator/create-generator-instances.sh
+source ../simulator/run-simulator.sh
diff --git a/simulator/run-generator-kafka.sh b/simulator/run-generator-kafka.sh
index 8d3288aa..d741b463 100644
--- a/simulator/run-generator-kafka.sh
+++ b/simulator/run-generator-kafka.sh
@@ -4,16 +4,19 @@ set -euo pipefail
 OUTPUT_FORMAT="kafka"
-OUTPUT_OPTIONS=$(cat <> log.txt 2>/dev/null
+  az container create -g $RESOURCE_GROUP -n "$name" \
+    --image $image_name \
+    $vnet_options \
+    -e \
+      DeviceCount=$number_of_devices \
+      DeviceIndex=0 \
+      Template='{ "eventId": "$.Guid", "value": 0.$.value, "deviceId": "$.DeviceId", "deviceSequenceNumber": $.Counter, "type": "$.type", "createdAt": "$.Time", "complexData": { '"${complex_data_template:2}"' } }' \
+      Interval="$interval" \
+      DevicePrefix=contoso-device-id- \
+      MessageCount=0 \
+      Variables='[ {"name": "value", "random": true}, {"name": "Counter", "min": 0}, {"name": "type", "values": ["TEMP", "CO2"]}'"$complex_data_variables"' ]' \
+      DuplicateEvery="${SIMULATOR_DUPLICATE_EVERY_N_EVENTS:-1000}" \
+      PartitionKey="$.DeviceId" \
+      $SIMULATOR_VARIABLES \
+    --secure-environment-variables \
+      "$SIMULATOR_CONNECTION_SETTING" \
+    --cpu 4 --memory 4 \
+    --no-wait \
+    -o tsv >> log.txt
+done
diff --git a/streaming/databricks/job/run-databricks-job.sh b/streaming/databricks/job/run-databricks-job.sh
index f5646436..cf44f180 100755
--- a/streaming/databricks/job/run-databricks-job.sh
+++ b/streaming/databricks/job/run-databricks-job.sh
@@ -39,7 +39,7 @@ wait_for_run () {
 cluster_jq_command="$(cat <
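Note on the recurring CLI change: every deployment script in this patch replaces the deprecated `az group deployment create` with `az deployment group create`. The sketch below is illustrative only and not part of the patch; it assumes a solution script has already exported `RESOURCE_GROUP`, and the parameters file name is hypothetical. It shows one way a wrapper could keep working on machines that still run an older Azure CLI where only the legacy spelling exists.

```bash
#!/usr/bin/env bash
# Illustrative compatibility shim (assumption: RESOURCE_GROUP is exported by
# the calling create-solution.sh, as in the scripts touched by this patch).
set -euo pipefail

# Prefer the current "az deployment group create"; fall back to the
# deprecated "az group deployment create" if the installed CLI predates it.
if az deployment group create --help > /dev/null 2>&1; then
    deploy_cmd=(az deployment group create)
else
    deploy_cmd=(az group deployment create)   # legacy form on older CLIs
fi

# Example invocation mirroring components/azure-cosmosdb/create-cosmosdb.sh.
# The @cosmosdb.parameters.json file is a hypothetical stand-in for the
# --parameters values those scripts pass inline.
"${deploy_cmd[@]}" \
    --resource-group "$RESOURCE_GROUP" \
    --template-file ../components/azure-cosmosdb/cosmosdb-arm-template.json \
    --parameters @cosmosdb.parameters.json
```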