import { Observable, of, Subject } from 'rxjs';
import { delay, switchMap, takeUntil } from 'rxjs/operators';

// Sentinel for `maxAttempts`: when the option equals this value, retrying is not capped.
const INFINITY = -1;
/** Optional tuning knobs accepted by `retryUntil`. */
export interface RetryUntilOptions {
  /**
   * Upper bound on the number of attempts. When omitted (or set to -1) there is
   * no upper bound. The request is always made at least once.
   */
  maxAttempts?: number;
  /**
   * Value handed to the RxJS `delay` operator before every attempt, the first
   * one included (i.e. milliseconds). Defaults to 300 when omitted.
   */
  delayTime?: number;
}

/**
 * Call signature implemented by `retryUntil`.
 *
 * - `httpRequest$`: factory that returns the observable to subscribe to for an attempt.
 * - `prediction`: predicate applied to each emitted value; `true` means the value is accepted.
 * - `options`: optional attempt limit and delay, see `RetryUntilOptions`.
 */
export type RetryUntilType = <T>(
  httpRequest$: () => Observable<T>,
  prediction: (obj: T) => boolean,
  options?: RetryUntilOptions,
) => Observable<T>;

/**
 * Creates an Observable that repeatedly runs `httpRequest$` until a response
 * satisfies `prediction` or the attempt limit is reached.
 *
 * Every attempt (the first one included) waits `delayTime` via the RxJS `delay`
 * operator and then subscribes to a fresh observable obtained from
 * `httpRequest$()`. The delay is the same for every attempt.
 *
 * Outcome:
 * - a response accepted by `prediction` is emitted and the result completes;
 * - when `maxAttempts` is reached, the last response is emitted even though it
 *   was rejected, and the result completes;
 * - an error from the request, or an exception thrown by `prediction`, is
 *   forwarded as an error of the result.
 *
 * Unsubscribing from the result cancels the pending delay / in-flight request.
 *
 * NOTE: every value emitted by the request observable is evaluated on its own,
 * so a source that emits several rejected values schedules several retries.
 * A source that completes without emitting leaves the result open.
 *
 * @param httpRequest$ factory called once per attempt to obtain the request observable
 * @param prediction predicate deciding whether a response is accepted
 * @param options `maxAttempts` (default: unlimited) and `delayTime` (default: 300)
 * @returns an Observable emitting a single response and then completing
 */
export const retryUntil: RetryUntilType = (httpRequest$, prediction, options = {}) => {
  return new Observable((result$) => {
    // Destructuring defaults (unlike a spread merge) also apply when a key is
    // present but explicitly `undefined`.
    const { maxAttempts = INFINITY, delayTime = 300 } = options;
    const cancelled$ = new Subject<void>();

    // Number of the attempt currently scheduled or in flight (1-based).
    let attempt = 1;

    const runAttempt = (): void => {
      of(null)
        .pipe(
          delay(delayTime),
          switchMap(() => httpRequest$()),
          takeUntil(cancelled$),
        )
        .subscribe({
          next: (resp) => {
            let accepted: boolean;
            try {
              accepted = prediction(resp);
            } catch (err) {
              // Without this, RxJS reports the exception as unhandled and the
              // consumer is left with an observable that never settles.
              result$.error(err);
              return;
            }

            const mayRetry = maxAttempts === INFINITY || attempt < maxAttempts;
            if (!accepted && mayRetry) {
              attempt++;
              runAttempt();
              return;
            }

            // Either accepted, or attempts are exhausted: hand over the last response.
            result$.next(resp);
            result$.complete();
          },
          error: (err) => result$.error(err),
        });
    };

    runAttempt();

    // Teardown: stops whichever inner subscription is still active.
    return () => {
      cancelled$.next();
      cancelled$.complete();
    };
  });
};
