Skip to content

in_tcp: fix parsed bytes to include separator length #10385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 30, 2025

Conversation

binary85
Copy link
Contributor

@binary85 binary85 commented May 26, 2025

Fixed malfunction when using a seperator longer than 1 charactor in tcp INPUT none format log.
Consumed length does not include the length of the separator.

I use $endlog$\r\n as a separator. As shown below, however part of the separator is included in the split logs, or some content from the previous log is replayed in the next one.

{"log":"Hello, fluent-bit! 1"}
{"log":"Hello, fluent-bit! 2"}
{"log":"Hello, fluent-bit! 3"}
{"log":"lo, fluent-bit! 3"}
{"log":"endlog$\r\nHello, fluent-bit! 4"}
{"log":"endlog$\r\nHello, fluent-bit! 5"}
{"log":"endlog$\r\nHello, fluent-bit! 6"}
{"log":"endlog$\r\nHello, fluent-bit! 7"}
{"log":"Hello, fluent-bit! 8"}
{"log":"t-bit! 8"}
{"log":"endlog$\r\nHello, fluent-bit! 9"}
{"log":"endlog$\r\nHello, fluent-bit! 10"}
{"log":"endlog$\r\nHello, fluent-bit! 11"}
{"log":"Hello, fluent-bit! 12"}
{"log":"-bit! 12"}
{"log":"endlog$\r\nHello, fluent-bit! 13"}
{"log":"Hello, fluent-bit! 14"}
{"log":"-bit! 14"}
{"log":"Hello, fluent-bit! 15"}
{"log":"-bit! 15"}
{"log":"endlog$\r\nHello, fluent-bit! 16"}
{"log":"Hello, fluent-bit! 17"}
{"log":"-bit! 17"}
{"log":"endlog$\r\nHello, fluent-bit! 18"}
{"log":"endlog$\r\nHello, fluent-bit! 19"}
{"log":"Hello, fluent-bit! 20"}

After checking the code for parsing the "none" format of the TCP INPUT, I found that the length of the parsed data did not include the length of the separator.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • [N/A] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@binary85
Copy link
Contributor Author

configurations for test

[SERVICE]
    flush        1
    daemon       Off
    log_level    debug
    log_file     logs/fluent-bit.log
    
    parsers_file parsers.conf
    plugins_file plugins.conf
    
    storage.metrics on
    storage.path tmp/storage
    storage.sync normal
    storage.backlog.mem_limit 5M

[INPUT]
    name    tcp
    listen  0.0.0.0
    port    8091
    Format  none
    Tag     test
    Chunk_Size  1024
    Buffer_Size 10024
    Separator $endlog$\r\n

[OUTPUT]
    Name file
    Match test
    Format plain
    Path logs/

@binary85
Copy link
Contributor Author

debug log output

