import { webSocket } from 'rxjs/webSocket';
import { of, timer, concatMap, EMPTY, takeWhile, concat } from 'rxjs';
import { filter, buffer, map, tap, retryWhen, retry, delay, take, catchError } from 'rxjs/operators';
import { v4 as uuid } from 'uuid';

/**
 * Thin wrapper around an RxJS `WebSocketSubject` that adds open/close/retry
 * callbacks, automatic reconnection and a few helpers for sending messages
 * and filtering the incoming stream by `msg` type or `id`.
 */
export class RealTimeAPI {
  /**
   * @param {string|object} param - WebSocket URL, or an RxJS
   *   `WebSocketSubjectConfig`-style object. Any `openObserver` /
   *   `closeObserver` in it is replaced by this class's own observers.
   * @param {Function} [onOpenCallback] - Called with no arguments when the socket opens.
   * @param {Function} [onCloseCallback] - Called with the current retry count when the socket closes.
   * @param {Function} [onRetryCallback] - Called with the retry index before each
   *   reconnection attempt, and with `-1` once all retries are exhausted.
   */
  constructor(param, onOpenCallback, onCloseCallback, onRetryCallback) {
    // Spreading a bare URL string would produce indexed characters and no
    // `url` key, so normalise it into a config object first.
    const config = typeof param === 'string' ? { url: param } : param;

    this.webSocket = webSocket({
      ...config,
      openObserver: {
        next: () => {
          this.onOpen();
        },
      },
      closeObserver: {
        next: () => {
          this.onClose();
        },
      },
    });
    this.retryCount = 0;
    this.onOpenCallback = onOpenCallback;
    this.onCloseCallback = onCloseCallback;
    // Stored under the historical property name `onErrorCallback`.
    this.onErrorCallback = onRetryCallback;
  }

  /** Logs that the connection opened and invokes the open callback, if any. */
  onOpen() {
    console.log(
      `%c WebSocket connection opened `,
      'background:#41b883 ; padding: 1px; border-radius: 3px; color: #fff',
    );
    if (this.onOpenCallback) {
      this.onOpenCallback();
    }
  }

  /** Logs that the connection closed and invokes the close callback with the current retry count. */
  onClose() {
    console.log(
      `%c WebSocket connection closed `,
      'background:#35495e ; padding: 1px; border-radius: 3px; color: #fff',
    );
    if (this.onCloseCallback) {
      this.onCloseCallback(this.retryCount);
    }
  }

  /**
   * Records the retry index and forwards it to the retry callback, if any.
   *
   * @param {number} i - Retry index (`-1` signals that retries are exhausted).
   */
  onRetry(i) {
    this.retryCount = i;
    if (this.onErrorCallback) {
      this.onErrorCallback(i);
    }
  }

  /**
   * Returns the socket stream with reconnection applied.
   *
   * Up to 20 retries are attempted, waiting `2000 ms * retry index` before
   * each one (linear backoff; `2 ** index` would make it exponential). When
   * the retries are exhausted the error is swallowed and the stream
   * completes, so subscribers see `complete` rather than `error`.
   *
   * @returns {Observable} A new piped observable on every call.
   */
  getObservable() {
    return this.webSocket.pipe(
      retry({
        count: 20,
        delay: (_error, index) => {
          this.onRetry(index);
          return timer(1000 * 2 * index);
        },
        resetOnSuccess: true,
      }),
      catchError((error) => {
        // onRetry(-1) leaves `retryCount` at -1 and notifies the callback.
        this.onRetry(-1);
        console.log(
          `%c All retries exhausted `,
          'background:#fb923c ; padding: 1px; border-radius: 3px; color: #fff',
        );
        return EMPTY;
      }),
    );
  }

  /** Calls `unsubscribe()` on the underlying WebSocket subject and returns its result. */
  disconnect() {
    return this.webSocket.unsubscribe();
  }

  /** Subscribes `messageHandler` to every incoming message. */
  onMessage(messageHandler) {
    this.subscribe(messageHandler, undefined, undefined);
  }

