Skip to content

Commit

Permalink
More operators
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 2, 2017
1 parent 10f4824 commit 0399a07
Show file tree
Hide file tree
Showing 18 changed files with 1,321 additions and 25 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ and interactive dataflows.**

```groovy
dependencies {
compile 'com.github.akarnokd:ixjava:1.0.0-RC3'
compile 'com.github.akarnokd:ixjava:1.0.0-RC5'
}
```

**ivy**

```xml
<dependencies>
<dependency org="com.github.akarnokd" name="ixjava" rev="1.0.0-RC3" />
<dependency org="com.github.akarnokd" name="ixjava" rev="1.0.0-RC5" />
</dependencies>
```

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.0.0-RC4
version=1.0.0-RC5
11 changes: 11 additions & 0 deletions src/main/java/ix/GroupedIx.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@
*/
public abstract class GroupedIx<K, V> extends Ix<V> {

/**
* The group key.
*/
protected final K key;

/**
* Constructs a GroupedIx with the given group key.
* @param key the group key
*/
public GroupedIx(K key) {
this.key = key;
}

/**
* Returns this group's key.
* @return the key
*/
public final K key() {
return key;
}
Expand Down
146 changes: 146 additions & 0 deletions src/main/java/ix/Ix.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,58 @@ public static <T> Ix<T> mergeArray(Iterable<? extends T>... sources) {
return concatArray(sources); // concat and merge are the same in the Iterable world
}

/**
* Merges self-comparable items from an Iterable sequence of Iterable sequences, picking
* the smallest item from all those inner Iterables until all sources complete.
* @param <T> the value type
* @param sources the Iterable sequence of Iterables of self-comparable items
* @return the new Ix instance
* @since 1.0
*/
public static <T extends Comparable<? super T>> Ix<T> orderedMerge(Iterable<? extends Iterable<? extends T>> sources) {
return orderedMerge(sources, SelfComparator.INSTANCE);
}

/**
* Merges items from an Iterable sequence of Iterable sequences, picking
* the smallest item (according to a custom comparator) from all those inner
* Iterables until all sources complete.
* @param <T> the value type
* @param sources the Iterable sequence of Iterables
* @param comparator the comparator to compare items and pick the one that returns negative will be picked
* @return the new Ix instance
* @since 1.0
*/
public static <T> Ix<T> orderedMerge(Iterable<? extends Iterable<? extends T>> sources, Comparator<? super T> comparator) {
return new IxOrderedMergeIterable<T>(nullCheck(sources, "sources is null"), nullCheck(comparator, "comparator is null"));
}

/**
* Merges self-comparable items from an Iterable sequence of Iterable sequences, picking
* the smallest item from all those inner Iterables until all sources complete.
* @param <T> the value type
* @param sources the Iterable sequence of Iterables of self-comparable items
* @return the new Ix instance
* @since 1.0
*/
public static <T extends Comparable<? super T>> Ix<T> orderedMergeArray(Iterable<? extends T>... sources) {
return orderedMergeArray(SelfComparator.INSTANCE, sources);
}

/**
* Merges items from an array of Iterable sequences, picking
* the smallest item (according to a custom comparator) from all those inner
* Iterables until all sources complete.
* @param <T> the value type
* @param sources the Iterable sequence of Iterables
* @param comparator the comparator to compare items and pick the one that returns negative will be picked
* @return the new Ix instance
* @since 1.0
*/
public static <T> Ix<T> orderedMergeArray(Comparator<? super T> comparator, Iterable<? extends T>... sources) {
return new IxOrderedMergeArray<T>(nullCheck(sources, "sources is null"), nullCheck(comparator, "comparator is null"));
}

/**
* Emits a range of incrementing integer values, starting from {@code start} and
* up to {@code count} times.
Expand All @@ -420,6 +472,43 @@ public static Ix<Integer> range(int start, int count) {
return new IxRange(start, count);
}

/**
* Prevents the downstream from calling remove() and throws
* an UnsupportedOperationException instead.
* @return the new Ix instance
* @see #readOnly(boolean)
* @since 1.0
*/
public final Ix<T> readOnly() {
return new IxReadOnly<T>(this, false);
}

/**
* Prevents the downstream from calling remove() by optionally
* ignoring it or throwing an UnsupportedOperationException.
* @param silent if true, remove() calls are ignored; if false,
* remove() calls throw an UnsupportedOperationException
* @return the new Ix instance
* @since 1.0
*/
public final Ix<T> readOnly(boolean silent) {
return new IxReadOnly<T>(this, silent);
}

/**
* Repeatedly calls the given callable indefinitely and
* emits the returned value.
* <p>
* The result's iterator() doesn't support remove().
* @param <T> the value type
* @param callable the callable to call
* @return the new Ix instance
* @since 1.0
*/
public static <T> Ix<T> repeatCallable(Callable<T> callable) {
return new IxRepeatCallable<T>(nullCheck(callable, "callable is null"));
}

/**
* Repeats the given value indefinitely.
* <p>
Expand Down Expand Up @@ -738,6 +827,52 @@ public final Ix<List<T>> buffer(int size, int skip) {
return new IxBufferOverlap<T>(this, positive(size, "size"), positive(skip, "skip"));
}

/**
* Buffer until an item is encountered for which the predicate returns true,
* triggering a new buffer.
* <p>Neither the previous nor the next buffer will contain the item that caused the
* split
* @param predicate the predicate called with each item and should return false
* to trigger a new buffer
* @return the new Ix instance
* @see #bufferUntil(IxPredicate)
* @see #bufferWhile(IxPredicate)
* @since 1.0
*/
public final Ix<List<T>> bufferSplit(IxPredicate<? super T> predicate) {
return new IxBufferSplit<T>(this, nullCheck(predicate, "predicate is null"));
}

/**
* Buffer until an item is encountered after which the predicate returns true
* to start a new buffer.
* <p>The item will be part of the previous buffer.
* @param predicate the predicate called with each item after the item
* has been added to the current buffer and should return true to start a new buffer
* @return the new Ix instance
* @see #bufferSplit(IxPredicate)
* @see #bufferWhile(IxPredicate)
* @since 1.0
*/
public final Ix<List<T>> bufferUntil(IxPredicate<? super T> predicate) {
return new IxBufferUntil<T>(this, nullCheck(predicate, "predicate is null"));
}

/**
* Buffer while an item is encountered before which the predicate returns false
* to start a new buffer.
* <p>The item will be part of the next buffer
* @param predicate the predicate called with each item after the item
* has been added to the current buffer and should return true to start a new buffer
* @return the new Ix instance
* @see #bufferSplit(IxPredicate)
* @see #bufferUntil(IxPredicate)
* @since 1.0
*/
public final Ix<List<T>> bufferWhile(IxPredicate<? super T> predicate) {
return new IxBufferWhile<T>(this, nullCheck(predicate, "predicate is null"));
}

/**
* Cast the elements to the specified class.
* <p>
Expand Down Expand Up @@ -1095,6 +1230,17 @@ public final Ix<T> endWith(T... values) {
return concat(this, fromArray(values));
}

/**
* Emit every Nth item only from upstream.
* <p>Example: Ix.range(1, 5).every(2) will yield {2, 4}.
* @param nth how many items to skip + 1
* @return the new Ix instance
* @since 1.0
*/
public final Ix<T> every(int nth) {
return new IxEvery<T>(this, positive(nth, "nth"));
}

/**
* Emits distinct elements from this and the other Iterable which are not
* in the other sequence (i.e., (A union B) minus (A intersection B)).
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/ix/IxBufferSplit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2011-2016 David Karnok
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ix;

import java.util.*;

/**
* Split into buffers when the predicate returns true and neither
* buffer contains the item.
*
* @param <T> the value type
*/
final class IxBufferSplit<T> extends IxSource<T, List<T>> {

final IxPredicate<? super T> predicate;

IxBufferSplit(Iterable<T> source, IxPredicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}

@Override
public Iterator<List<T>> iterator() {
return new BufferSplitIterator<T>(source.iterator(), predicate);
}

static final class BufferSplitIterator<T> implements Iterator<List<T>> {

final Iterator<T> source;

final IxPredicate<? super T> predicate;

boolean done;

List<T> buffer;

BufferSplitIterator(Iterator<T> source, IxPredicate<? super T> predicate) {
this.source = source;
this.predicate = predicate;
}

@Override
public boolean hasNext() {
List<T> b = buffer;
if (b == null) {
if (done) {
return false;
}

b = new ArrayList<T>();

Iterator<T> src = source;
while (src.hasNext()) {
T v = src.next();

if (predicate.test(v)) {
buffer = b;
return true;
}

b.add(v);
}

if (b.isEmpty()) {
done = true;
return false;
}

buffer = b;
}
return true;
}

@Override
public List<T> next() {
if (hasNext()) {
List<T> b = buffer;
buffer = null;
return b;
}
throw new NoSuchElementException();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}
Loading

0 comments on commit 0399a07

Please sign in to comment.