import { SbxHttpClient } from '@/core/http';
import { Injectable } from '@angular/core';
import { PickupFile, ProcessInfo } from '@shoobx/types/pieapi-v1';
import { Map } from 'immutable';
import {
  BehaviorSubject,
  Observable,
  Subscriber,
  catchError,
  concatWith,
  debounceTime,
  distinctUntilChanged,
  filter,
  first,
  forkJoin,
  map,
  of,
  repeat,
  retry,
  share,
  startWith,
  switchMap,
  takeWhile,
  throwError,
  timer,
} from 'rxjs';
import { Downgrade } from '../downgrade';

export enum SbxTaskState {
  PENDING = 'PENDING',
  SUCCESS = 'DONE',
  FAILURE = 'FAILED',
}

export interface SbxTaskResult {
  state: SbxTaskState;
  result?: string;
  id?: string;
}

interface SbxTaskResponse {
  id: string;
  successful: boolean;
  failed: boolean;
  ready: boolean;
  state: SbxTaskState;
  result?: PickupFile | ProcessInfo | string;
  exception?: string;
}

interface SbxTasksResponse {
  tasks: SbxTaskResponse[];
}

interface SbxStartedTasksResponse {
  startedTasks: SbxTaskResponse[];
}

const POLL_INTERVAL = 1500;
const MAX_RETRY = 2;

@Downgrade.Injectable('ngShoobx', 'SbxAsyncTaskService')
@Injectable({
  providedIn: 'root',
})
export class SbxAsyncTaskService {
  private readonly defaultPendingResult: SbxTaskResult = <SbxTaskResult>{
    state: SbxTaskState.PENDING,
    result: null,
  };
  private readonly unknownFailureResult: SbxTaskResult = <SbxTaskResult>{
    state: SbxTaskState.FAILURE,
    result: 'Unknown Failure',
  };
  private readonly taskSubscriptions$: BehaviorSubject<
    Map<string, number | SbxTaskResult>
  > = new BehaviorSubject(Map<string, number | SbxTaskResult>());
  private readonly taskPoll$: Observable<Map<string, number | SbxTaskResult>> =
    this.taskSubscriptions$.pipe(
      debounceTime(10),
      filter((taskSubs: Map<string, number>) => taskSubs.size > 0),
      first(),
      switchMap((taskSubs: Map<string, number>) => {
        const body = { tasks: Array.from(taskSubs.keys() as IterableIterator<string>) };
        return this.sbxHttpClient
          .entity('1')
          .post<SbxTasksResponse>('async-tasks', { params: body })
          .pipe(
            map((response: SbxTasksResponse) =>
              this.createResultsForPolledTasks(taskSubs, response.tasks),
            ),
            catchError(() => throwError(() => new Error('Failed task status poll.'))),
          );
      }),
      retry(MAX_RETRY),
      repeat({
        delay: () => timer(POLL_INTERVAL),
      }),
      share(),
    );

  public constructor(private readonly sbxHttpClient: SbxHttpClient) {}

  private taskStateEnum(task: SbxTaskResponse): SbxTaskState {
    if (task.successful) {
      return SbxTaskState.SUCCESS;
    } else if (task.failed) {
      return SbxTaskState.FAILURE;
    }
    return SbxTaskState.PENDING;
  }

  private createResultsForPolledTasks(
    taskSubs: Map<string, number | SbxTaskResult>,
    polledTasks: SbxTaskResponse[],
  ): Map<string, number | SbxTaskResult> {
    return taskSubs.withMutations((mutMap: Map<string, number | SbxTaskResult>) => {
      polledTasks.forEach((task: SbxTaskResponse) => {
        mutMap.set(task.id, <SbxTaskResult>{
          state: this.taskStateEnum(task),
          result: this.mapResult(task.result || task.exception || null),
          id: task.id,
        });
      });
    });
  }

  private mapResult(result?: PickupFile | ProcessInfo | string): string {
    return (
      (result as ProcessInfo)?.url ||
      (result as PickupFile)?.downloadUrl ||
      result?.toString() ||
      null
    );
  }

