128128import java .util .List ;
129129import java .util .Map ;
130130import java .util .Optional ;
131+ import java .util .concurrent .ConcurrentHashMap ;
131132import java .util .concurrent .atomic .AtomicLong ;
132133import java .util .function .Function ;
133134
@@ -422,6 +423,112 @@ public void testConnectorHandlesBinarySerialization()
422423 }
423424 }
424425
426+ @ Test (timeOut = 50000 )
427+ public void testMixedConnectorSerializationWithAndWithoutCodec ()
428+ throws Exception
429+ {
430+ AtomicLong lastActivityNanos = new AtomicLong (System .nanoTime ());
431+ TestingTaskResource testingTaskResource = new TestingTaskResource (lastActivityNanos , TestHttpRemoteTask .FailureScenario .NO_FAILURE );
432+
433+ String connectorWithCodec = "test-with-codec" ;
434+ String connectorWithoutCodec = "test-without-codec" ;
435+ Injector injector = createInjectorWithMixedConnectors (connectorWithCodec , connectorWithoutCodec , testingTaskResource );
436+ HttpRemoteTaskFactory httpRemoteTaskFactory = injector .getInstance (HttpRemoteTaskFactory .class );
437+ JsonCodec <TaskUpdateRequest > jsonCodec = injector .getInstance (Key .get (new TypeLiteral <>() {}));
438+
439+ RemoteTask remoteTask = createRemoteTask (httpRemoteTaskFactory );
440+ try {
441+ testingTaskResource .setInitialTaskInfo (remoteTask .getTaskInfo ());
442+ remoteTask .start ();
443+
444+ Lifespan lifespan = driverGroup (1 );
445+
446+ TestConnectorWithCodecSplit splitWithCodec = new TestConnectorWithCodecSplit ("codec-data" , 100 );
447+ TestConnectorWithoutCodecSplit splitWithoutCodec = new TestConnectorWithoutCodecSplit ("json-data" , 200 );
448+
449+ remoteTask .addSplits (ImmutableMultimap .of (
450+ TaskTestUtils .TABLE_SCAN_NODE_ID ,
451+ new Split (new ConnectorId (connectorWithCodec ), TestingTransactionHandle .create (), splitWithCodec , lifespan , NON_CACHEABLE ),
452+ TaskTestUtils .TABLE_SCAN_NODE_ID ,
453+ new Split (new ConnectorId (connectorWithoutCodec ), TestingTransactionHandle .create (), splitWithoutCodec , lifespan , NON_CACHEABLE )));
454+
455+ TestHttpRemoteTask .poll (() -> testingTaskResource .getTaskSource (TaskTestUtils .TABLE_SCAN_NODE_ID ) != null );
456+ TestHttpRemoteTask .poll (() -> testingTaskResource .getTaskSource (TaskTestUtils .TABLE_SCAN_NODE_ID ).getSplits ().size () == 2 );
457+
458+ TaskUpdateRequest taskUpdateRequest = testingTaskResource .getLastTaskUpdateRequest ();
459+ assertNotNull (taskUpdateRequest , "TaskUpdateRequest should not be null" );
460+
461+ String json = jsonCodec .toJson (taskUpdateRequest );
462+ JsonNode root = OBJECT_MAPPER .readTree (json );
463+ JsonNode splitsNode = root .at ("/sources/0/splits" );
464+
465+ assertTrue (splitsNode .isArray () && splitsNode .size () == 2 ,
466+ "Should have exactly 2 splits" );
467+
468+ JsonNode codecSplitNode = null ;
469+ JsonNode jsonSplitNode = null ;
470+
471+ for (JsonNode splitWrapper : splitsNode ) {
472+ JsonNode connectorIdNode = splitWrapper .at ("/split/connectorId" );
473+ String catalogName = connectorIdNode .asText ();
474+ JsonNode connectorSplitNode = splitWrapper .at ("/split/connectorSplit" );
475+
476+ if (connectorWithCodec .equals (catalogName )) {
477+ codecSplitNode = connectorSplitNode ;
478+ }
479+ else if (connectorWithoutCodec .equals (catalogName )) {
480+ jsonSplitNode = connectorSplitNode ;
481+ }
482+ }
483+
484+ assertNotNull (codecSplitNode , "Should find split from connector with codec" );
485+ assertNotNull (jsonSplitNode , "Should find split from connector without codec" );
486+
487+ assertTrue (codecSplitNode .has ("customSerializedValue" ),
488+ "Split with codec should have customSerializedValue for binary serialization" );
489+ assertFalse (codecSplitNode .has ("data" ),
490+ "Split with codec should not have inline data field" );
491+
492+ assertFalse (jsonSplitNode .has ("customSerializedValue" ),
493+ "Split without codec should not have customSerializedValue" );
494+ assertTrue (jsonSplitNode .has ("data" ),
495+ "Split without codec should have inline data field for JSON serialization" );
496+ assertTrue (jsonSplitNode .has ("sequence" ),
497+ "Split without codec should have inline sequence field for JSON serialization" );
498+
499+ TaskUpdateRequest deserializedRequest = jsonCodec .fromJson (json );
500+ TaskSource deserializedSource = deserializedRequest .getSources ().get (0 );
501+ List <Split > deserializedSplits = ImmutableList .copyOf (deserializedSource .getSplits ().stream ()
502+ .map (splitAssignment -> splitAssignment .getSplit ())
503+ .collect (com .google .common .collect .ImmutableList .toImmutableList ()));
504+
505+ assertEquals (deserializedSplits .size (), 2 , "Should have 2 deserialized splits" );
506+
507+ boolean foundCodecSplit = false ;
508+ boolean foundJsonSplit = false ;
509+
510+ for (Split split : deserializedSplits ) {
511+ if (split .getConnectorSplit () instanceof TestConnectorWithCodecSplit ) {
512+ TestConnectorWithCodecSplit deserialized = (TestConnectorWithCodecSplit ) split .getConnectorSplit ();
513+ assertEquals (deserialized , splitWithCodec , "Codec split should match after round-trip" );
514+ foundCodecSplit = true ;
515+ }
516+ else if (split .getConnectorSplit () instanceof TestConnectorWithoutCodecSplit ) {
517+ TestConnectorWithoutCodecSplit deserialized = (TestConnectorWithoutCodecSplit ) split .getConnectorSplit ();
518+ assertEquals (deserialized , splitWithoutCodec , "JSON split should match after round-trip" );
519+ foundJsonSplit = true ;
520+ }
521+ }
522+
523+ assertTrue (foundCodecSplit , "Should have found and verified the codec split" );
524+ assertTrue (foundJsonSplit , "Should have found and verified the JSON split" );
525+ }
526+ finally {
527+ remoteTask .cancel ();
528+ httpRemoteTaskFactory .stop ();
529+ }
530+ }
531+
425532 private static RemoteTask createRemoteTask (HttpRemoteTaskFactory httpRemoteTaskFactory )
426533 {
427534 return createRemoteTask (httpRemoteTaskFactory , createPlanFragment ());
@@ -480,16 +587,16 @@ private static PlanFragment createPlanFragmentWithCodecHandles(String connectorN
480587 private static Injector createInjectorWithCodec (String connectorName , TestingTaskResource testingTaskResource )
481588 throws Exception
482589 {
483- InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig ().setThriftTransportEnabled (false );
484- return createInjectorWithCodec (connectorName , testingTaskResource , internalCommunicationConfig );
590+ return createInjectorWithMixedConnectors (connectorName , "unused-connector" , testingTaskResource );
485591 }
486592
487- private static Injector createInjectorWithCodec (
488- String connectorName ,
489- TestingTaskResource testingTaskResource ,
490- InternalCommunicationConfig internalCommunicationConfig )
593+ private static Injector createInjectorWithMixedConnectors (
594+ String connectorWithCodec ,
595+ String connectorWithoutCodec ,
596+ TestingTaskResource testingTaskResource )
491597 throws Exception
492598 {
599+ InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig ().setThriftTransportEnabled (false );
493600 Bootstrap app = new Bootstrap (
494601 new JsonModule (),
495602 new SmileModule (),
@@ -508,29 +615,30 @@ public void configure(Binder binder)
508615
509616 TestConnectorWithCodecProvider codecProvider = new TestConnectorWithCodecProvider ();
510617
511- Map <String , ConnectorCodec <ConnectorSplit >> splitCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
512- splitCodecMap .put (connectorName , codecProvider .getConnectorSplitCodec ().get ());
618+ Map <String , ConnectorCodec <ConnectorSplit >> splitCodecMap = new ConcurrentHashMap <>();
619+ splitCodecMap .put (connectorWithCodec , codecProvider .getConnectorSplitCodec ().get ());
513620
514- Map <String , ConnectorCodec <ConnectorTableHandle >> tableHandleCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
515- tableHandleCodecMap .put (connectorName , codecProvider .getConnectorTableHandleCodec ().get ());
621+ Map <String , ConnectorCodec <ConnectorTableHandle >> tableHandleCodecMap = new ConcurrentHashMap <>();
622+ tableHandleCodecMap .put (connectorWithCodec , codecProvider .getConnectorTableHandleCodec ().get ());
516623
517- Map <String , ConnectorCodec <ColumnHandle >> columnHandleCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
518- columnHandleCodecMap .put (connectorName , codecProvider .getConnectorColumnHandleCodec ().get ());
624+ Map <String , ConnectorCodec <ColumnHandle >> columnHandleCodecMap = new ConcurrentHashMap <>();
625+ columnHandleCodecMap .put (connectorWithCodec , codecProvider .getConnectorColumnHandleCodec ().get ());
519626
520- Map <String , ConnectorCodec <ConnectorTableLayoutHandle >> tableLayoutHandleCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
521- tableLayoutHandleCodecMap .put (connectorName , codecProvider .getConnectorTableLayoutHandleCodec ().get ());
627+ Map <String , ConnectorCodec <ConnectorTableLayoutHandle >> tableLayoutHandleCodecMap = new ConcurrentHashMap <>();
628+ tableLayoutHandleCodecMap .put (connectorWithCodec , codecProvider .getConnectorTableLayoutHandleCodec ().get ());
522629
523- Map <String , ConnectorCodec <ConnectorOutputTableHandle >> outputTableHandleCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
524- outputTableHandleCodecMap .put (connectorName , codecProvider .getConnectorOutputTableHandleCodec ().get ());
630+ Map <String , ConnectorCodec <ConnectorOutputTableHandle >> outputTableHandleCodecMap = new ConcurrentHashMap <>();
631+ outputTableHandleCodecMap .put (connectorWithCodec , codecProvider .getConnectorOutputTableHandleCodec ().get ());
525632
526- Map <String , ConnectorCodec <ConnectorInsertTableHandle >> insertTableHandleCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
527- insertTableHandleCodecMap .put (connectorName , codecProvider .getConnectorInsertTableHandleCodec ().get ());
633+ Map <String , ConnectorCodec <ConnectorInsertTableHandle >> insertTableHandleCodecMap = new ConcurrentHashMap <>();
634+ insertTableHandleCodecMap .put (connectorWithCodec , codecProvider .getConnectorInsertTableHandleCodec ().get ());
528635
529- Map <String , ConnectorCodec <ConnectorDeleteTableHandle >> deleteTableHandleCodecMap = new java . util . concurrent . ConcurrentHashMap <>();
530- deleteTableHandleCodecMap .put (connectorName , codecProvider .getConnectorDeleteTableHandleCodec ().get ());
636+ Map <String , ConnectorCodec <ConnectorDeleteTableHandle >> deleteTableHandleCodecMap = new ConcurrentHashMap <>();
637+ deleteTableHandleCodecMap .put (connectorWithCodec , codecProvider .getConnectorDeleteTableHandleCodec ().get ());
531638
532639 HandleResolver handleResolver = new HandleResolver ();
533- handleResolver .addConnectorName (connectorName , new TestConnectorWithCodecHandleResolver ());
640+ handleResolver .addConnectorName (connectorWithCodec , new TestConnectorWithCodecHandleResolver ());
641+ handleResolver .addConnectorName (connectorWithoutCodec , new TestConnectorWithoutCodecHandleResolver ());
534642 binder .bind (HandleResolver .class ).toInstance (handleResolver );
535643
536644 Function <ConnectorId , Optional <ConnectorCodec <ConnectorTableHandle >>> tableHandleCodecExtractor =
@@ -657,9 +765,8 @@ private HttpRemoteTaskFactory createHttpRemoteTaskFactory(
657765 HandleResolver handleResolver = injector .getInstance (HandleResolver .class );
658766 handleResolver .addConnectorName ("test" , new com .facebook .presto .testing .TestingHandleResolver ());
659767
660- // Register the connector codec provider
661768 ConnectorCodecManager codecManager = injector .getInstance (ConnectorCodecManager .class );
662- codecManager .addConnectorCodecProvider (new ConnectorId (connectorName ), new TestConnectorWithCodecProvider ());
769+ codecManager .addConnectorCodecProvider (new ConnectorId (connectorWithCodec ), new TestConnectorWithCodecProvider ());
663770
664771 return injector ;
665772 }
@@ -1176,4 +1283,115 @@ public int hashCode()
11761283 return java .util .Objects .hash (tableName );
11771284 }
11781285 }
1286+
1287+ public static class TestConnectorWithoutCodecSplit
1288+ implements ConnectorSplit
1289+ {
1290+ private final String data ;
1291+ private final int sequence ;
1292+
1293+ @ JsonCreator
1294+ public TestConnectorWithoutCodecSplit (
1295+ @ JsonProperty ("data" ) String data ,
1296+ @ JsonProperty ("sequence" ) int sequence )
1297+ {
1298+ this .data = data ;
1299+ this .sequence = sequence ;
1300+ }
1301+
1302+ @ JsonProperty
1303+ public String getData ()
1304+ {
1305+ return data ;
1306+ }
1307+
1308+ @ JsonProperty
1309+ public int getSequence ()
1310+ {
1311+ return sequence ;
1312+ }
1313+
1314+ @ Override
1315+ public NodeSelectionStrategy getNodeSelectionStrategy ()
1316+ {
1317+ return NodeSelectionStrategy .NO_PREFERENCE ;
1318+ }
1319+
1320+ @ Override
1321+ public List <HostAddress > getPreferredNodes (NodeProvider nodeProvider )
1322+ {
1323+ return ImmutableList .of ();
1324+ }
1325+
1326+ @ Override
1327+ public Object getInfo ()
1328+ {
1329+ return ImmutableMap .of ("data" , data , "sequence" , sequence );
1330+ }
1331+
1332+ @ Override
1333+ public boolean equals (Object obj )
1334+ {
1335+ if (this == obj ) {
1336+ return true ;
1337+ }
1338+ if (obj == null || getClass () != obj .getClass ()) {
1339+ return false ;
1340+ }
1341+ TestConnectorWithoutCodecSplit that = (TestConnectorWithoutCodecSplit ) obj ;
1342+ return sequence == that .sequence && java .util .Objects .equals (data , that .data );
1343+ }
1344+
1345+ @ Override
1346+ public int hashCode ()
1347+ {
1348+ return java .util .Objects .hash (data , sequence );
1349+ }
1350+ }
1351+
1352+ public static class TestConnectorWithoutCodecHandleResolver
1353+ implements ConnectorHandleResolver
1354+ {
1355+ @ Override
1356+ public Class <? extends ConnectorTableHandle > getTableHandleClass ()
1357+ {
1358+ throw new UnsupportedOperationException ("Table handles not supported" );
1359+ }
1360+
1361+ @ Override
1362+ public Class <? extends ConnectorTableLayoutHandle > getTableLayoutHandleClass ()
1363+ {
1364+ throw new UnsupportedOperationException ("Table layout handles not supported" );
1365+ }
1366+
1367+ @ Override
1368+ public Class <? extends ColumnHandle > getColumnHandleClass ()
1369+ {
1370+ throw new UnsupportedOperationException ("Column handles not supported" );
1371+ }
1372+
1373+ @ Override
1374+ public Class <? extends ConnectorSplit > getSplitClass ()
1375+ {
1376+ return TestConnectorWithoutCodecSplit .class ;
1377+ }
1378+
1379+ @ Override
1380+ public Class <? extends ConnectorOutputTableHandle > getOutputTableHandleClass ()
1381+ {
1382+ throw new UnsupportedOperationException ("Output table handles not supported" );
1383+ }
1384+
1385+ @ Override
1386+ public Class <? extends ConnectorInsertTableHandle > getInsertTableHandleClass ()
1387+ {
1388+ throw new UnsupportedOperationException ("Insert table handles not supported" );
1389+ }
1390+
1391+ @ Override
1392+ public Class <? extends ConnectorDeleteTableHandle > getDeleteTableHandleClass ()
1393+ {
1394+ throw new UnsupportedOperationException ("Delete table handles not supported" );
1395+ }
1396+ }
11791397}
0 commit comments