[2025/05/22 17:19:05] [ info] [fluent bit] version=4.0.1, commit=b12e507090, pid=46212
[2025/05/22 17:19:05] [debug] [engine] coroutine stack size: 98302 bytes (96.0K)
[2025/05/22 17:19:05] [ info] [storage] ver=1.5.3, type=memory+filesystem, sync=normal, checksum=off, max_chunks_up=128
[2025/05/22 17:19:05] [ info] [output:file:file.0] worker #0 started
[2025/05/22 17:19:05] [ info] [storage] backlog input plugin: storage_backlog.1
[2025/05/22 17:19:05] [ info] [simd    ] disabled
[2025/05/22 17:19:05] [ info] [cmetrics] version=1.0.0
[2025/05/22 17:19:05] [ info] [ctraces ] version=0.6.4
[2025/05/22 17:19:05] [ info] [input:tcp:tcp.0] initializing
[2025/05/22 17:19:05] [ info] [input:tcp:tcp.0] storage_strategy='memory' (memory only)
[2025/05/22 17:19:05] [debug] [tcp:tcp.0] created event channels: read=856 write=832
[2025/05/22 17:19:05] [debug] [downstream] listening on 0.0.0.0:8091
[2025/05/22 17:19:05] [ info] [input:storage_backlog:storage_backlog.1] initializing
[2025/05/22 17:19:05] [ info] [input:storage_backlog:storage_backlog.1] storage_strategy='memory' (memory only)
[2025/05/22 17:19:05] [debug] [storage_backlog:storage_backlog.1] created event channels: read=864 write=868
[2025/05/22 17:19:05] [ info] [input:storage_backlog:storage_backlog.1] queue memory limit: 4.8M
[2025/05/22 17:19:05] [debug] [file:file.0] created event channels: read=872 write=876
[2025/05/22 17:19:05] [debug] [router] match rule tcp.0:file.0
[2025/05/22 17:19:05] [ info] [sp] stream processor started
[2025/05/22 17:19:35] [debug] [task] created task=000002428E3AD540 id=0 OK
[2025/05/22 17:19:35] [debug] [output:file:file.0] task_id=0 assigned to thread #0
[2025/05/22 17:19:35] [debug] [out flush] cb_destroy coro_id=0
[2025/05/22 17:19:35] [debug] [task] destroy task=000002428E3AD540 (task_id=0)
[2025/05/22 17:19:36] [debug] [input chunk] skip ingesting data with 0 bytes
[2025/05/22 17:19:36] [debug] [task] created task=000002428E3AD540 id=0 OK
[2025/05/22 17:19:36] [debug] [output:file:file.0] task_id=0 assigned to thread #0
[2025/05/22 17:19:36] [debug] [out flush] cb_destroy coro_id=1
[2025/05/22 17:19:36] [debug] [task] destroy task=000002428E3AD540 (task_id=0)
[2025/05/22 17:19:37] [debug] [input chunk] skip ingesting data with 0 bytes
[2025/05/22 17:19:37] [debug] [task] created task=000002428E3AD400 id=0 OK
[2025/05/22 17:19:37] [debug] [output:file:file.0] task_id=0 assigned to thread #0
...

@binary85
Copy link
Contributor Author

You can reproduce this problem with the above configuration and by executing this Go code.

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	serverAddr := "localhost:8091"
	var logID uint64 = 0

	for {
		time.Sleep(1 * time.Second)
		conn, err := net.Dial("tcp", serverAddr)
		if err != nil {
			fmt.Println("connect failed:", err)
			continue
		}
		defer conn.Close()
		fmt.Println("connected:", serverAddr)

		message := "Hello, fluent-bit!"

		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		failed := false

		for {
			select {
			case <-ticker.C:
				logID++
				message = fmt.Sprintf("Hello, fluent-bit! %d", logID)
				err := sendMessage(conn, message)
				if err != nil {
					fmt.Println("send message failed:", err)
					failed = true
					break
				}

				logID++
				message = fmt.Sprintf("Hello, fluent-bit! %d", logID)
				err = sendMessage(conn, message)
				if err != nil {
					fmt.Println("send message failed:", err)
					failed = true
					break
				}

				logID++
				message = fmt.Sprintf("Hello, fluent-bit! %d", logID)
				err = sendMessage(conn, message)
				if err != nil {
					fmt.Println("send message failed:", err)
					failed = true
					break
				}

				fmt.Println("sent message")
			}

			if failed {
				conn.Close()
				break
			}
		}
	}
}

func sendMessage(conn net.Conn, message string) error {
	separator := "$endlog$"
	newline := "\r\n"

	_, err := conn.Write([]byte(message))
	if err != nil {
		return err
	}
	_, err = conn.Write([]byte(separator))
	if err != nil {
		return err
	}
	_, err = conn.Write([]byte(newline))
	if err != nil {
		return err
	}

	return nil
}

@edsiper edsiper added this to the Fluent bit v4.0.3 milestone May 30, 2025
@edsiper edsiper merged commit 8bc73a6 into fluent:master May 30, 2025
54 checks passed
@edsiper
Copy link
Member

edsiper commented May 30, 2025

thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants