import { Action } from '@app/ion-sockets/action.enum';
import { Observable } from 'rxjs';
import { filter, switchMap, take } from 'rxjs/operators';

// Local aliases for the Action members placed in the `action` field of
// outgoing socket frames.
const PUB = Action.PUB;
const SUB = Action.SUB;
const UNSUB = Action.UNSUB;

/**
 * Topic-scoped view over a shared socket connection.
 *
 * Every frame this class sends carries the stream identifier as its `topic`,
 * and incoming messages are kept only when their `_identifier` property equals
 * that identifier.
 */
export class SocketStream {
  private readonly _identifier: string;

  /**
   * @param identifier Topic name used in SUB/UNSUB/PUB frames and for
   *   filtering incoming messages.
   * @param socketConnection Observable emitting the connection object. The
   *   emitted value must expose `multiplex()` and `next()`, both of which are
   *   called here (presumably an rxjs `WebSocketSubject` — confirm against the
   *   provider of this observable).
   */
  constructor(
    identifier: string,
    private readonly socketConnection: Observable<any>,
  ) {
    this._identifier = identifier;
  }

  /**
   * Subscribes to this stream's topic on the current connection.
   *
   * @returns An observable of the messages produced by `connection.multiplex`,
   *   restricted to those whose `_identifier` matches this stream.
   */
  listen() {
    return this.socketConnection.pipe(
      // switchMap drops the multiplexed stream of the previous connection
      // whenever the source emits a new connection object.
      switchMap((connection) =>
        connection.multiplex(
          () => ({ action: SUB, topic: this._identifier }),
          () => ({ action: UNSUB, topic: this._identifier }),
          message => message['_identifier'] === this._identifier,
        ),
      ),
    );
  }

  /**
   * Publishes one message on this stream's topic.
   *
   * The message is sent exactly once, on the first connection object the
   * source emits after this call; the internal subscription is then released.
   *
   * @param data Extra payload fields. They are spread after `_identifier`, so
   *   a caller-supplied `_identifier` key overrides the stream's own.
   */
  sendMessage(data = {}): void {
    this.socketConnection
      // take(1): without it the subscription would live forever and the same
      // message would be re-sent on every later emission of the connection.
      .pipe(take(1))
      .subscribe((connection) => {
        connection.next({
          action: PUB,
          topic: this._identifier,
          data: {
            _identifier: this._identifier,
            ...data,
          },
        });
      });
  }
}
