-
Notifications
You must be signed in to change notification settings - Fork 59
Flink Spector throws please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation) error #52
Comments
Hi, You are using scala which is not yet officially supported, but shouldn't pose any problems here as far I can see, since you're using the java api of Flink. You're indeed right that I should provide a way here to provide Typeinformation. I will try to include this in the next release. But I think your problem is right here: I use the first element later to extract TypeInformation. I think your test would work fine if the first element in your TestStream / Input would contain actual data. You could change the logic to provide the input like this: var testStream :EventTimeSourceBuilder[Tuple2[String, Report]] = null
val it = ReportList.iterator()
var report:Tuple2[String,Report] = null
report = it.next()
testStream = createTimedTestStreamWith(Tuple2.of(report.f0, report.f1))
while(it.hasNext)
{
report = it.next()
testStream = testStream.emit(Tuple2.of(report.f0, report.f1))
}
This way the first element contains actual data. And since your'e not providing any timestamps but do Windowing do a I have not tested any of this. Please provide some Feedback if this resolves your problem. Cheers, |
Hello, I have found somewhat a solution that works:
The only issue I see is that the first record isn't inserted into a window, so I am not sure where the first record in the list ends up. Maybe |
We got flink-spector working with scala driver by adding following in the sbt "junit" % "junit" % "4.11" % Test, Implementation is described as above by chris. class Consumer extends DataStreamTestBase with AssertionsForJUnit{ if (reports.size < 1) { builder = createTimedTestStreamWith(reports.head) } } you can run the test with "sbt test" |
@cslotterback You can already to this by using the underlying
But I'm going to expose this in the high level api in the next release. @rohanbolar If I'm not mistaken according to the inputs your still using the java api of flink? |
This issue has been bundled into #57 |
Hi There. i am having issue with running the test case
Any help would be great help.
package com.xyz.ips
import java.io.{InputStream, InputStreamReader}
import java.util.concurrent.TimeUnit
import com.xyz.ips.mediation.Report
import com.xyz.ips.test.utii.TestDataReader
import com.xyz.ips.transformation.{Counter, EMMFlatMap}
import com.typesafe.config.ConfigException.Null
import abc.efg.EReport
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.windowing.time.Time
import org.flinkspector.core.quantify.{MatchTuples, OutputMatcher}
import org.flinkspector.datastream.{DataStreamTestBase, DataStreamTestEnvironment}
import org.flinkspector.datastream.input.time.InWindow
import org.hamcrest.CoreMatchers._
import org.junit.Test
import org.scalatest.junit.AssertionsForJUnit
import scala.io.Source
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericRecord}
import org.apache.avro.io.{DatumReader, Decoder, DecoderFactory, JsonDecoder}
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecord}
import org.apache.commons.io.IOUtils
import org.apache.avro.io.Decoder
import org.apache.avro.io.JsonDecoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.flinkspector.datastream.input.EventTimeSourceBuilder
import scala.collection.mutable.ArrayBuffer
class ReportConsumerTest extends DataStreamTestBase with AssertionsForJUnit{
private var schema = null
/**
* Reader that deserializes byte array into a record.
*/
private var datumReader = null
/**
* Input stream to read message from.
*/
private var inputStream = null
/**
* Avro decoder that decodes binary data.
*/
private var decoder = null
/**
* Record to deserialize byte array to.
*/
private var record = null
val serDataType: TypeInformation[Tuple2[String, Report]] = createTypeInformation[Tuple2[String, Report]]
@test def testCounter() = {
/*
System.out.println("test****************")
*/
}
def window(stream: DataStream[Tuple2[String,Report]]):DataStream[Tuple2[String,EReport]] = {
}
}
-Getting following issue
val testStream1 = testStream.close(); line is throwing error
[error] Test com.xyz.ips.ReportConsumerTest.testCounter failed: java.lang.RuntimeException: Could not startWith TypeInformation for type class org.apache.flink.streaming.runtime.streamrecord.StreamRecord; please specify the TypeInformation manually via StreamExecutionEnvironment#fromElements(Collection, TypeInformation), took 1.131 sec
[error] at org.flinkspector.datastream.DataStreamTestEnvironment.fromCollectionWithTimestamp(DataStreamTestEnvironment.java:188)
[error] at org.flinkspector.datastream.DataStreamTestEnvironment.fromInput(DataStreamTestEnvironment.java:142)
[error] at org.flinkspector.datastream.input.EventTimeSourceBuilder.close(EventTimeSourceBuilder.java:62)
[error] at com.xyz.ips.ReportConsumerTest.testCounter(ReportConsumerTest.scala:92)
The text was updated successfully, but these errors were encountered: