import { Injectable } from '@angular/core';
import { BehaviorSubject, lastValueFrom, Subject } from 'rxjs';
import { environment } from '../../environments/environment';
import * as SockJS from 'sockjs-client';
import { RxStompConfig } from '@stomp/rx-stomp';

import { ContractsService } from '../modules/contracts/services/contracts.service';
import { RxStompService } from './rx-stomp.service';
import { Constants } from '../classes/constants';
import { AuthService } from './auth.service';
import { GoogleLoginProvider, SocialAuthService } from '@abacritt/angularx-social-login';
import { take } from 'rxjs/operators';
import jwtDecode from 'jwt-decode';

/**
 * Application-wide STOMP-over-SockJS client.
 *
 * Opens the connection through `RxStompService`, keeps the `Authorization`
 * connect header in sync with the token stored in `localStorage`, and
 * re-publishes messages from the per-user queues on the public subjects
 * declared below.
 */
@Injectable({
  providedIn: 'root',
})
export class NewWsService {
  /** Last parsed message from `/user/queue/notifications` (`null` until the first one arrives). */
  public newNotification: BehaviorSubject<any> = new BehaviorSubject(null);

  /** Last parsed message from `/user/queue/system` (`null` until the first one arrives). */
  public chatOnBoardingWs: BehaviorSubject<any> = new BehaviorSubject(null);

  /** Every parsed message from `/user/queue/chats`. */
  public newMessage: Subject<any> = new Subject();

  /** Emits `sender.id` of each incoming chat message (may be `undefined`). */
  public updateUnreadCount = new Subject<string | void>();

  /** Every parsed message from `/user/queue/finance`. */
  public operationSuccess: Subject<any> = new Subject<any>();

  /** Messages from `/user/queue/system` that carry a truthy `cash` field. */
  public balanceUpdate: Subject<any> = new Subject<any>();

  /** Every parsed message from `/user/queue/tasks`. */
  public taskSubject = new Subject<any>();

  /**
   * Set once `connect()` has subscribed to the queues. The subscriptions are
   * never released, so subscribing again on a later `connect()` would deliver
   * every message twice.
   */
  private streamsSubscribed = false;

  constructor(private contractsService: ContractsService,
              private authService: AuthService,
              private socialAuthService: SocialAuthService,
              public rxStompService: RxStompService) {}

  /**
   * Configures and activates the STOMP client against
   * `<protocol>//<MAIN_HOST>/ws/connect`, subscribes to the user queues (first
   * call only) and installs logging for STOMP errors and socket closes.
   */
  connect(): void {
    const serverUrl = `${window.location.protocol}//${environment.MAIN_HOST}/ws/connect`;

    const myRxStompConfig: RxStompConfig = {
      webSocketFactory: () =>  new SockJS(serverUrl),
      connectHeaders: { Authorization: this.token },
      reconnectDelay: 5000,
      // @TODO: turn it on if need to debug
      // debug: (msg: string): void => {
        // console.log(new Date(), msg);
      // },
      beforeConnect: this.beforeConnectCb
    };

    this.rxStompService.configure(myRxStompConfig);
    this.rxStompService.activate();

    if (!this.streamsSubscribed) {
      this.streamsSubscribed = true;
      this.subscribeToStreams();
    }

    // Chain to whatever handlers are already installed instead of replacing
    // them, so logging does not disable handling done by the wrapper service.
    // NOTE(review): assumes RxStompService.activate() re-installs its own
    // handlers on every call (as RxStomp does); otherwise repeated connect()
    // calls would stack loggers — confirm against rx-stomp.service.
    const client = this.rxStompService.stompClient;
    const previousStompError = client.onStompError;
    const previousWebSocketClose = client.onWebSocketClose;

    client.onStompError = (err) => {
      previousStompError?.(err);
      console.log(err, '<<<<<onStompError');
    };

    client.onWebSocketClose = (err) => {
      previousWebSocketClose?.(err);
      console.log(err, new Date(), '<<<<<onWebSocketClose');
    };
  }

