
Add support for nested collections #28

Open
snuyanzin wants to merge 1 commit into master from nested_collections

Conversation

snuyanzin (Contributor)

Within this PR I introduce a new POJO, FieldInfo, which makes it simpler to specify expression, null-rate, and length for any element of complex types such as maps of arrays of rows and so on.

This PR will conflict with #21.

Once one of them is merged, I can rebase the other.

@snuyanzin force-pushed the nested_collections branch 2 times, most recently from c214e71 to dbcf354 on January 7, 2022 at 12:16
@snuyanzin (Contributor, Author)

Now rebased.

@snuyanzin (Contributor, Author)

@knaufk could you please have a look once you have time?

@knaufk (Owner) commented Feb 23, 2022

Thank you for your patience. I've rebased. I am not 100% sure about the syntax. Can you elaborate why the element keyword is needed? Is this supposed to support arbitrary nesting or only one level?

@snuyanzin (Contributor, Author) commented Feb 24, 2022

Yes, element is supposed to support arbitrary nesting; an example is below.
Like key and value for maps, it gives access to the elements of an array so that options such as null-rate, length, and expression can be set at every level.

CREATE TEMPORARY TABLE t (
  `arrays` ARRAY<ARRAY<ARRAY<STRING>>>
) WITH (
  'connector' = 'faker',
  'fields.arrays.element.element.expression' = '#{harry_potter.character}',
  'fields.arrays.element.element.length' = '2',
  'fields.arrays.element.length' = '3',
  'fields.arrays.length' = '4',
  'fields.arrays.element.element.null-rate' = '0.1',
  'fields.arrays.element.null-rate' = '0.1',
  'fields.arrays.null-rate' = '0.1'
);
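
To also illustrate the key/value case for maps mentioned above, here is a rough sketch of how these options could combine for a MAP whose values are arrays. This particular combination is not taken from the PR itself, so treat the exact option keys as an assumption:

CREATE TEMPORARY TABLE map_of_arrays (
  `m` MAP<STRING, ARRAY<STRING>>
) WITH (
  'connector' = 'faker',
  -- key/value address the map keys and values, element the array entries
  -- (assumed combination; the actual option keys may differ)
  'fields.m.key.expression' = '#{harry_potter.house}',
  'fields.m.value.element.expression' = '#{harry_potter.character}',
  'fields.m.value.element.null-rate' = '0.1',
  'fields.m.value.length' = '3',
  'fields.m.length' = '2'
);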

UPD: the issue below could be fixed by apache/flink#19254.
One issue I've just noticed: this works with Flink's master branch (1.15.0), but it fails with an NPE from Flink 1.14.3 when a null-rate is specified for an element.

Flink 1.14.3's NPE

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.NullPointerException
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyColumnarArray(ArrayDataSerializer.java:158)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86)
	at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
	at com.github.knaufk.flink.faker.FlinkFakerSourceFunction.run(FlinkFakerSourceFunction.java:44)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

Without a null-rate it works with 1.14.3 as well (so it looks like a Flink problem rather than a flink-faker one), e.g.

CREATE TEMPORARY TABLE t2 (
  `arrays` ARRAY<ARRAY<ARRAY<STRING>>>
) WITH (
  'connector' = 'faker',
  'fields.arrays.element.element.expression' = '#{harry_potter.character}',
  'fields.arrays.element.element.length' = '2',
  'fields.arrays.element.length' = '3',
  'fields.arrays.length' = '4'
);

@snuyanzin (Contributor, Author) commented Mar 28, 2022

Rebased.
The NPE issue mentioned above for Flink 1.14.x could be fixed by backporting apache/flink#19254 to 1.14.x.

@snuyanzin (Contributor, Author)

Rebased.
It's supposed to support arbitrary nesting.
@knaufk could you please have a look once you have time?
