import {Inject, Injectable} from '@angular/core';
import {BehaviorSubject, EMPTY, interval, Observable, Subject, Subscription} from 'rxjs';
import {WebsocketMessage} from './websocket-message';
import {WebSocketSubject, WebSocketSubjectConfig} from 'rxjs/internal-compatibility';
import {filter, first, map, skip, takeWhile} from 'rxjs/operators';

/**
 * Thin wrapper around an RxJS `WebSocketSubject` that adds:
 *
 * - a message stream (`wsMessages$`) that stays the same object across
 *   automatic reconnects, so listeners obtained through `on()` keep working;
 * - bounded automatic reconnection after an unclean close;
 * - small helpers for sending and for listening to a single message type.
 *
 * Intended to be subclassed: `send`, `sendToken` and `on` are `protected`.
 */
export class WebsocketService {

  /** Socket currently in use; `null` while disconnected or between reconnect attempts. */
  protected websocket$: WebSocketSubject<WebsocketMessage<any>>;
  /** Every message received from the socket is forwarded onto this subject. */
  protected wsMessages$: Subject<WebsocketMessage<any>>;

  /** Delay, in milliseconds, before each reconnect attempt. */
  private reconnectInterval = 1000;
  /** Maximum number of consecutive reconnect attempts before giving up. */
  private reconnectAttempts = 5;
  /** Reconnect attempts made since the last successful open or explicit `connect()`. */
  private attemptsMade = 0;
  /** Pending one-shot reconnect timer, if any. */
  private reconnectTimer: Subscription | null = null;
  /** Subscription that pumps the current socket into `wsMessages$`. */
  private socketSubscription: Subscription | null = null;

  /** Emits the `open` event of every socket that connects; starts with `null`. */
  public onConnectionIsOpen = new BehaviorSubject<Event>(null);

  /**
   * @param url WebSocket endpoint passed to `WebSocketSubject`.
   */
  constructor(private url: string) {
  }

  /**
   * Opens a connection to the configured URL.
   *
   * Any socket previously opened by this service is released first, a pending
   * automatic reconnect is cancelled, and the retry budget is reset.
   */
  public connect(): void {
    this.cancelReconnect();
    this.attemptsMade = 0;
    this.openSocket();
  }

  /**
   * Closes the connection, cancels any pending reconnect and completes the
   * message stream so that listeners obtained through `on()` are notified.
   */
  public disconnect(): void {
    this.cancelReconnect();
    this.attemptsMade = 0;
    this.releaseSocketSubscription();

    // Clear the field before closing so the close handler treats this socket
    // as stale and does not schedule a reconnect for an intentional disconnect.
    const socket = this.websocket$;
    this.websocket$ = null;
    if (socket) {
      socket.unsubscribe();
    }

    this.completeMessages();
  }

  /**
   * Sends an `{event_type, payload}` message over the socket.
   *
   * @param event Event name; a falsy value is rejected with a console error.
   * @param data  Payload sent alongside the event name.
   */
  protected send(event: string, data: any = {}): void {
    if (!event) {
      console.error('Send error!');
      return;
    }
    if (!this.websocket$) {
      console.error('Send error! WebSocket is not connected.');
      return;
    }
    this.websocket$.next({event_type: event, payload: data} as any);
  }

  /**
   * Sends `data` over the socket as-is (no `{event_type, payload}` envelope).
   *
   * @param data Value to send; a falsy value is rejected with a console error.
   */
  protected sendToken(data: any = {}): void {
    if (!data) {
      console.error('Auth error!');
      return;
    }
    if (!this.websocket$) {
      console.error('Auth error! WebSocket is not connected.');
      return;
    }
    this.websocket$.next(data as any);
  }

