import { webSocket } from 'rxjs/webSocket';
import { of, timer, concatMap } from 'rxjs';
import {
  filter,
  buffer,
  map,
  tap,
  retryWhen,
  retry,
  delay,
  take,
} from 'rxjs/operators';
import { v4 as uuid } from "uuid";

/**
 * Thin wrapper around an RxJS `WebSocketSubject` that exchanges JSON messages
 * carrying a `msg` discriminator (`connect`, `ping`/`pong`, `method`,
 * `sub`/`unsub`).
 */
export class RealTimeAPI {
  /**
   * @param {string|object} param - Passed unchanged to RxJS `webSocket()`
   *   (presumably a URL string or a WebSocketSubjectConfig; see RxJS docs).
   */
  constructor(param) {
    this.webSocket = webSocket(param);
  }

  /**
   * Returns the socket stream with a retry policy attached.
   *
   * @returns {Observable} The socket stream; on error it is resubscribed up
   *   to 10 times, waiting 1000 ms before each attempt.
   */
  getObservable() {
    return this.webSocket.pipe(
      retry({ count: 10, delay: () => timer(1000) })
    );
  }

  /**
   * Calls `unsubscribe()` on the underlying socket subject.
   *
   * @returns {*} Whatever `WebSocketSubject#unsubscribe` returns.
   */
  disconnect() {
    return this.webSocket.unsubscribe();
  }

  /**
   * Registers a handler for every incoming message.
   *
   * @param {Function} messageHandler - Invoked with each message.
   */
  onMessage(messageHandler) {
    this.subscribe(messageHandler, undefined, undefined);
  }

  /**
   * Registers a handler for stream errors.
   *
   * @param {Function} errorHandler - Invoked with the error.
   */
  onError(errorHandler) {
    this.subscribe(undefined, errorHandler, undefined);
  }

  /**
   * Registers a handler for stream completion.
   *
   * @param {Function} completionHandler - Invoked when the stream completes.
   */
  onCompletion(completionHandler) {
    this.subscribe(undefined, undefined, completionHandler);
  }

  /**
   * Subscribes to the retrying socket stream using an observer object.
   * Each call creates a new, independent subscription.
   *
   * @param {Function} [messageHandler] - `next` callback.
   * @param {Function} [errorHandler] - `error` callback.
   * @param {Function} [completionHandler] - `complete` callback.
   */
  subscribe(messageHandler, errorHandler, completionHandler) {
    this.getObservable().subscribe({
      next: messageHandler,
      error: errorHandler,
      complete: completionHandler,
    });
  }

  /**
   * Pushes a message object into the socket subject.
   *
   * @param {object} messageObject - Message to send.
   */
  sendMessage(messageObject) {
    this.webSocket.next(messageObject);
  }

  /**
   * @param {string} messageType - Value to match against `message.msg`.
   * @returns {Observable} Messages whose `msg` strictly equals `messageType`.
   */
  getObservableFilteredByMessageType(messageType) {
    return this.getObservable().pipe(
      filter((message) => message.msg === messageType)
    );
  }

  /**
   * @param {string} id - Value to match against `message.id`.
   * @returns {Observable} Messages whose `id` strictly equals `id`.
   */
  getObservableFilteredByID(id) {
    return this.getObservable().pipe(
      filter((message) => message.id === id)
    );
  }

  /**
   * Sends the `connect` handshake message.
   *
   * @returns {Observable} Messages whose `msg` is `'connected'`.
   */
  connectToServer() {
    this.sendMessage({
      msg: 'connect',
      version: '1',
      support: ['1', 'pre2', 'pre1'],
    });
    return this.getObservableFilteredByMessageType('connected');
  }

  /**
   * Builds a stream that answers every `ping` message with a `pong`.
   * The reply is a `tap` side effect, so it only happens while the returned
   * observable is subscribed.
   *
   * @returns {Observable} The `ping` messages.
   */
  keepAlive() {
    return this.getObservableFilteredByMessageType('ping').pipe(
      tap(() => this.sendMessage({ msg: 'pong' }))
    );
  }

  /**
   * Sends a `method` message and returns the stream of messages that carry
   * the same id. The message is sent immediately, before the caller
   * subscribes to the returned observable.
   *
   * @param {string} method - Method name.
   * @param {...*} params - Forwarded as the `params` array.
   * @returns {Observable} Messages whose `id` matches this call.
   */
  callMethod(method, ...params) {
    // A fresh UUID per call keeps responses of concurrent calls apart.
    const id = uuid();
    this.sendMessage({
      msg: 'method',
      method,
      id,
      params,
    });
    return this.getObservableFilteredByID(id);
  }

  /**
   * Creates a multiplexed stream on the shared socket via
   * `WebSocketSubject#multiplex`, which is given a `sub` message factory,
   * an `unsub` message factory and a message filter.
   *
   * @param {string} streamName - Sent as `name`; matched against
   *   `message.collection`.
   * @param {string} streamParam - First element of `params`; matched against
   *   `message.fields.eventName`.
   * @param {*} addEvent - Second element of `params`.
   * @returns {Observable} Messages belonging to this stream.
   */
  getSubscription(streamName, streamParam, addEvent) {
    // A unique id per subscription so `unsub` targets only this stream.
    const id = uuid();
    return this.webSocket.multiplex(
      () => ({
        msg: 'sub',
        id,
        name: streamName,
        params: [streamParam, addEvent],
      }),
      () => ({
        msg: 'unsub',
        id,
      }),
      (message) =>
        typeof message.collection === 'string' &&
        message.collection === streamName &&
        message.fields.eventName === streamParam
    );
  }
}