  /**
   * Hook passed as `beforeConnect` in the STOMP configuration.
   *
   * - No stored token: deactivates the client and stops.
   * - Stored expiry is in the past: refreshes the token, through `AuthService`
   *   when a refresh token is stored and through the Google social provider
   *   otherwise; any failure deactivates the client.
   * - Finally rewrites the `Authorization` connect header from the stored token.
   */
  beforeConnectCb = async (): Promise<void> => {
    if (!localStorage.getItem(Constants.TOKEN_KEY)?.length) {
      this.disconnect();
      // Nothing to authenticate with; do not fall through to a refresh attempt.
      return;
    }

    const expiresAt = Number(localStorage.getItem(Constants.EXPIRES_KEY));

    if (Date.now() > expiresAt) {
      try {
        if (localStorage.getItem(Constants.REFRESHTOKEN_KEY)) {
          const data = await lastValueFrom(this.authService.refreshToken());
          this.authService.updateToken(data);
        } else {
          await this.refreshSocialToken();
        }
      } catch (err) {
        console.error('WebSocket token refresh failed, disconnecting', err);
        this.disconnect();
        return;
      }
    }

    this.rxStompService.stompClient.configure({connectHeaders: { Authorization: this.token}});
  }

  /**
   * Refreshes the Google auth token and stores the resulting id token (and its
   * `exp` claim, when present) in `localStorage`.
   *
   * Rejects when the refresh fails, when the auth state yields no id token, or
   * when the token cannot be decoded, so the caller can disconnect.
   */
  private async refreshSocialToken(): Promise<void> {
    await this.socialAuthService.refreshAuthToken(GoogleLoginProvider.PROVIDER_ID);

    // Awaited so the token is stored before the connect header is rebuilt and
    // so failures surface in the caller's try/catch.
    const socialUser = await lastValueFrom(this.socialAuthService.authState.pipe(take(1)));
    if (!socialUser?.idToken) {
      throw new Error('Social auth state did not provide an id token');
    }

    localStorage.setItem(Constants.TOKEN_KEY, socialUser.idToken);

    const tokenDecoded = jwtDecode<{ exp?: number }>(socialUser.idToken);
    if (tokenDecoded?.exp) {
      // NOTE(review): JWT `exp` is in seconds while beforeConnectCb compares
      // the stored value against Date.now() (milliseconds) — confirm the unit
      // other readers of EXPIRES_KEY expect before changing it.
      localStorage.setItem(Constants.EXPIRES_KEY, String(tokenDecoded.exp));
    }
  }

  /**
   * Subscribes to the per-user queues and forwards each parsed message to the
   * matching public subject. Subscriptions are not released by this service.
   */
  subscribeToStreams(): void {
    this.watchJson('/user/queue/notifications', (value) => {
      this.newNotification.next(value);
    });

    this.watchJson('/user/queue/system', (value) => {
      this.chatOnBoardingWs.next(value);

      if (value?.cash) {
        this.balanceUpdate.next(value);
      }
    });

    this.watchJson('/user/queue/finance', (value) => {
      this.operationSuccess.next(value);
    });

    this.watchJson('/user/queue/chats', (value) => {
      this.newMessage.next(value);

      this.updateUnreadCount.next(value?.sender?.id);
    });

    this.watchJson('/user/queue/tasks', (value) => {
      this.taskSubject.next(value);
    });
  }

  /**
   * Watches `destination` and calls `handler` with the JSON-parsed body of
   * every message that has a non-empty body; messages without one are skipped.
   */
  private watchJson(destination: string, handler: (value: any) => void): void {
    this.rxStompService.watch(destination).subscribe((message) => {
      if (message && message.body) {
        handler(JSON.parse(message.body));
      }
    });
  }

  /** Deactivates the underlying STOMP client. */
  disconnect(): void {
    this.rxStompService?.stompClient.deactivate();
  }

  /** `Authorization` header value built from the token currently in `localStorage`. */
  get token(): string {
    return 'Bearer ' + localStorage.getItem(Constants.TOKEN_KEY);
  }
}
