import angular from 'angular';
import { Record, Map } from 'immutable';
import { BehaviorSubject, Observable, forkJoin, of } from 'rxjs';
import {
  distinctUntilChanged,
  filter,
  takeWhile,
  map,
  delay,
  concatWith,
  debounceTime,
  retry,
  startWith,
  switchMap,
  first,
  repeatWhen,
  share,
} from 'rxjs/operators';

/**
 * @ngdoc object
 * @name sb.lib.tasks.object:AsyncTasks
 *
 * @description
 * This service is responsible for launching tasks and keeping track of
 * outstanding/failed tasks. While at least one task has a subscriber,
 * AsyncTasks automatically polls the backend for the state of those tasks.
 *
 * The service emits immutable records of datatype `TaskResult`:
 *   @property {string} state "Status" of the task (see constants below).
 *   @property {any} [result=null] The result of the task. Will be the exception
 *     on failure case and the result on success (if the endpoint offers these).
 */
export const AsyncTasks = [
  '$observable',
  'BackendLocation',
  function ($observable, BackendLocation) {
    // Values a `TaskResult.state` can take.
    const PENDING_STATE = 'PENDING';
    const SUCCESS_STATE = 'DONE';
    const FAILURE_STATE = 'FAILED';

    // Immutable record handed to subscribers; without arguments it is pending with a null result.
    const TaskResult = Record({ state: PENDING_STATE, result: null });
    const defaultPendingResult = new TaskResult();
    const unknownFailureResult = new TaskResult({ state: FAILURE_STATE, result: 'Unknown Failure' });

    // Delay passed to `delay()` between two poll cycles.
    const POLL_INTERVAL = 1500;
    // Count passed to `retry()` for a failing poll.
    const MAX_RETRY = 2;

    // Options shared by every request; the poll request additionally carries a fixed url.
    const START_AJAX_BASE_OPTS = Object.freeze({ method: 'POST', timeout: 20000 });
    const pollUrl = `${BackendLocation.entity(1)}async-tasks/`;
    const POLL_AJAX_BASE_OPTS = Object.assign({ url: pollUrl }, START_AJAX_BASE_OPTS);

    /**
     * Translates a polled task into one of the task state constants.
     *
     * @param {object} task Polled task; its `successful` and `failed` flags are read.
     * @returns {string} `SUCCESS_STATE` when `successful` is truthy, otherwise
     *   `FAILURE_STATE` when `failed` is truthy, otherwise `PENDING_STATE`.
     */
    function taskStateEnum(task) {
      // `successful` is checked first, so it wins if both flags are set.
      if (task.successful) {
        return SUCCESS_STATE;
      }
      return task.failed ? FAILURE_STATE : PENDING_STATE;
    }

    /**
     * Builds the per-task results for one poll response.
     *
     * @param {Immutable.Map} taskSubs The subscription snapshot the poll was made for.
     *   Only used to obtain an empty map; none of its entries are carried over.
     * @param {array<object>} polledTasks Tasks reported by the poll response. Each one
     *   is read for `id`, `result`, `exception` and the flags used by `taskStateEnum`.
     *
     * @returns {Immutable.Map} Map from task ID to `TaskResult`, containing exactly
     *   the tasks present in `polledTasks`.
     */
    function createResultsForPolledTasks(taskSubs, polledTasks) {
      // Start from an empty map. Seeding the results with `taskSubs` itself would leave
      // the raw subscriber count (a truthy number) as the "result" of every subscribed
      // task the response did not mention, and consumers would receive that number
      // instead of a `TaskResult`.
      return taskSubs.clear().withMutations((results) => {
        polledTasks.forEach((task) => {
          results.set(
            task.id,
            new TaskResult({
              state: taskStateEnum(task),
              result: task.result || task.exception || null,
            }),
          );
        });
      });
    }

    /**
     * Applies `fn` to the subscriber count of `taskId` and publishes the new counts
     * on `taskSubscriptions$`.
     *
     * @param {string} taskId Task whose subscriber count changes.
     * @param {function(number): number} fn Receives the current count (0 when the
     *   task has no entry yet) and returns the new count.
     */
    function updateTaskSubCount(taskId, fn) {
      const currentCounts = taskSubscriptions$.getValue();
      const updatedCounts = currentCounts.update(taskId, 0, fn);
      // Entries whose count is no longer positive are dropped before publishing.
      taskSubscriptions$.next(updatedCounts.filter((subCount) => subCount > 0));
    }

    // Subscriber counts keyed by task ID; in this file they are written through updateTaskSubCount().
    const taskSubscriptions$ = new BehaviorSubject(Map());
    // Polling loop shared by all subscribers. Each cycle takes one snapshot of the
    // subscriptions, asks the backend about those tasks and emits the mapped results.
    const taskPoll$ = taskSubscriptions$.pipe(
      // Let a burst of subscription changes settle so they end up in the same request.
      debounceTime(10),
      // Nothing to poll until at least one task has a subscriber.
      filter((taskSubs) => taskSubs.size > 0),
      // One snapshot per cycle; completing here is what lets repeatWhen() start the next cycle.
      first(),
      switchMap((taskSubs) => {
        const body = { tasks: Array.from(taskSubs.keys()) };
        return $observable.sbAjax(Object.assign({ body }, POLL_AJAX_BASE_OPTS)).pipe(
          map(({ status, response }) => {
            // Any status other than 200 is turned into an error so retry() below handles it.
            if (status !== 200) {
              throw new Error('Failed task status poll.');
            }
            return createResultsForPolledTasks(taskSubs, response.tasks);
          }),
        );
      }),
      // On error, resubscribe to the steps above up to MAX_RETRY times before the error is passed on.
      retry(MAX_RETRY),
      // When a cycle completes, start the next one after POLL_INTERVAL.
      repeatWhen((notifications) => notifications.pipe(delay(POLL_INTERVAL))),
      // All subscribers share a single polling loop.
      share(),
    );

    return {
      /**
       * @ngdoc method
       * @name resultOfId
       * @methodOf sb.lib.tasks.AsyncTasks
       *
       * @param {string} taskId Standard task ID to get the result of.
       *
       * @returns {observable<TaskResult>} Returns an observable that emits
       *   the task status as a `TaskResult`.
       */
      resultOfId(taskId) {
        return new Observable((observer) => {
          updateTaskSubCount(taskId, (count) => count + 1);
          return taskPoll$
            .pipe(
              // Because it is possible to resubscribe this taskPoll$ with repeatWhen
              // we must be careful to not do anything after we close the observer.
              takeWhile(() => !observer.closed),
              map((taskStatuses) => taskStatuses.get(taskId)),
              // The results of taskPoll$ might be slightly outdated since once the AJAX is outstanding
              // we cannot cancel it, so we should make sure we have something to report on.
              filter(angular.identity),
            )
            .subscribe(
              (taskStatus) => {
                observer.next(taskStatus);
                if (taskStatus.state !== PENDING_STATE) {
                  observer.complete();
                }
              },
              () => {
                observer.next(unknownFailureResult);
                observer.complete();
              },
            )
            .add(() => {
              updateTaskSubCount(taskId, (count) => count - 1);
            });
        }).pipe(
          startWith(defaultPendingResult),
          distinctUntilChanged((task1, task2) => task1.state === task2.state),
        );
      },

      /**
       * @ngdoc method
       * @name resultOfAllIds
       * @methodOf sb.lib.tasks.AsyncTasks
       *
       * @param {array<string>} taskIds List of task IDs to get the _cumulative_ status of.
       *
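       * @returns {observable<TaskResult>} Returns an observable that emits a
       *   pending `TaskResult` first and then, once every task has finished, a
       *   single `TaskResult` whose state is failed if any task failed and done
       *   otherwise. The individual task results are not included.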
       */
      resultOfAllIds(taskIds) {
        if (taskIds.length === 0) {
          return of(new TaskResult({ state: SUCCESS_STATE }));
        }
        const allResults$ = taskIds.map((taskId) => this.resultOfId(taskId));
        return forkJoin(allResults$, (...allResults) => {
          return new TaskResult({
            state: allResults.reduce((accum, { state }) => {
              // State will never be pending (only completed states) since we used forkJoin.
              if (accum === FAILURE_STATE || state === FAILURE_STATE) {
                return FAILURE_STATE;
              }
              return accum;
            }, SUCCESS_STATE),
          });
        }).pipe(startWith(defaultPendingResult));
      },

      /**
       * @ngdoc method
       * @name startTaskEndpoint
       * @methodOf sb.lib.tasks.AsyncTasks
       *
       * @param {string} url The url of the endpoint to interact with. It is expected
       *   to be a webapi with `@taskendpoint` decoration/behavior.
       * @param {object} [body=undefined] Extra data to send to the api with the request.
       *
       * @returns {observable<TaskResult>} Returns an observable that emits
       *   the status of all the tasks started by the endpoint.
       */
      startTaskEndpoint(url, body) {
        return $observable
          .sbAjax(Object.assign({ url, body }, START_AJAX_BASE_OPTS))
          .pipe(
            switchMap(({ response, status }) => {
              let status$;
              const is200 = status === 200;
              if (is200 && response.startedTasks.length === 1) {
                // If there was only one task, lets use .resultOfId() to preserve results.
                status$ = this.resultOfId(response.startedTasks[0].id);
              } else if (is200) {
                status$ = this.resultOfAllIds(
                  response.startedTasks.map((task) => task.id),
                );
              } else {
                status$ = of(unknownFailureResult);
              }
              return status$.pipe(concatWith(of(null)));
            }),
            startWith(defaultPendingResult),
            takeWhile(angular.identity),
            distinctUntilChanged((task1, task2) => task1.state === task2.state),
          );
      },
    };
  },
]; // end AsyncTasks
