Skip to content

PxyUp/rx_go

Repository files navigation

RxGo with generics (v1.18+)

go get github.com/PxyUp/rx_go

This attempt to build generic version of Rx(Reactive) library

Observables

  1. New - create new observable from observer
observer := rx_go.NewObserver[Y]()
observable := rx_go.New(observer)
  1. From - create new observable from static array
observable := rx_go.From([]int{1, 2, 3})
  1. NewInterval - return observable from IntervalObserver observer
// Create interval which start from now
interval := rx_go.NewInterval(time.Second, true)
  1. NewHttp - return Observable from HttpObserver
obs, err := rx_go.NewHttp(http.DefaultClient, req)
  1. MapTo - create new observable with modified values
rx_go.MapTo[int, string](rx_go.From([]int{1, 2, 3}), func(t int) string {
	return fmt.Sprintf("hello %d", t)
}).Subscribe()
  1. Merge - merging multi observables with same type into single one
rx_go.Merge[int](rx_go.From[int]([]int{1, 2, 3, 7}...), rx_go.From[int]([]int{4, 5, 6}...)).Subscribe()
  1. FromChannel - create new observable from readable channel
rx_go.FromChannel[int](intChannel).Subscribe()
  1. Switch - change stream for observable
rx_go.Switch(rx_go.From([]int{1, 2, 3}...), func(value int) *rx_go.Observable[string] {
	return rx_go.From(fmt.Sprintf("HELLO %d", value)).Pipe(rx_go.Repeat[string](2))
}).Subscribe()
  1. Of - create static observable with one value
rx_go.Of("hello").Subscribe()
  1. Concat - create static observable witch emit single array of all values
rx_go.Concat(rx_go.From([]int{1, 2, 3, 4, 5, 6}...)).Subscribe()
  1. Reduce - create new observable which return accumulation value from all previous emitted items
rx_go.Reduce(rx_go.From([]int{1, 2, 3, 4, 5, 6}...), func(y string, t int) string {
	return y + fmt.Sprintf("%d", t)
}, "").Subscribe()
  1. Pairwise - create new observable with groups pairs of consecutive emissions together and emits them as an array of two values.
rx_go.Pairwise[int](rx_go.From([]int{1, 2, 3, 4, 5, 6}...)).Subscribe()
  1. Never - observable which never emiting
rx_go.Never
  1. ForkJoin - wait for Observables to complete and then combine last values they emitted; complete immediately if an empty array is passed.
rx_go.ForkJoin(rx_go.From([]int{1, 2, 3}...), rx_go.From([]int{4, 5, 6}...), rx_go.From([]int{7, 8, 9}...)).Subscribe()
  1. Empty - create new Observer which just completed
rx_go.Empty

Methods

  1. Subscribe - create subscription channel and cancel function
ch, cancel := obs.Subscribe()
  1. Pipe - function for accept operators

Operators:

  1. Filter - filter out
obs.Pipe(rx_go.Filter[int](func(value int) bool {
	return value > 16
})).Subscribe()
  1. Map - change value
obs.Pipe(rx_go.Map[int](func(value int) int {
	return value * 3
})).Subscribe()
  1. LastOne - get last one from the stream
obs.Pipe(rx_go.LastOne[int]()).Subscribe()
  1. FirstOne - get first one from the stream
obs.Pipe(rx_go.FirstOne[int]()).Subscribe()
  1. Delay - delay before emit next value
obs.Pipe(rx_go.Delay[int](time.Second)).Subscribe()
  1. Debounce - emit value if in provided amount of time new value was not emitted
obs.Pipe(rx_go.Debounce[int](time.Millisecond*500)).Subscribe()
  1. Do - execute action on each value
obs.Pipe(
    rx_go.Do(func(value int) {
        if value == 2 {
            cancel()
        }
    }),
).Subscribe()
  1. UntilCtx - emit value until context not done
obs.Pipe(
    rx_go.UntilCtx[int](ctx),
).Subscribe()
  1. Distinct - execute value if they different from previous
obs.Pipe(rx_go.Distinct[int]()).Subscribe()
  1. DistinctWith - same like Distinct but accept function like comparator
obs.Pipe(rx_go.DistinctWith[int](func(a, b int) bool { return a == b })).Subscribe()
  1. Take - take provided amount from observable
obs.Pipe(rx_go.Take[int](3)).Subscribe()
  1. Repeat - emit value multiple times
rx_go.From(values...).Pipe(rx_go.Repeat[int](2)).Subscribe()
  1. AfterCtx - emit value after ctx is done(values not ignored, they are not emitted)
obs.Pipe(
    rx_go.AfterCtx[int](ctx),
).Subscribe()
  1. Skip - that skips the first count items emitted
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Skip[int](2)).Subscribe()
  1. StartWith - start emitting with predefined value
rx_go.From([]int{1}...).Pipe(rx_go.StartWith(2)).Subscribe()
  1. EndWith - end emitting with predefined value
rx_go.From([]int{1}...).Pipe(rx_go.EndWith(2)).Subscribe()
  1. SkipUntilCtx - skips items emitted by the Observable until a ctx not done
rx_go.From([]int{1, 2, 3}...).Pipe(
	rx_go.SkipUntilCtx[int](ctx),
).Subscribe()
  1. SkipUntil - skips items emitted by the Observable until a second Observable emits an item(at least one).
rx_go.From([]int{1, 2, 3}...).Pipe(
    rx_go.AfterCtx[int](ctx),
    rx_go.SkipUntil[int, int](rx_go.Of(1).Pipe(rx_go.Do(func(value int) {
        cancelCtx()
    }))),
).Subscribe()
  1. Finally - do action before closing of observer(last value already emitted but observer not completed yet)
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Finally[int](func() {
	done = true
})).Subscribe()
  1. ElementAt - emit single value from observable which contains element on this position
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.ElementAt[int](1)).Subscribe()
  1. Find - only the first value emitted by the source Observable that meets some condition.
rx_go.From([]int{1, 2, 3}...).Pipe(rx_go.Find(func(t int) bool {
	return t == 3
})).Subscribe()
  1. InitialDelay - emit values with initial delay
rx_go.Of[int](1).Pipe(rx_go.InitialDelay[int](time.Second)).Subscribe()