Fixed tests
algattik committed Oct 13, 2019
1 parent 6a20333 commit e2d3924
Showing 26 changed files with 54 additions and 15 deletions.
2 changes: 1 addition & 1 deletion components/azure-dataexplorer/create-dataexplorer.sh
@@ -76,7 +76,7 @@ echo 'creating Data Explorer table'
kustoQuery "/v1/rest/mgmt" ".create table EventTable ( eventId: string, complexData: dynamic, value: string, type: string, deviceId: string, deviceSequenceNumber: long, createdAt: datetime)"
echo 'creating Data Explorer table mapping'
if ! kustoQuery "/v1/rest/mgmt" ".show table EventTable ingestion json mapping \\\"EventMapping\\\"" 2>/dev/null; then
kustoQuery "/v1/rest/mgmt" ".create table EventTable ingestion json mapping 'EventMapping' '[ { \\\"column\\\": \\\"eventId\\\", \\\"path\\\": \\\"$.eventId\\\" }, { \\\"column\\\": \\\"complexData\\\", \\\"path\\\": \\\"$.complexData\\\" }, { \\\"column\\\": \\\"value\\\", \\\"path\\\": \\\"$.value\\\" }, { \\\"column\\\": \\\"type\\\", \\\"path\\\": \\\"$.type\\\" }, { \\\"column\\\": \\\"deviceId\\\", \\\"path\\\": \\\"$.deviceId\\\" }, { \\\"column\\\": \\\deviceSequenceNumber\\\", \\\"path\\\": \\\"$deviceSequenceNumber\\\" }, { \\\"column\\\": \\\"createdAt\\\", \\\"path\\\": \\\"$.createdAt\\\" } ]'"
kustoQuery "/v1/rest/mgmt" ".create table EventTable ingestion json mapping 'EventMapping' '[ { \\\"column\\\": \\\"eventId\\\", \\\"path\\\": \\\"$.eventId\\\" }, { \\\"column\\\": \\\"complexData\\\", \\\"path\\\": \\\"$.complexData\\\" }, { \\\"column\\\": \\\"value\\\", \\\"path\\\": \\\"$.value\\\" }, { \\\"column\\\": \\\"type\\\", \\\"path\\\": \\\"$.type\\\" }, { \\\"column\\\": \\\"deviceId\\\", \\\"path\\\": \\\"$.deviceId\\\" }, { \\\"column\\\": \\\"deviceSequenceNumber\\\", \\\"path\\\": \\\"$.deviceSequenceNumber\\\" }, { \\\"column\\\": \\\"createdAt\\\", \\\"path\\\": \\\"$.createdAt\\\" } ]'"
fi

echo "getting Service Principal ID"
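Note: the change above restores the quote that was missing before the deviceSequenceNumber column name and the dot missing from its $.deviceSequenceNumber path. As an illustration only (not part of this repository), the same ingestion mapping could be generated with Python's json module instead of hand-escaped quotes, which avoids that class of mistake; the table, mapping name and management endpoint are taken from the script above:

    import json

    # Columns of EventTable and their JSON paths, mirroring the fixed mapping above.
    columns = ["eventId", "complexData", "value", "type",
               "deviceId", "deviceSequenceNumber", "createdAt"]
    mapping = [{"column": c, "path": "$." + c} for c in columns]

    # json.dumps emits correctly quoted JSON, so no quotes or dots can go missing.
    command = (".create table EventTable ingestion json mapping 'EventMapping' '"
               + json.dumps(mapping) + "'")
    print(command)  # would be passed to kustoQuery "/v1/rest/mgmt" in the shell script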
1 change: 1 addition & 0 deletions eventhubs-databricks-azuresql/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "eda1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions eventhubs-databricks-cosmosdb/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "edc1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions eventhubs-databricks-delta/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "1",
"short": "edd1",
"steps": "CIPTMV",
11 changes: 11 additions & 0 deletions eventhubs-dataexplorer/test_spec.json
@@ -0,0 +1,11 @@
[
{
"enabled": false,
"stage": "3",
"short": "ed1",
"steps": "CIDTMV",
"minutes": "45",
"throughput": "1",
"extra_args": []
}
]
1 change: 1 addition & 0 deletions eventhubs-functions-azuresql/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "efa1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions eventhubs-functions-cosmosdb/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "efc1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions eventhubs-streamanalytics-azuresql/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "esa1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions eventhubs-streamanalytics-cosmosdb/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "esc1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions eventhubs-streamanalytics-eventhubs/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "ese1",
"steps": "CIPTMV",
1 change: 1 addition & 0 deletions eventhubs-timeseriesinsights/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "2",
"short": "eti1",
"steps": "CIDTMV",
1 change: 1 addition & 0 deletions eventhubskafka-databricks-cosmosdb/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": false,
"stage": "2",
"short": "kdc1",
"steps": "CIDPTMV",
1 change: 1 addition & 0 deletions hdinsightkafka-databricks-sqldw/test_spec.json
@@ -1,5 +1,6 @@
[
{
"enabled": true,
"stage": "3",
"short": "hdw1",
"steps": "CIDPTMV",
10 changes: 8 additions & 2 deletions integration-tests/test_solutions.py
@@ -23,7 +23,7 @@ def pytest_generate_tests(metafunc):
test_id = "{} {} ({})".format(
spec["short"], spec["folder"], " ".join(spec["extra_args"]))
test_ids.append(test_id)
argnames = ["folder", "short", "steps",
argnames = ["enabled", "folder", "short", "steps",
"minutes", "throughput", "extra_args"]
metafunc.parametrize(
argnames,
@@ -38,7 +38,9 @@ class TestSolutions():
# Flaky is used to rerun tests that may fail because of transient cloud issues.
#@flaky(max_runs=3)
def test_solution(self, folder, steps, minutes, throughput, extra_args):

print(self, folder, steps, minutes, throughput, extra_args)

cmd = ["./create-solution.sh",
"-d", self.rg,
"-s", steps,
@@ -52,7 +54,11 @@ def test_solution(self, folder, steps, minutes, throughput, extra_args):
assert test_output == ""

@pytest.fixture(autouse=True)
def run_around_tests(self, short):
def run_around_tests(self, short, enabled):

if not enabled:
pytest.skip("Disabled in test_spec.json")

self.rg = os.environ['RESOURCE_GROUP_PREFIX'] + short
# Delete solution resource group if already exists
subprocess.run(["./check-resource-group.sh", self.rg], check=True)
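Note: the harness change above threads each spec's new enabled value through pytest parametrization, and the autouse fixture now skips disabled solutions before any Azure resources are touched. A minimal, self-contained sketch of that flow (spec loading, create-solution.sh and resource-group handling are stubbed out; the inlined specs are hypothetical):

    import pytest

    # Hypothetical inline specs; the real harness reads the per-solution
    # test_spec.json files changed in this commit.
    SPECS = [
        {"enabled": True, "folder": "eventhubs-databricks-delta", "short": "edd1",
         "steps": "CIPTMV", "minutes": "45", "throughput": "1", "extra_args": []},
        {"enabled": False, "folder": "eventhubs-dataexplorer", "short": "ed1",
         "steps": "CIDTMV", "minutes": "45", "throughput": "1", "extra_args": []},
    ]

    def pytest_generate_tests(metafunc):
        argnames = ["enabled", "folder", "short", "steps",
                    "minutes", "throughput", "extra_args"]
        argvalues = [[spec[n] for n in argnames] for spec in SPECS]
        ids = ["{} {}".format(s["short"], s["folder"]) for s in SPECS]
        metafunc.parametrize(argnames, argvalues, ids=ids)

    class TestSolutions:
        @pytest.fixture(autouse=True)
        def run_around_tests(self, short, enabled):
            # Skip before any setup work for solutions disabled in test_spec.json.
            if not enabled:
                pytest.skip("Disabled in test_spec.json")
            self.rg = "rg-" + short  # resource-group naming simplified here
            yield

        def test_solution(self, folder, steps, minutes, throughput, extra_args):
            # The real test shells out to ./create-solution.sh with these arguments.
            print(folder, steps, minutes, throughput, extra_args)

Because the fixture is autouse and requests the enabled and short parameters, pytest includes them in each test's fixture closure, so metafunc.parametrize can supply them even though test_solution itself never declares them.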
2 changes: 1 addition & 1 deletion streaming/databricks/notebooks/eventhubs-to-azuresql.scala
@@ -123,7 +123,7 @@ var writeDataBatch : java.sql.PreparedStatement = null

val WriteToSQLQuery = dataToWrite
.writeStream
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/streaming-azuresql")
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/eventhubs-to-azuresql")
.foreachBatch((batchDF: DataFrame, batchId: Long) => retry(6, 0) {

// Load data into staging table.
3 changes: 2 additions & 1 deletion streaming/databricks/notebooks/eventhubs-to-cosmosdb.scala
@@ -47,6 +47,7 @@ val streamData = eventhubs
// for the description of the available configurations.
val cosmosDbConfig = Map(
"Endpoint" -> dbutils.widgets.get("cosmosdb-endpoint"),
"ConnectionMode" -> "DirectHttps",
"Masterkey" -> dbutils.secrets.get(scope = "MAIN", key = "cosmosdb-write-master-key"),
"Database" -> dbutils.widgets.get("cosmosdb-database"),
"Collection" -> dbutils.widgets.get("cosmosdb-collection")
@@ -67,7 +68,7 @@ import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
streamDataMutated
.writeStream
.format(classOf[CosmosDBSinkProvider].getName)
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/streaming-cosmosdb")
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/eventhubs-to-cosmosdb")
.outputMode("append")
.options(cosmosDbConfig)
.start()
2 changes: 1 addition & 1 deletion streaming/databricks/notebooks/eventhubs-to-delta.scala
@@ -60,7 +60,7 @@ streamData
.withColumn("storedAt", current_timestamp)
.writeStream
.outputMode("append")
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/streaming-delta/" + dbutils.widgets.get("delta-table"))
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/eventhubs-to-delta/" + dbutils.widgets.get("delta-table"))
.format("delta")
.option("path", s"abfss://streamingatscale@$gen2account.dfs.core.windows.net/" + dbutils.widgets.get("delta-table"))
.table(dbutils.widgets.get("delta-table"))
2 changes: 1 addition & 1 deletion streaming/databricks/notebooks/kafka-to-cosmosdb.scala
@@ -67,7 +67,7 @@ import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
streamDataMutated
.writeStream
.format(classOf[CosmosDBSinkProvider].getName)
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/streaming-cosmosdb")
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/kafka-to-cosmosdb")
.outputMode("append")
.options(cosmosDbConfig)
.start()
2 changes: 1 addition & 1 deletion streaming/databricks/notebooks/kafka-to-sqldw.scala
@@ -59,5 +59,5 @@ dataToWrite.writeStream
.option("forwardSparkAzureStorageCredentials", "true")
.option("maxStrLength", "4000")
.option("dbTable", dbutils.widgets.get("sqldw-table"))
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/streaming-sqldw")
.option("checkpointLocation", "dbfs:/streaming_at_scale/checkpoints/kafka-to-sqldw")
.start()
2 changes: 1 addition & 1 deletion streaming/databricks/notebooks/verify-common.scala
@@ -77,7 +77,7 @@ if (assertLatencyMilliseconds.nonEmpty) {
val expected = assertLatencyMilliseconds.get
val actual = stats.minLatencySeconds
if (actual.isEmpty || ((actual.get * 1000) > expected)) {
assertionsFailed += s"max latency in milliseconds: expected max $expected, got $actual"
assertionsFailed += s"max latency in milliseconds: expected max $expected milliseconds, got $actual seconds"
}
}

4 changes: 3 additions & 1 deletion streaming/databricks/notebooks/verify-dataexplorer.scala
@@ -9,6 +9,7 @@ dbutils.widgets.text("dataexplorer-storage-container", "dataexplorer")
dbutils.widgets.text("assert-events-per-second", "900", "Assert min events per second (computed over 1 min windows)")
dbutils.widgets.text("assert-latency-milliseconds", "15000", "Assert max latency in milliseconds (averaged over 1 min windows)")
dbutils.widgets.text("assert-duplicate-fraction", "0", "Assert max proportion of duplicate events")
dbutils.widgets.text("assert-outofsequence-fraction", "0", "Assert max proportion of out-of-sequence events")

// COMMAND ----------

@@ -49,5 +50,6 @@ dbutils.notebook.run("verify-common", 0, Map(
"input-table" -> (spark.conf.get("spark.sql.globalTempDatabase") + "." + tempTable),
"assert-events-per-second" -> dbutils.widgets.get("assert-events-per-second"),
"assert-latency-milliseconds" -> dbutils.widgets.get("assert-latency-milliseconds"),
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction")
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction"),
"assert-outofsequence-fraction" -> dbutils.widgets.get("assert-outofsequence-fraction")
))
4 changes: 3 additions & 1 deletion streaming/databricks/notebooks/verify-delta.scala
@@ -4,6 +4,7 @@ dbutils.widgets.text("delta-table", "streaming_events", "Delta table containing
dbutils.widgets.text("assert-events-per-second", "900", "Assert min events per second (computed over 1 min windows)")
dbutils.widgets.text("assert-latency-milliseconds", "15000", "Assert max latency in milliseconds (averaged over 1 min windows)")
dbutils.widgets.text("assert-duplicate-fraction", "0", "Assert max proportion of duplicate events")
dbutils.widgets.text("assert-outofsequence-fraction", "0", "Assert max proportion of out-of-sequence events")

// COMMAND ----------

@@ -12,5 +13,6 @@ dbutils.notebook.run("verify-common", 0, Map(
"input-table" -> dbutils.widgets.get("delta-table"),
"assert-events-per-second" -> dbutils.widgets.get("assert-events-per-second"),
"assert-latency-milliseconds" -> dbutils.widgets.get("assert-latency-milliseconds"),
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction")
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction"),
"assert-outofsequence-fraction" -> dbutils.widgets.get("assert-outofsequence-fraction")
))
4 changes: 3 additions & 1 deletion streaming/databricks/notebooks/verify-eventhubs.scala
@@ -5,6 +5,7 @@ dbutils.widgets.text("eventhub-maxEventsPerTrigger", "1000000", "Event Hubs max
dbutils.widgets.text("assert-events-per-second", "900", "Assert min events per second (computed over 1 min windows)")
dbutils.widgets.text("assert-latency-milliseconds", "15000", "Assert max latency in milliseconds (averaged over 1 min windows)")
dbutils.widgets.text("assert-duplicate-fraction", "0", "Assert max proportion of duplicate events")
dbutils.widgets.text("assert-outofsequence-fraction", "0", "Assert max proportion of out-of-sequence events")

// COMMAND ----------

@@ -81,7 +82,8 @@ dbutils.notebook.run("verify-common", 0, Map(
"input-table" -> stagingTable,
"assert-events-per-second" -> dbutils.widgets.get("assert-events-per-second"),
"assert-latency-milliseconds" -> dbutils.widgets.get("assert-latency-milliseconds"),
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction")
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction"),
"assert-outofsequence-fraction" -> dbutils.widgets.get("assert-outofsequence-fraction")
))

// COMMAND ----------
4 changes: 3 additions & 1 deletion streaming/databricks/notebooks/verify-sqldw.scala
@@ -8,6 +8,7 @@ dbutils.widgets.text("sqldw-table", "rawdata_cs")
dbutils.widgets.text("assert-events-per-second", "900", "Assert min events per second (computed over 1 min windows)")
dbutils.widgets.text("assert-latency-milliseconds", "15000", "Assert max latency in milliseconds (averaged over 1 min windows)")
dbutils.widgets.text("assert-duplicate-fraction", "0", "Assert max proportion of duplicate events")
dbutils.widgets.text("assert-outofsequence-fraction", "0", "Assert max proportion of out-of-sequence events")

// COMMAND ----------
val tempStorageAccount = dbutils.widgets.get("sqldw-tempstorage-account")
@@ -45,5 +46,6 @@ dbutils.notebook.run("verify-common", 0, Map(
"input-table" -> (spark.conf.get("spark.sql.globalTempDatabase") + "." + tempTable),
"assert-events-per-second" -> dbutils.widgets.get("assert-events-per-second"),
"assert-latency-milliseconds" -> dbutils.widgets.get("assert-latency-milliseconds"),
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction")
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction"),
"assert-outofsequence-fraction" -> dbutils.widgets.get("assert-outofsequence-fraction")
))
@@ -3,6 +3,7 @@ dbutils.widgets.text("test-output-path", "dbfs:/test-output/test-output.txt", "D
dbutils.widgets.text("storage-path", "", "WASB URL to data storage container")
dbutils.widgets.text("assert-events-per-second", "900", "Assert min events per second (computed over 1 min windows)")
dbutils.widgets.text("assert-duplicate-fraction", "0", "Assert max proportion of duplicate events")
dbutils.widgets.text("assert-outofsequence-fraction", "0", "Assert max proportion of out-of-sequence events")

// COMMAND ----------

@@ -37,7 +38,8 @@ dbutils.notebook.run("verify-common", 0, Map(
"input-table" -> (spark.conf.get("spark.sql.globalTempDatabase") + "." + tempView),
"assert-events-per-second" -> dbutils.widgets.get("assert-events-per-second"),
"assert-latency-milliseconds" -> "0", // As we use event timestamp as stored timestamp, measured latency should be 0
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction")
"assert-duplicate-fraction" -> dbutils.widgets.get("assert-duplicate-fraction"),
"assert-outofsequence-fraction" -> dbutils.widgets.get("assert-outofsequence-fraction")
))

// COMMAND ----------
2 changes: 1 addition & 1 deletion streaming/databricks/runners/verify-dataexplorer.sh
@@ -20,7 +20,7 @@ databricks secrets put --scope "MAIN" --key "dataexplorer-client-password" --str
databricks secrets put --scope "MAIN" --key "dataexplorer-storage-key" --string-value "$AZURE_STORAGE_KEY"

source ../streaming/databricks/job/run-databricks-job.sh verify-dataexplorer true "$(cat <<JQ
.libraries += [ { "maven": { "coordinates": "com.microsoft.azure.kusto:spark-kusto-connector:1.0.0-BETA-06", "exclusions": ["javax.mail:mail"] } } ]
.libraries += [ { "maven": { "coordinates": "com.microsoft.azure.kusto:spark-kusto-connector:1.0.0-BETA-04", "exclusions": ["javax.mail:mail"] } } ]
| .notebook_task.base_parameters."test-output-path" = "$DATABRICKS_TESTOUTPUTPATH"
| .notebook_task.base_parameters."dataexplorer-cluster" = "$kustoURL"
| .notebook_task.base_parameters."dataexplorer-database" = "$DATAEXPLORER_DATABASE"
