[C#] Arrow Flight consumer raises exception when receiving a RecordBatch with a DictionaryArray field #46089


Open
sstein37 opened this issue Apr 10, 2025 · 0 comments


@sstein37

Describe the bug, including details regarding any error messages, version, and platform.

Steps to reproduce:

  1. Create an ASP.NET Core server application that references the Apache.Arrow.Flight.AspNetCore package (19.0.1), using the code below for Program.cs and TestFlightServer.cs.
  2. Create a Windows console client application that references the Apache.Arrow.Flight package (19.0.1), using the code below for Program.cs.
  3. Run the server and client applications simultaneously. Press Enter in the client terminal and observe the exception. This behavior does not occur if DoGet in the server's TestFlightServer.cs is changed to call MakeTestRecordBatch with includeDictionaryColumn=false.

Observed Exception

```
System.ArgumentException
  HResult=0x80070057
  Message=Field with name Column3 not found
  Source=Apache.Arrow
  StackTrace:
   at Apache.Arrow.Ipc.DictionaryMemo.GetId(Field field)
   at Apache.Arrow.Ipc.ArrowReaderImplementation.LoadField(MetadataVersion version, RecordBatchEnumerator& recordBatchEnumerator, Field field, FieldNode& fieldNode, ByteBuffer bodyData, IBufferCreator bufferCreator)
   at Apache.Arrow.Ipc.ArrowReaderImplementation.BuildArrays(MetadataVersion version, Schema schema, ByteBuffer messageBuffer, RecordBatch recordBatchMessage)
   at Apache.Arrow.Ipc.ArrowReaderImplementation.CreateArrowObjectFromMessage(Message message, ByteBuffer bodyByteBuffer, IMemoryOwner`1 memoryOwner)
   at Apache.Arrow.Flight.Internal.RecordBatchReaderImplementation.<ReadNextRecordBatchAsync>d__11.MoveNext()
   at System.Runtime.CompilerServices.ValueTaskAwaiter`1.GetResult()
   at Apache.Arrow.Flight.FlightRecordBatchStreamReader.<MoveNext>d__12.MoveNext()
   at Program.<<Main>$>d__0.MoveNext() in FlightClient\Program.cs:line 11
```

Server: Program.cs

```csharp
using Microsoft.AspNetCore.Server.Kestrel.Core;

var builder = WebApplication.CreateBuilder(args);
// Register gRPC and the Flight server implementation from TestFlightServer.cs
builder.Services.AddGrpc().AddFlightServer<FlightServer.TestFlightServer>();
// Flight runs over gRPC, which needs HTTP/2; listen on port 50552
builder.WebHost.ConfigureKestrel(c => { c.ListenAnyIP(50552, o => { o.Protocols = HttpProtocols.Http2; }); });
var app = builder.Build();
app.MapFlightEndpoint();
app.Run();
```

Server: TestFlightServer.cs

```csharp
using Apache.Arrow;
using Apache.Arrow.Types;
using Grpc.Core;
namespace FlightServer
{
    public class TestFlightServer : Apache.Arrow.Flight.Server.FlightServer
    {
        private static RecordBatch MakeTestRecordBatch(bool includeDictionaryColumn)
        {
            var rbBuilder = new RecordBatch.Builder();
            rbBuilder.Append("Column1", true, new Int32Array.Builder().Append(1).Append(2).Append(3).Append(4).Build());
            rbBuilder.Append("Column2", true, new StringArray.Builder().Append("foo1").Append("foo2").Append("foo3").Append("foo4").Build());
            if (includeDictionaryColumn)
            {
                // Column3: dictionary-encoded strings (Int32 indices into a string dictionary)
                var dictionaryType = new DictionaryType(indexType: Int32Type.Default, valueType: StringType.Default, ordered: false);
                var dictionaryArray = new DictionaryArray(
                    dictionaryType,
                    new Int32Array.Builder().Append(2).Append(0).Append(0).Append(1).Build(),
                    new StringArray.Builder().Append("blue").Append("red").Append("green").Build()
                    );
                rbBuilder.Append("Column3", true, dictionaryArray);
            }
            return rbBuilder.Build();
        }

        // Every DoGet call streams a single batch that includes the dictionary column
        public override async Task DoGet(Apache.Arrow.Flight.FlightTicket ticket, Apache.Arrow.Flight.Server.FlightServerRecordBatchStreamWriter responseStream, ServerCallContext context) =>
            await responseStream.WriteAsync(MakeTestRecordBatch(true));
    }
}
```
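For reference, Column3 is dictionary-encoded: each Int32 index selects an entry from the string dictionary, so the logical column values are green, blue, blue, red. The plain C# sketch below illustrates that mapping with a hypothetical `DictionaryDecodeSketch.Decode` helper and no Apache.Arrow dependency. It only shows what data the server is sending; it does not reproduce or work around the exception.

```csharp
using System;
using System.Linq;

// Hypothetical illustration of dictionary encoding; not part of Apache.Arrow.
public static class DictionaryDecodeSketch
{
    // Expands each index into the dictionary entry it references.
    public static string[] Decode(int[] indices, string[] dictionary) =>
        indices.Select(i => dictionary[i]).ToArray();

    public static void Main()
    {
        // Same indices and dictionary as Column3 in MakeTestRecordBatch
        int[] indices = { 2, 0, 0, 1 };
        string[] dictionary = { "blue", "red", "green" };

        // Prints: green, blue, blue, red
        Console.WriteLine(string.Join(", ", Decode(indices, dictionary)));
    }
}
```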

Client: Program.cs

```csharp
using Grpc.Core;
using Apache.Arrow.Flight.Client;

Console.WriteLine("Press enter to connect");
Console.ReadLine();
using var channel = Grpc.Net.Client.GrpcChannel.ForAddress("http://localhost:50552", new Grpc.Net.Client.GrpcChannelOptions { MaxReceiveMessageSize = 1024 * 1024 * 1024 });
var client = new FlightClient(channel);
var ticket = new Apache.Arrow.Flight.FlightTicket(System.Text.Encoding.UTF8.GetBytes(""));
var stream = client.GetStream(ticket);
while (await stream.ResponseStream.MoveNext()) // throws ArgumentException when the batch contains Column3
{
    var recordBatch = stream.ResponseStream.Current;
    Console.WriteLine(format: "received RecordBatch containing {0} columns and {1} rows", recordBatch.Schema.FieldsList.Count, recordBatch.Length);
}
```

Component(s)

C#