  /**
   * Returns the `data` of every incoming message whose type matches `event`.
   *
   * A message matches on `requested_action` when that field is truthy,
   * otherwise on `broadcast_message_type`. May be called before `connect()`;
   * the stream starts emitting once a socket delivers messages.
   *
   * @param event Message type to listen for.
   * @returns Stream of matching payloads; completes immediately when `event` is falsy.
   */
  protected on<T>(event: string): Observable<T> {
    if (!event) {
      return EMPTY;
    }
    return this.ensureMessages().pipe(
      filter((message: WebsocketMessage<T>) => {
        return message.requested_action ? (message.requested_action === event) : (message.broadcast_message_type === event);
      }),
      map((message: WebsocketMessage<T>) => message.data)
    );
  }

  /** Creates a new socket, makes it the current one and forwards its messages. */
  private openSocket(): void {
    this.releaseSocketSubscription();
    const messages$ = this.ensureMessages();

    // Observers are built per socket (instead of sharing one config object) so
    // that the close handler can tell which socket an event belongs to.
    const socket: WebSocketSubject<WebsocketMessage<any>> = new WebSocketSubject<WebsocketMessage<any>>({
      url: this.url,
      openObserver: {
        next: (event: Event) => {
          // A successful open restores the full retry budget.
          this.attemptsMade = 0;
          console.log('WebSocket connected!', event);
          this.onConnectionIsOpen.next(event);
        }
      },
      closeObserver: {
        next: (event: CloseEvent) => this.handleClose(socket, event)
      }
    });
    this.websocket$ = socket;

    this.socketSubscription = socket.subscribe({
      next: (message) => messages$.next(message),
      error: () => {
        // `websocket$` is only null here when the close handler has already
        // seen this socket die; that covers unclean closes with a code other
        // than 1006. `reconnect()` is idempotent, so a double call is harmless.
        if (!this.websocket$) {
          this.reconnect();
        }
      }
    });
  }

  /**
   * Reacts to the close event of `socket`.
   *
   * Events from sockets that are no longer current (replaced by `connect()` or
   * dropped by `disconnect()`) are only logged and never touch service state.
   */
  private handleClose(socket: WebSocketSubject<WebsocketMessage<any>>, event: CloseEvent): void {
    const wasCurrent = socket === this.websocket$;
    if (wasCurrent) {
      this.websocket$ = null;
    }
    // 1006 is the "abnormal closure" code: the connection dropped without a close frame.
    if (wasCurrent && event.code === 1006) {
      this.reconnect();
    } else {
      console.log('WebSocket closed!', event);
    }
  }

  /**
   * Schedules one reconnect attempt after `reconnectInterval` milliseconds.
   *
   * Does nothing when an attempt is already scheduled. Once `reconnectAttempts`
   * consecutive attempts have failed, the message stream is completed instead.
   */
  private reconnect(): void {
    if (this.reconnectTimer) {
      return;
    }
    if (this.attemptsMade >= this.reconnectAttempts) {
      this.completeMessages();
      return;
    }
    // `interval` + `first()` acts as a one-shot timer.
    this.reconnectTimer = interval(this.reconnectInterval)
      .pipe(first())
      .subscribe(() => {
        this.reconnectTimer = null;
        this.attemptsMade++;
        this.openSocket();
      });
  }

  /** Cancels the pending reconnect timer, if there is one. */
  private cancelReconnect(): void {
    if (this.reconnectTimer) {
      this.reconnectTimer.unsubscribe();
      this.reconnectTimer = null;
    }
  }

  /** Stops forwarding messages from the current socket. */
  private releaseSocketSubscription(): void {
    if (this.socketSubscription) {
      this.socketSubscription.unsubscribe();
      this.socketSubscription = null;
    }
  }

  /** Returns the live message subject, creating it on first use. */
  private ensureMessages(): Subject<WebsocketMessage<any>> {
    if (!this.wsMessages$) {
      this.wsMessages$ = new Subject<WebsocketMessage<any>>();
    }
    return this.wsMessages$;
  }

  /** Completes the message subject and drops it so a later `connect()` starts fresh. */
  private completeMessages(): void {
    if (this.wsMessages$) {
      this.wsMessages$.complete();
      this.wsMessages$ = null;
    }
  }
}