  private updateTaskSubCount(taskId: string, fn: (count: number) => number): void {
    const nextSubCounts: Map<string, number | SbxTaskResult> = this.taskSubscriptions$
      .getValue()
      .update(taskId, 0, fn)
      .filter((subCount: number) => subCount > 0) as Map<
      string,
      number | SbxTaskResult
    >;
    this.taskSubscriptions$.next(nextSubCounts);
  }

  private resultOfId$(taskId: string): Observable<SbxTaskResult> {
    return new Observable((subscriber: Subscriber<SbxTaskResult>) => {
      this.updateTaskSubCount(taskId, (count: number) => count + 1);
      return this.taskPoll$
        .pipe(
          // Because it is possible to resubscribe this taskPoll$ with repeatWhen
          // we must be careful to not do anything after we close the observer.
          takeWhile(() => !subscriber.closed),
          map((taskStatuses: Map<string, number | SbxTaskResult>) =>
            taskStatuses.get(taskId),
          ),
          filter((task: number | SbxTaskResult) => Boolean(task)),
        )
        .subscribe({
          next: (taskStatus: SbxTaskResult) => {
            subscriber.next(taskStatus);
            if (taskStatus.state !== SbxTaskState.PENDING) {
              subscriber.complete();
              this.updateTaskSubCount(taskId, (count: number) => count - 1);
            }
          },
          error: () => {
            subscriber.next(this.unknownFailureResult);
            subscriber.complete();
          },
        })
        .add(() => {
          this.updateTaskSubCount(taskId, (count: number) => count - 1);
        });
    }).pipe(
      startWith(this.defaultPendingResult),
      distinctUntilChanged(
        (task1: SbxTaskResult, task2: SbxTaskResult) => task1.state === task2.state,
      ),
    );
  }

  public resultOfId(taskId: string): Promise<string> {
    return new Promise(
      (
        resolve: (value: string | PromiseLike<string>) => void,
        reject: (value?: string) => void,
      ) =>
        this.resultOfId$(taskId).subscribe((result: SbxTaskResult) => {
          if (result.state === SbxTaskState.SUCCESS) {
            resolve(result.result);
          }
          if (result.state === SbxTaskState.FAILURE) {
            reject(result.result);
          }
        }),
    );
  }

  public resultOfAllIds(taskIds: string[]): Observable<SbxTaskResult> {
    if (taskIds.length === 0) {
      return of(<SbxTaskResult>{ state: SbxTaskState.SUCCESS });
    }
    const allResults$: Observable<SbxTaskResult>[] = taskIds.map((taskId: string) =>
      this.resultOfId$(taskId),
    );
    return forkJoin(allResults$, (...allResults) => {
      return <SbxTaskResult>{
        state: allResults.reduce((accum: SbxTaskState, { state }) => {
          // State will never be pending (only completed states) since we used forkJoin.
          if (accum === SbxTaskState.FAILURE || state === SbxTaskState.FAILURE) {
            return SbxTaskState.FAILURE;
          }
          return accum;
        }, SbxTaskState.SUCCESS),
      };
    }).pipe(startWith(this.defaultPendingResult));
  }

  public startTaskEndpoint(url: string, body?: unknown): Observable<SbxTaskResult> {
    return this.sbxHttpClient
      .fullUrl()
      .post<SbxStartedTasksResponse>(url, { params: body })
      .pipe(
        switchMap((response: SbxStartedTasksResponse) => {
          let status$: Observable<SbxTaskResult>;
          if (response.startedTasks.length === 1) {
            // If there was only one task, lets use .resultOfId() to preserve results.
            status$ = this.resultOfId$(response.startedTasks[0].id);
          } else {
            status$ = this.resultOfAllIds(
              response.startedTasks.map((task: SbxTaskResponse) => task.id),
            );
          }
          return status$.pipe(concatWith(of(null)));
        }),
        catchError(() => of(this.unknownFailureResult)),
        startWith(this.defaultPendingResult),
        takeWhile((task: SbxTaskResult) => Boolean(task)),
        distinctUntilChanged(
          (task1: SbxTaskResult, task2: SbxTaskResult) => task1.state === task2.state,
        ),
      );
  }
}