  /** Subscribes `errorHandler` to stream errors. */
  onError(errorHandler) {
    this.subscribe(undefined, errorHandler, undefined);
  }

  /** Subscribes `completionHandler` to stream completion. */
  onCompletion(completionHandler) {
    this.subscribe(undefined, undefined, completionHandler);
  }

  /**
   * Subscribes the given handlers to the retrying stream from `getObservable()`.
   * The resulting subscription is not returned.
   *
   * @param {Function} [messageHandler] - `next` handler.
   * @param {Function} [errorHandler] - `error` handler.
   * @param {Function} [completionHandler] - `complete` handler.
   */
  subscribe(messageHandler, errorHandler, completionHandler) {
    this.getObservable().subscribe({
      next: messageHandler,
      error: errorHandler,
      complete: completionHandler,
    });
  }

  /** Pushes `messageObject` into the WebSocket subject for sending. */
  sendMessage(messageObject) {
    this.webSocket.next(messageObject);
  }

  /**
   * @param {string} messageType - Value to match against each message's `msg` field.
   * @returns {Observable} Messages whose `msg` strictly equals `messageType`.
   */
  getObservableFilteredByMessageType(messageType) {
    return this.getObservable().pipe(filter((message) => message.msg === messageType));
  }

  /**
   * @param {string} id - Value to match against each message's `id` field.
   * @returns {Observable} Messages whose `id` strictly equals `id`.
   */
  getObservableFilteredByID(id) {
    return this.getObservable().pipe(filter((message) => message.id === id));
  }

  /**
   * Sends the `connect` handshake message and returns the stream of
   * `connected` replies.
   *
   * @returns {Observable} Messages with `msg === 'connected'`.
   */
  connectToServer() {
    this.sendMessage({
      msg: 'connect',
      version: '1',
      support: ['1', 'pre2', 'pre1'],
    });
    return this.getObservableFilteredByMessageType('connected');
  }

  /**
   * Returns a stream of `ping` messages that answers each one with a `pong`
   * as a side effect. Nothing is sent until the result is subscribed to.
   *
   * @returns {Observable} Messages with `msg === 'ping'`.
   */
  keepAlive() {
    return this.getObservableFilteredByMessageType('ping').pipe(
      tap(() => this.sendMessage({ msg: 'pong' })),
    );
  }

  /**
   * Sends a `method` message and returns the stream of messages carrying the
   * same id.
   *
   * @param {string} method - Name of the remote method.
   * @param {...*} params - Arguments forwarded as the `params` array.
   * @returns {Observable} Messages whose `id` matches this call.
   */
  callMethod(method, ...params) {
    // A fresh id per call keeps responses of concurrent calls apart.
    const id = uuid();
    this.sendMessage({
      msg: 'method',
      method,
      id,
      params,
    });
    return this.getObservableFilteredByID(id);
  }

  /**
   * Creates a multiplexed stream: a `sub` message is sent on subscribe, an
   * `unsub` message on unsubscribe, and only messages for this stream and
   * event are delivered.
   *
   * @param {string} streamName - Sent as `name`; matched against `message.collection`.
   * @param {string} streamParam - First entry of `params`; matched against `message.fields.eventName`.
   * @param {*} addEvent - Second entry of `params`.
   * @returns {Observable} The multiplexed observable.
   */
  getSubscription(streamName, streamParam, addEvent) {
    // Unique per subscription so the `unsub` message targets the right `sub`.
    const id = uuid();
    const subscription = this.webSocket.multiplex(
      () => ({
        msg: 'sub',
        id,
        name: streamName,
        params: [streamParam, addEvent],
      }),
      () => ({
        msg: 'unsub',
        id,
      }),
      (message) =>
        typeof message.collection === 'string' &&
        message.collection === streamName &&
        // Skip messages without `fields` instead of throwing inside the filter.
        message.fields != null &&
        message.fields.eventName === streamParam,
    );
    return subscription;
  }
}