You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Global-sales/src/channel/realTimeAPI.js

167 lines
4.2 KiB
JavaScript

import { webSocket } from 'rxjs/webSocket';
import { of, timer, concatMap, EMPTY, takeWhile, concat } from 'rxjs';
import { filter, buffer, map, tap, retryWhen, retry, delay, take, catchError } from 'rxjs/operators';
import { v4 as uuid } from 'uuid';
import { logWebsocket } from '@/utils/indexedDB';
export class RealTimeAPI {
constructor(param, onOpenCallback, onCloseCallback, onRetryCallback) {
this.webSocket = webSocket({
...param,
openObserver: {
next: () => {
this.onOpen();
},
},
closeObserver: {
next: () => {
this.onClose();
},
},
});
this.retryCount = 0;
this.onOpenCallback = onOpenCallback;
this.onCloseCallback = onCloseCallback;
this.onErrorCallback = onRetryCallback;
}
onOpen() {
console.log(
`%c WebSocket connection opened `,
'background:#41b883 ; padding: 1px; border-radius: 3px; color: #fff',
);
if (this.onOpenCallback) {
this.onOpenCallback();
}
}
onClose() {
console.log(
`%c WebSocket connection closed `,
'background:#35495e ; padding: 1px; border-radius: 3px; color: #fff'
);
if (this.onCloseCallback) {
this.onCloseCallback(this.retryCount);
}
}
onRetry(i) {
this.retryCount = i;
if (this.onErrorCallback) {
this.onErrorCallback(i);
}
}
getObservable() {
return this.webSocket.pipe(
// retry(10)
retry({
count: 20,
// delay: 3000,
delay: (errors, index) => {
this.onRetry(index);
return timer(1000 * 2 * index); // linearly, but exponentially better : 2 ** index
},
resetOnSuccess: true,
}),
catchError((error) => {
this.retryCount = 0;
this.onRetry(-1);
console.log(
`%c All retries exhausted `,
'background:#fb923c ; padding: 1px; border-radius: 3px; color: #fff'
);
return EMPTY;
})
);
}
disconnect() {
return this.webSocket.unsubscribe();
}
onMessage(messageHandler) {
this.subscribe(messageHandler, undefined, undefined);
}
onError(errorHandler) {
this.subscribe(undefined, errorHandler, undefined);
}
onCompletion(completionHandler) {
this.subscribe(undefined, undefined, completionHandler);
}
subscribe(messageHandler, errorHandler, completionHandler) {
// this.getObservable().subscribe(
// messageHandler,
// errorHandler,
// completionHandler
// );
this.getObservable().subscribe({
next: messageHandler,
error: errorHandler,
complete: completionHandler,
});
}
sendMessage(messageObject) {
// console.log(
// `%c websocket Message OUT ⬆`,
// 'background:#41b883 ; padding: 1px; border-radius: 3px; color: #fff',
// JSON.stringify(messageObject, null, 2),
// );
logWebsocket(messageObject, 'O');
this.webSocket.next(messageObject);
}
getObservableFilteredByMessageType(messageType) {
return this.getObservable().pipe(filter((message) => message.msg === messageType));
}
getObservableFilteredByID(id) {
return this.getObservable().pipe(filter((message) => message.id === id));
}
connectToServer() {
this.sendMessage({
msg: 'connect',
version: '1',
support: ['1', 'pre2', 'pre1'],
});
return this.getObservableFilteredByMessageType('connected');
}
keepAlive() {
return this.getObservableFilteredByMessageType('ping').pipe(tap(() => this.sendMessage({ msg: 'pong' })));
}
callMethod(method, ...params) {
let id = 'uuid()';
this.sendMessage({
msg: 'method',
method,
id,
params,
});
return this.getObservableFilteredByID(id);
}
getSubscription(streamName, streamParam, addEvent) {
let id = 'uuid()';
let subscription = this.webSocket.multiplex(
() => ({
msg: 'sub',
id: id,
name: streamName,
params: [streamParam, addEvent],
}),
() => ({
msg: 'unsub',
id: id,
}),
(message) => typeof message.collection === 'string' && message.collection === streamName && message.fields.eventName === streamParam
);
return subscription;
}
}