import {Observable} from 'rxjs';

export {collectForTime}

/**
 * Creates a pipeable operator that groups source values into time-boxed batches.
 *
 * The first value received while no window is open starts a new window lasting
 * `time` milliseconds. Every value received while that window is open is added
 * to the same batch. When the window closes the batch is emitted as one array,
 * and the next source value opens a fresh window.
 *
 * Terminal behavior:
 * - when the source completes, a batch that is still open is emitted
 *   immediately and then the result completes;
 * - when the source errors, the open batch is discarded and the error is
 *   forwarded;
 * - when the consumer unsubscribes, the pending timer is cancelled and the
 *   source subscription is released.
 *
 * @param time Length of each collection window in milliseconds.
 * @returns A function mapping an `Observable<T>` to an `Observable<T[]>` of batches.
 */
function collectForTime<T>(time: number): (source$: Observable<T>) => Observable<T[]> {
    return (source$) => {
        return new Observable<T[]>((destination) => {
            // Values gathered during the currently open window.
            let collectedValues: T[] = [];

            // Timer that closes the open window; `undefined` means no window is open.
            let windowTimer: ReturnType<typeof setTimeout> | undefined;

            const cancelTimer = (): void => {
                if (windowTimer !== undefined) {
                    clearTimeout(windowTimer);
                    windowTimer = undefined;
                }
            };

            // Closes the open window (if any) and emits what it collected.
            const flush = (): void => {
                cancelTimer();

                if (collectedValues.length > 0) {
                    // Hand over the array and start a new one so an emitted batch
                    // is never mutated by later source values.
                    const batch = collectedValues;
                    collectedValues = [];
                    destination.next(batch);
                }
            };

            const sourceSubscription = source$.subscribe({
                next: (value: T) => {
                    collectedValues.push(value);

                    // Only the first value of a batch opens a window.
                    if (windowTimer === undefined) {
                        windowTimer = setTimeout(flush, time);
                    }
                },
                error: (err: unknown) => {
                    // Drop the partial batch; the error is the terminal signal.
                    cancelTimer();
                    collectedValues = [];
                    destination.error(err);
                },
                complete: () => {
                    // Emit the partial batch before completing, otherwise it would
                    // be lost: emissions after completion are ignored.
                    flush();
                    destination.complete();
                },
            });

            // Teardown: runs when the consumer unsubscribes or the stream terminates.
            return () => {
                cancelTimer();
                sourceSubscription.unsubscribe();
            };
        });
    };
}
