Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Scala API for flinkspector. #7

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,21 @@
import org.flinkspector.core.trigger.VerifyFinishedTrigger;
import scala.concurrent.duration.FiniteDuration;


import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mortbay.util.IO.bufferSize;

/**
* This class is responsible for orchestrating tests run with Flinkspector
*/
public abstract class Runner {


/**
* {@link LocalFlinkMiniCluster} used for running the test.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void testMap() throws Throwable {
* You assign String identifiers to your Tuple,
* and add hamcrest matchers testing the values.
*/

OutputMatcher<Tuple2<String, Integer>> matcher =
//name the values in your tuple with keys:
new MatchTuples<Tuple2<String, Integer>>("name", "value")
Expand All @@ -85,6 +86,7 @@ public void testMap() throws Throwable {
//define how many records need to fulfill the
.onEachRecord();


/*
* Use assertDataSet to map DataSet to an OutputMatcher.
* ExpectedRecords extends OutputMatcher and thus can be used in this way.
Expand Down
221 changes: 221 additions & 0 deletions flinkspector-datastream-scala/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2015 Otto (GmbH & Co KG)
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.flinkspector</groupId>
<artifactId>flinkspector-parent_2.11</artifactId>
<version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flinkspector-datastream-scala_2.11</artifactId>
<name>flinkspector-datastream-scala</name>

<dependencies>
<dependency>
<groupId>org.flinkspector</groupId>
<artifactId>flinkspector-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.flinkspector</groupId>
<artifactId>flinkspector-datastream_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.7</source>
<target>1.7</target>
<!-- The output of Xlint is not shown by default, but we activate it for the QA bot
to be able to get more warnings -->
<compilerArgument>-Xlint:all</compilerArgument>
</configuration>
</plugin>
<!-- maven jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>

<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
<jvmArgs>
<jvmArg>-Xms128m</jvmArg>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
</configuration>
</plugin>

<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes>
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Adding scala source directories to build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>



</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2015 Otto (GmbH & Co KG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.flinkspector.scala.datastream

import org.hamcrest.{Description, TypeSafeDiagnosingMatcher}
import scala.collection.mutable.ArrayBuffer
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._

/**
 * Hamcrest matcher that applies a set of field-level assertions to every
 * record in a [[java.lang.Iterable]] of products (e.g. Flink output tuples).
 *
 * Each incoming [[Product]] is reflectively converted to an instance of the
 * case class `M` and published through [[v]], so that assertion closures
 * registered with [[field]] can reference the typed fields of the record
 * currently under test.
 *
 * @tparam T type of the records produced by the test sink
 * @tparam M case class each record is converted to before matching
 */
class AssertBlock[T <: Product,M <: Product : Manifest]
  extends TypeSafeDiagnosingMatcher[JIterable[T]] {

  // Record currently being checked, converted to M; assertion closures
  // registered via field() read this. NOTE(review): mutable shared state —
  // this matcher is not thread-safe.
  var v: M = _

  // Assertions registered via field(); each one is run once per record.
  val assertions = ArrayBuffer.empty[() => Unit]

  /**
   * Registers an assertion to be evaluated against every record.
   * The block is captured by-name and executed lazily during matching.
   */
  def field(assert: => Unit) = {
    assertions += (() => assert)
  }

  /**
   * Runs all registered assertions against a single record.
   *
   * @param product record to check; converted to `M` and published via [[v]]
   * @return `None` if every assertion passes, otherwise the string form of
   *         the first failure
   */
  def valid(product: Product): Option[String] = {
    v = productToCaseClass[M](product)
    // Deliberately catch Throwable: assertion failures surface as
    // java.lang.AssertionError, which NonFatal would let escape.
    def outcome(assert: () => Unit): Option[String] =
      try {
        assert()
        None
      } catch {
        case t: Throwable => Some(t.toString)
      }
    // Iterator + collectFirst stops at the first failing assertion without
    // resorting to a nonlocal return from inside a lambda.
    assertions.iterator.map(outcome).collectFirst { case Some(message) => message }
  }

  /**
   * Matches when every record in the iterable satisfies all assertions.
   * On the first failing record the failure message is appended to the
   * mismatch description and matching stops.
   */
  override def matchesSafely(iterable: JIterable[T], mismatchDescription: Description): Boolean = {
    val records = iterable.iterator()
    var failure: Option[String] = None
    while (failure.isEmpty && records.hasNext) {
      failure = valid(records.next())
    }
    failure match {
      case Some(message) =>
        mismatchDescription.appendText(message)
        false
      case None =>
        true
    }
  }

  override def describeTo(description: Description): Unit = {
    // Intentionally empty: failure details are reported per record through
    // the mismatch description in matchesSafely.
  }

  /**
   * Reflectively rebuilds a case class `P` from an erased [[Product]] by
   * invoking the `apply` method of its companion object with the product's
   * field values.
   *
   * @throws IllegalArgumentException for inner (method-local) case classes,
   *         whose companion objects cannot be resolved from this context, or
   *         if no bridge `apply` method is found on the companion
   * @throws ClassNotFoundException if no companion object class exists
   */
  def productToCaseClass[P <: Product](product: Product)(implicit manifest: Manifest[P]): P = {
    val runtimeClass = manifest.runtimeClass
    // Names ending in "$<digits>" denote classes defined inside a method;
    // their companion objects are not reachable via Class.forName.
    val innerCaseClass = """(.*\$)([0-9]+)$""".r

    val caseClassCompanionName: String = innerCaseClass findFirstIn runtimeClass.getName match {
      case Some(innerCaseClass(name, index)) =>
        throw new IllegalArgumentException(s"class $name$index is not accessible from this context")
      case None => runtimeClass.getName + "$"
    }

    val companionObjectClass = Class.forName(caseClassCompanionName)
    val ccCompanionConstructor = companionObjectClass.getDeclaredConstructor()
    ccCompanionConstructor.setAccessible(true)
    val ccCompanionObject = ccCompanionConstructor.newInstance()

    // The bridge "apply" takes Object parameters, so boxed field values can
    // be passed directly via Method.invoke.
    val applyMethod = companionObjectClass
      .getMethods
      .find(x => x.getName == "apply" && x.isBridge)
      .getOrElse(throw new IllegalArgumentException(
        s"no bridge apply method found on companion $caseClassCompanionName"))

    val values = (product.productIterator map (_.asInstanceOf[AnyRef])).toSeq

    applyMethod.invoke(ccCompanionObject, values: _*).asInstanceOf[P]
  }

}


Loading