[Feature]: IObservableCache.TransformMany with force transform param #899

Open
Metadorius opened this issue Apr 22, 2024 · 2 comments

Comments

@Metadorius
Contributor

Metadorius commented Apr 22, 2024

Describe the functionality desired 🐞

Currently only Transform has an overload that supports refreshing the derived cache from another observable. This is very useful when the transform depends on data external to the cache: when that data changes, the derived cache data becomes invalid and must be recomputed. TransformMany doesn't have a similar overload.

Considerations

Refreshing the original source when external data changes may not be desirable because of its other side effects, and it isn't an obvious solution either.

@dwcullop
Member

I'm assuming you're talking about the IObservable<Unit> forceTransform parameter found in this overload.

IObservable<IChangeSet<TDestination, TKey>> Transform<TDestination, TSource, TKey>(this IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, TDestination> transformFactory, IObservable<Unit> forceTransform)
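For readers unfamiliar with that overload, here is a minimal sketch of how the forceTransform parameter might be used. The `Product` and `PricedProduct` records, the `taxRate` variable, and the `taxRateChanged` subject are hypothetical names invented for illustration; only `SourceCache`, `Connect`, and `Transform` come from DynamicData.

```csharp
using System;
using System.Reactive;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical types, for illustration only.
public sealed record Product(int Id, decimal NetPrice);
public sealed record PricedProduct(int Id, decimal GrossPrice);

public static class ForceTransformSketch
{
    // Returns the gross price last observed downstream after the tax rate changes.
    public static decimal Run()
    {
        var taxRate = 0.20m;                              // external state, not stored in the cache
        using var taxRateChanged = new Subject<Unit>();   // hypothetical trigger for re-transforming
        using var products = new SourceCache<Product, int>(p => p.Id);
        products.AddOrUpdate(new Product(1, 100m));

        var lastGross = 0m;
        using var subscription = products.Connect()
            // The second argument asks Transform to re-run the factory for every item
            // each time the trigger fires.
            .Transform(p => new PricedProduct(p.Id, p.NetPrice * (1 + taxRate)), taxRateChanged)
            .Subscribe(changes =>
            {
                foreach (var change in changes)
                {
                    lastGross = change.Current.GrossPrice;
                }
            });

        taxRate = 0.25m;                       // the external data changes...
        taxRateChanged.OnNext(Unit.Default);   // ...so signal that derived values are stale
        return lastGross;
    }
}
```

The source cache itself is never touched here; only the derived values are recomputed, which is the behaviour the issue asks for on TransformMany.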

It's probably missing from TransformMany because of performance. Having to re-transform every single value is expensive and it would be worse if each value transforms into a collection of values.

My suggestion would be to check out one of the new operators such as TransformManyAsync or MergeManyChangeSets. TransformManyAsync allows you to transform each value into an ObservableCollection or a SourceCache of values, which are all merged together.

MergeManyChangeSets allows you to transform each value into a ChangeSet, which is all merged together.

When your external data is updated, it can update the changeset, and then results will automatically be propagated.

This will be a lot more performant than just transforming every single value over and over again.
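As a rough sketch of the MergeManyChangeSets approach, assuming each parent item owns a child SourceCache that can be updated independently. The `Parent` and `Child` types are hypothetical, and the call shape reflects my understanding of the operator, so check it against the DynamicData version you are using.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

// Hypothetical types, for illustration only.
public sealed record Child(string Id, string Name);

public sealed class Parent : IDisposable
{
    public Parent(int id) => Id = id;
    public int Id { get; }

    // Each parent exposes its own child cache; external changes are applied here.
    public SourceCache<Child, string> Children { get; } = new(c => c.Id);

    public void Dispose() => Children.Dispose();
}

public static class MergeManyChangeSetsSketch
{
    // Returns a log of the child changes seen in the merged stream.
    public static IReadOnlyList<string> Run()
    {
        using var parents = new SourceCache<Parent, int>(p => p.Id);
        using var parent = new Parent(1);
        parents.AddOrUpdate(parent);

        var log = new List<string>();
        using var subscription = parents.Connect()
            // Every parent contributes a child changeset; all of them are merged into one stream.
            .MergeManyChangeSets(p => p.Children.Connect())
            .Subscribe(changes =>
            {
                foreach (var change in changes)
                {
                    log.Add($"{change.Reason}:{change.Key}");
                }
            });

        // Updating one child cache propagates only that change; nothing is re-transformed.
        parent.Children.AddOrUpdate(new Child("a", "first"));
        parent.Children.AddOrUpdate(new Child("a", "renamed"));
        return log;
    }
}
```

The trade-off is that each source item has to own a live collection of its children instead of computing them on demand.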

@Metadorius
Contributor Author

In my case I don't really care about performance that much, since the transform is evaluated only once, when the source values are edited from the UI. I had to introduce another observable stream to trigger the update because the conditions that affect the TransformMany calculation had changed.

What I did in the end was to snatch the refresh logic and introduce it as a separate .RefreshOn(IObservable<> refresher) operator which I can use wherever I need. Works flawlessly and I think could be a good addition to the library.

    public static IObservable<IChangeSet<TSource, TKey>> RefreshOn<TSource, TKey>(
            this IObservable<IChangeSet<TSource, TKey>> source,
            IObservable<Func<TSource, TKey, bool>> refresher) 
        where TSource : notnull
        where TKey : notnull
        => Observable.Create<IChangeSet<TSource, TKey>>(observer =>
        {
            var locker = new object();
            var shared = source.Synchronize(locker).Publish();

            // capture all items so we can apply a forced transform
            var cache = new Dictionary<TKey, TSource>();
            var cacheLoader = shared.Subscribe(changes =>
            {
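                // IChangeSet<TSource, TKey> is itself enumerable, so no cast to the concrete
                // ChangeSet type is needed (the cast would yield null for other implementations)
                foreach (var item in changes)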
                {
                    switch (item.Reason)
                    {
                        case ChangeReason.Update:
                        case ChangeReason.Add:
                            cache[item.Key] = item.Current;
                            break;

                        case ChangeReason.Remove:
                            cache.Remove(item.Key);
                            break;
                    }
                }
            });

            // create change set of items where force refresh is applied
            var refreshes = refresher.Synchronize(locker)
                .Select(selector => cache
                    .Where(kvp => selector(kvp.Value, kvp.Key))
                    .Select(kvp => new Change<TSource, TKey>(ChangeReason.Refresh, kvp.Key, kvp.Value)))
                .Select(changes => new ChangeSet<TSource, TKey>(changes)).NotEmpty();

            var sourceAndRefreshes = shared.Merge(refreshes);
            
            return new CompositeDisposable(cacheLoader, sourceAndRefreshes.SubscribeSafe(observer), shared.Connect());
        });
    
    
    public static IObservable<IChangeSet<TSource, TKey>> RefreshOn<TSource, TKey>(
            this IObservable<IChangeSet<TSource, TKey>> source, 
            IObservable<Func<TSource, bool>> refresher)
        where TSource : notnull
        where TKey : notnull
        => source.RefreshOn(refresher.Select(selector => new Func<TSource, TKey, bool>((value, _) => selector(value))));
    
    public static IObservable<IChangeSet<TSource, TKey>> RefreshOn<TSource, TKey>(
            this IObservable<IChangeSet<TSource, TKey>> source,
            IObservable<Unit> refresher)
        where TSource : notnull
        where TKey : notnull
        => source.RefreshOn(refresher.Select(_ => new Func<TSource, TKey, bool>((_, _) => true)));
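A possible usage sketch for the operator above, placed in front of TransformMany. It assumes the RefreshOn extension methods from this comment are compiled into a static class that is in scope (they are not part of DynamicData), and that TransformMany re-evaluates its selector when it receives a Refresh, as the comment above reports. The `Order` record, `hiddenTags`, and `hiddenTagsChanged` are hypothetical names.

```csharp
// Assumption: the RefreshOn extension methods shown above are in scope.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical type, for illustration only.
public sealed record Order(int Id, string[] Tags);

public static class RefreshOnUsageSketch
{
    public static void Main()
    {
        var hiddenTags = new HashSet<string>();              // external state the selector depends on
        using var hiddenTagsChanged = new Subject<Unit>();   // hypothetical trigger
        using var orders = new SourceCache<Order, int>(o => o.Id);
        orders.AddOrUpdate(new Order(1, new[] { "urgent", "draft" }));

        using var subscription = orders.Connect()
            // Inject a Refresh for every cached item whenever the trigger fires.
            .RefreshOn(hiddenTagsChanged)
            .TransformMany(o => o.Tags.Where(t => !hiddenTags.Contains(t)), tag => tag)
            .Subscribe(changes =>
            {
                foreach (var change in changes)
                {
                    Console.WriteLine($"{change.Reason}: {change.Key}");
                }
            });

        hiddenTags.Add("draft");                   // the external condition changes...
        hiddenTagsChanged.OnNext(Unit.Default);    // ...so ask for the children to be recomputed
    }
}
```

The overloads taking a predicate could be used instead to refresh only the items affected by the change.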
