
v2 proposal #24

Open
jeromer opened this issue Jan 27, 2019 · 4 comments

@jeromer
Contributor

jeromer commented Jan 27, 2019

Hi there, I spotted a few problems with the current implementation, which led me to write a specification for a possible v2. Here it is:

New public API

There are only two available clients: TCP and UDP.

TLS becomes an optional configuration which can be injected into either the TCP or UDP client.

No more Connect() method. The connection must be handled lazily by the client.

It is the caller's responsibility to call Close() when required.

See the examples below.

A non-TLS TCP client

c, err := NewTCPClient(
    "example.com:12345",
)

if err != nil {
    // do something
}

err = c.WithTimeout(
    100 * time.Millisecond,
)

if err != nil {
    // do something
}

A non-TLS UDP client

c, err := NewUDPClient(
    "example.com:12345",
)

if err != nil {
    // do something
}

err = c.WithTimeout(
    100 * time.Millisecond,
)

if err != nil {
    // do something
}

A TLS-enabled TCP client

c, err := NewTCPClient(
    "example.com:12345",
)

if err != nil {
    // do something
}

err = c.WithTimeout(
    100 * time.Millisecond,
)

if err != nil {
    // do something
}

err = c.WithTLS(
    "path/to/cert",
    "path/to/key",
    SKIP_CERT_VERIFICATION, // or NO_SKIP_CERT_VERIFICATION
)

if err != nil {
    // do something
}

A TLS-enabled UDP client

c, err := NewUDPClient(
    "example.com:12345",
)

if err != nil {
    // do something
}

err = c.WithTimeout(
    100 * time.Millisecond,
)

if err != nil {
    // do something
}

err = c.WithTLS(
    "path/to/cert",
    "path/to/key",
    SKIP_CERT_VERIFICATION, // or NO_SKIP_CERT_VERIFICATION
)

if err != nil {
    // do something
}

If it is too complex for users to implement all these steps for a TLS client, it is trivial to provide a helper function such as:

func NewTCPClientBuilder(addr, cert, key string, skip bool, timeout time.Duration) (*Client, error) {
    c, err := NewTCPClient(addr)
    if err != nil {
        return nil, err
    }

    err = c.WithTimeout(timeout)
    if err != nil {
        return nil, err
    }

    err = c.WithTLS(cert, key, skip)
    if err != nil {
        return nil, err
    }

    return c, nil
}

So it can be used like this:

c, err := NewTCPClientBuilder(....)
// handle error
defer c.Close()

c.Send(&Event{....})

(The same thing can be done for a UDP builder)

New Event type

The current Event type is:

type Event struct {
	Ttl         float32
	Time        time.Time
	Tags        []string
	Host        string // Defaults to os.Hostname()
	State       string
	Service     string
	Metric      interface{} // Could be Int, Float32, Float64
	Description string
	Attributes  map[string]string
}

There are two problems with it:

  1. Event.Ttl should be a time.Duration so it is easier to express how long
    an Event will survive in Riemann's index.

  2. Event.Metric generates all sorts of complexities when marshalling and
    requires using Golang's reflection, which is complex and slow.

The proposed new Event is:

type Event struct {
    TTL           time.Duration
    Time          time.Time
    Tags          []string
    Host          string 
    State         string
    Service       string
    MetricInt64   *int64
    MetricFloat32 *float32
    MetricFloat64 *float64
    Description   string
    Attributes    map[string]string
}

According to Riemann's proto definition, an event only supports one of the following types:

optional sint64 metric_sint64 = 13;
optional double metric_d = 14;
optional float metric_f = 15;

So I propose to map directly:

  • Event.MetricInt64 to Event.metric_sint64
  • Event.MetricFloat32 to Event.metric_f
  • Event.MetricFloat64 to Event.metric_d

And let the user decide which one to use, accepting the possible precision loss that goes along with it, for example when sending a uint64.

By doing this we avoid using Golang's reflection and simplify the marshaling process by using the first non-nil value among Metric(Int64|Float32|Float64).
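
For illustration, here is a minimal sketch of that marshaling step. The protoEvent type and its field names are hypothetical placeholders for the generated proto struct, which may differ:

// protoEvent stands in for the generated proto struct.
type protoEvent struct {
    MetricSint64 *int64
    MetricD      *float64
    MetricF      *float32
}

// toProtoMetric copies the first non-nil metric field into the wire
// representation, no reflection involved.
func toProtoMetric(e *Event, pe *protoEvent) {
    switch {
    case e.MetricInt64 != nil:
        pe.MetricSint64 = e.MetricInt64
    case e.MetricFloat32 != nil:
        pe.MetricF = e.MetricFloat32
    case e.MetricFloat64 != nil:
        pe.MetricD = e.MetricFloat64
    }
}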

Unless there is high added value in sorting Event.Attributes, I propose not to sort them and to send them as is.

Event.Host can be defaulted to os.Hostname() but we should state in the documentation that this will generate an openat syscall (at least on
Linux), more specifically: openat(AT_FDCWD, "/proc/sys/kernel/hostname", O_RDONLY|O_CLOEXEC ...) for every new Event.
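
If we do default it, a minimal sketch could look like the following, assuming a hypothetical applyDefaults helper called before marshaling:

// applyDefaults fills Event.Host when the caller left it empty.
// Note: os.Hostname() triggers the openat syscall mentioned above
// for every new Event.
func applyDefaults(e *Event) {
    if e.Host == "" {
        if h, err := os.Hostname(); err == nil {
            e.Host = h
        }
    }
}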

A buffered sender

I propose to always send events in bulk. The idea behind this buffered (bulked) sender is to reduce round trips between Riemann and the sender as much as
possible in order to maximize throughput while still keeping a pretty high send rate.

Here is a prototype implementation:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	bf := newBufferedSender()

	for i := 0; i < 1e4; i++ {
		bf.Send(
			&event{ID: i},
		)

		time.Sleep(
			time.Duration(rand.Intn(200)) * time.Microsecond,
		)
	}

	bf.Stop()
}

type event struct {
	ID int
}

type bf struct {
	in chan *event
	wg *sync.WaitGroup
}

func newBufferedSender() *bf {
	bf := &bf{
		in: make(chan *event, 1e6),
		wg: new(sync.WaitGroup),
	}

	bf.wg.Add(1)

	go bf.start()

	return bf
}

func (bf *bf) start() {
	var buff []*event

	resetBuff := func() {
		buff = make([]*event, 0, 1e2)
	}

	send := func(ticked bool) {
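		// Stand-in for the real network write: log the flush and
		// simulate some latency with a short sleep.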
		fmt.Printf(
			"Sending buff with %d events, ticked: %t\n",
			len(buff), ticked,
		)

		time.Sleep(
			1 * time.Millisecond,
		)
	}

	tkr := time.NewTicker(
		20 * time.Millisecond,
	)

	defer func() {
		fmt.Println("Stop ticking")
		tkr.Stop()

		fmt.Printf(
			"Draining buffer which contains %d events\n",
			len(buff),
		)
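		// In a real implementation the remaining events would be
		// flushed here instead of just sleeping.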

		time.Sleep(100 * time.Millisecond)

		fmt.Println("Exiting")
		bf.wg.Done()
	}()

	resetBuff()

	for {
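		// Flush when the buffer fills up or when the ticker fires,
		// whichever happens first.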
		select {
		case e, open := <-bf.in:
			if !open {
				return
			}

			buff = append(buff, e)

			if len(buff) == cap(buff) {
				send(false)

				resetBuff()
			}

		case <-tkr.C:
			if len(buff) <= 0 {
				continue
			}

			send(true)

			resetBuff()
		}
	}
}

func (bf *bf) Stop() {
	close(bf.in)

	bf.wg.Wait()
}

func (bf *bf) Send(e *event) {
	bf.in <- e
}

What do you think?

:)

@mcorbin
Collaborator

mcorbin commented Jan 27, 2019

Hello,

I agree that the client needs a v2 version. When I started working on it, I didn't know Go at all (now, I'm much better :p).

Thank you for your proposals, I will think about them next week :)

@jeromer
Contributor Author

jeromer commented Jan 28, 2019

I just pushed a prototype implementation for a v2.

Only the TCPClient is implemented, using a buffered sender.

The code is here.

You can run go test -tags=integration -v in order to launch one test using the TCPClient.

You should see something like this:

➜  riemann-go-client git:(v2) go test -tags=integration -v
=== RUN   TestToProto
--- PASS: TestToProto (0.00s)
=== RUN   TestTCPClient
--- PASS: TestTCPClient (0.67s)
	tcp_integration_test.go:50: Pushed 100000 events in 665.570922ms
=== RUN   TestNewTlsClientWithInsecure
--- PASS: TestNewTlsClientWithInsecure (0.01s)
=== RUN   TestNewTlsClientWithoutInsecure
--- PASS: TestNewTlsClientWithoutInsecure (0.01s)
=== RUN   TestNewUdpClient
--- PASS: TestNewUdpClient (0.00s)
PASS
ok  	github.com/riemann/riemann-go-client	0.684s

Tell me what you think

:)

@mcorbin
Collaborator

mcorbin commented Feb 3, 2019

Thanks for your work. Here are some comments:

TLS becomes an optional configuration which can be injected to either the TCP or UDP client.

Not needed for UDP.

No more Connect() method. Connection must be lazily handled by the client.

I like being able to choose when to define a client and when to start it. Sometimes, it's not at the same time. I would keep the Connect() method.

Event.Host can be defaulted to os.Hostname()

I often need to send events to Riemann without the hostname set, so I would not set the hostname by default.

I will try to find the time next week to work on it too.

@jeromer
Contributor Author

jeromer commented Feb 4, 2019

Aloha,

No more Connect() method. Connection must be lazily handled by the client.

I like being able to choose when to define a client and when to start it. Sometimes, it's not at the same time. I would keep the Connect() method.

I believe you appreciate keeping control over when the client should connect: you can instantiate a client somewhere in your code and then, later, connect to Riemann and do some real work. The idea behind this, I think, is not to connect when it is not needed.

If the client lazily connects to Riemann, it achieves the exact same goal while hiding the implementation, so users do not even have to care about connecting to Riemann. If you want to send some events, the client connects if not already connected; if you want to query the index, the client connects right before sending the query. By default the client could keep the connection open, reuse it as much as possible and automatically reconnect when needed, so the user does not have to care about this at all.
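
To make this concrete, here is a minimal sketch of the lazy-connection pattern, assuming hypothetical conn, addr and timeout fields on the client:

func (c *TCPClient) ensureConnected() error {
    // Reuse the existing connection when possible, dial lazily on
    // first use or after a previous failure.
    if c.conn != nil {
        return nil
    }

    conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
    if err != nil {
        return err
    }

    c.conn = conn
    return nil
}

func (c *TCPClient) Send(e *Event) error {
    if err := c.ensureConnected(); err != nil {
        return err
    }

    // marshal and write the event here; on a write error, drop the
    // connection so the next call reconnects automatically
    return nil
}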

Event.Host can be defaulted to os.Hostname()

I often need to send events to Riemann without the hostname set, so I would not set the hostname by default.

That's perfect :)

Using a loop to periodically send the events can be an issue. Riemann returns a proto message indicating if the message was correctly processed (cf http://riemann.io/howto.html#write-a-client).

Indeed. It really depends on the use case. If a user wants to send Riemann events at a low rate or from time to time, then yes, it is required to check the response Riemann returns in order to give the user a chance to act on it: maybe retry if the send fails, or anything else a user may need.

Another use case (mine, for example) is to send events to Riemann at a very high rate; basically, my use case will literally flood Riemann with events. This means I cannot afford to check whether each event is correctly processed by Riemann: I expect it to be, and checking Riemann's response every time would kill the sending throughput. This indeed poses the backpressure problem you describe below.

The good news is that I believe there is a way to solve both problems simply by providing a buffered sender (which sends in bulk and ignores Riemann's response) and a non-buffered one (which sends events, possibly one by one, checks Riemann's response and returns it to the user).
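
To make the two flavours concrete, here is a rough sketch with hypothetical names; the actual response type would be the proto Msg returned by Riemann:

// BufferedSender trades delivery feedback for throughput: events are
// queued and flushed in bulk, Riemann's responses are ignored.
type BufferedSender interface {
    Send(e *Event)
    Stop()
}

// SyncSender sends events one by one and hands Riemann's response
// back to the caller so it can retry or act on failures.
type SyncSender interface {
    Send(e *Event) (*Response, error)
}

// Response is a hypothetical placeholder for the proto Msg.
type Response struct {
    Ok    bool
    Error string
}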

It's also useful for backpressure (http://riemann.io/howto.html#client-backpressure-latency-and-queues). The user should then be able to read the response from Riemann.

Maybe there is a way to let the client handle backpressure. Here is a possible strategy: in a separate connection, the client could poll Riemann's netty event-executor queue size every X ms (in a dedicated goroutine). If the queue size goes above a configurable threshold, the client could start buffering events (which will probably lead to events being sent too late), and if this buffer becomes too large (configurable?) the client would simply start dropping events on the floor. With this system we could even dynamically control the event bulk size for every request. That way we can control how much we buffer events, how often we send them (if buffered) and how large the event bulk should be. If this system is implemented, the client should also provide stats on its behavior so the user knows what happens, e.g. "why does Riemann no longer receive as many events as before?" (the user checks the client's stats) "Ah, that's because the client dropped events on the floor".
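
Here is a very rough sketch of that polling loop. The queueSize and setMode helpers, the mode constants and the thresholds are all hypothetical, just to illustrate the shape; a real implementation would also need a way to stop the watcher:

// watchBackpressure polls Riemann's netty event-executor queue size
// and switches the sender between its normal, buffering and dropping
// modes depending on configurable thresholds.
func (bf *bf) watchBackpressure(pollEvery time.Duration, bufferAbove, dropAbove int64) {
    tkr := time.NewTicker(pollEvery)
    defer tkr.Stop()

    for range tkr.C {
        size, err := bf.queueSize() // hypothetical: query Riemann over a dedicated connection
        if err != nil {
            continue
        }

        switch {
        case size > dropAbove:
            bf.setMode(modeDrop) // hypothetical: drop events on the floor, record it in the stats
        case size > bufferAbove:
            bf.setMode(modeBuffer) // hypothetical: keep buffering, delay sends
        default:
            bf.setMode(modeSend) // hypothetical: normal bulk sends
        }
    }
}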

I think tomb is better than sync.WaitGroup to control the goroutine (context support, error handling ...).

Never used it but why not :)
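
For reference, a minimal sketch of what the sender goroutine could look like with tomb, assuming gopkg.in/tomb.v2 and a hypothetical sender type holding the event channel; untested, just to show the shape:

type sender struct {
    t  tomb.Tomb
    in chan *Event
}

func (s *sender) start() {
    s.t.Go(func() error {
        for {
            select {
            case <-s.t.Dying():
                // flush whatever is left in the buffer, then exit cleanly
                return nil
            case e := <-s.in:
                _ = e // buffer and/or send the event
            }
        }
    })
}

func (s *sender) stop() error {
    s.t.Kill(nil)
    return s.t.Wait()
}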

The user should be able to pass a context to the send method.

Ha, indeed. But then is it a context per event to be sent, or a context per bulk sent?
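
To make the two options concrete, the signatures could look like this (hypothetical names):

type ContextSender interface {
    // context per event: each Send is individually cancellable
    Send(ctx context.Context, e *Event) error

    // context per bulk: cancellation applies to a whole batch
    SendBulk(ctx context.Context, events []*Event) error
}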

I will try to find the time next week to work on it too.

Cool :)
