You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
178 lines
3.9 KiB
JavaScript
178 lines
3.9 KiB
JavaScript
1 year ago
|
import {
|
||
|
webSocket,
|
||
|
} from "rxjs/webSocket";
|
||
|
import { filter, buffer, flatMap, merge, map, tap } from "rxjs/operators";
|
||
|
// import { v4 as uuid } from "uuid";
|
||
|
import { SHA256 } from "crypto-js";
|
||
|
|
||
|
// Monotonic counter used only by the fallback branch of createMessageId().
let messageIdCounter = 0;

/**
 * Creates a fresh identifier for an outgoing request or subscription.
 *
 * Responses are matched to requests purely by `id` (see
 * `getObservableFilteredByID`), so every call must get its own value.
 * Uses `crypto.randomUUID()` when the runtime exposes it; otherwise falls
 * back to a timestamp + counter + random suffix, which is unique within
 * this module instance.
 *
 * @returns {string} An identifier not previously returned by this function.
 */
function createMessageId() {
  if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") {
    return crypto.randomUUID();
  }
  messageIdCounter += 1;
  const timePart = Date.now().toString(36);
  const countPart = messageIdCounter.toString(36);
  const randomPart = Math.random().toString(36).slice(2, 10);
  return `${timePart}-${countPart}-${randomPart}`;
}

/**
 * Thin client around an RxJS `webSocket` subject that sends JSON messages
 * shaped as `{ msg, id, method, params, ... }` and exposes the incoming
 * messages as observables.
 *
 * NOTE(review): the message shapes ("connect", "ping"/"pong", "method",
 * "sub"/"unsub") look like the DDP protocol - confirm against the server.
 */
export class RealTimeAPI {
  /**
   * @param {*} param - Passed straight to RxJS `webSocket()` (a URL string
   *   or a config object).
   */
  constructor(param) {
    this.webSocket = webSocket(param);
  }

  /**
   * @returns {*} The underlying WebSocket subject; subscribing to it yields
   *   every incoming message.
   */
  getObservable() {
    return this.webSocket;
  }

  /**
   * Calls `unsubscribe()` on the underlying WebSocket subject.
   *
   * @returns {*} Whatever the subject's `unsubscribe()` returns.
   */
  disconnect() {
    return this.webSocket.unsubscribe();
  }

  /**
   * Subscribes a handler for incoming messages only.
   *
   * @param {Function} messageHandler - Called with each incoming message.
   * @returns {*} The subscription created by `subscribe`.
   */
  onMessage(messageHandler) {
    return this.subscribe(messageHandler, undefined, undefined);
  }

  /**
   * Subscribes a handler for stream errors only.
   *
   * @param {Function} errorHandler - Called when the stream errors.
   * @returns {*} The subscription created by `subscribe`.
   */
  onError(errorHandler) {
    return this.subscribe(undefined, errorHandler, undefined);
  }

  /**
   * Subscribes a handler for stream completion only.
   *
   * @param {Function} completionHandler - Called when the stream completes.
   * @returns {*} The subscription created by `subscribe`.
   */
  onCompletion(completionHandler) {
    return this.subscribe(undefined, undefined, completionHandler);
  }

  /**
   * Subscribes to the WebSocket subject with the given handlers.
   *
   * @param {Function} [messageHandler] - `next` handler.
   * @param {Function} [errorHandler] - `error` handler.
   * @param {Function} [completionHandler] - `complete` handler.
   * @returns {*} The subscription, so the caller can unsubscribe later.
   */
  subscribe(messageHandler, errorHandler, completionHandler) {
    return this.getObservable().subscribe({
      next: messageHandler,
      error: errorHandler,
      complete: completionHandler
    });
  }

  /**
   * Pushes a message object into the WebSocket subject for sending.
   *
   * @param {Object} messageObject - The message to send.
   */
  sendMessage(messageObject) {
    this.webSocket.next(messageObject);
  }

  /**
   * @param {string} messageType - Value to match against `message.msg`.
   * @returns {*} Observable of incoming messages whose `msg` equals
   *   `messageType`.
   */
  getObservableFilteredByMessageType(messageType) {
    return this.getObservable().pipe(
      filter((message) => message.msg === messageType)
    );
  }

  /**
   * @param {string} id - Value to match against `message.id`.
   * @returns {*} Observable of incoming messages whose `id` equals `id`.
   */
  getObservableFilteredByID(id) {
    return this.getObservable().pipe(
      filter((message) => message.id === id)
    );
  }

  /**
   * Sends the "connect" handshake message.
   *
   * @returns {*} Observable of incoming "connected" messages.
   */
  connectToServer() {
    this.sendMessage({
      msg: "connect",
      version: "1",
      support: ["1", "pre2", "pre1"]
    });
    return this.getObservableFilteredByMessageType("connected");
  }

  /**
   * Builds an observable that answers every incoming "ping" with a "pong".
   * The reply is sent from a `tap`, so nothing happens until the returned
   * observable is subscribed to.
   *
   * @returns {*} Observable of incoming "ping" messages.
   */
  keepAlive() {
    return this.getObservableFilteredByMessageType("ping").pipe(
      tap(() => this.sendMessage({ msg: "pong" }))
    );
  }

  /**
   * Sends a "login" method call with a username or e-mail address and the
   * SHA-256 digest of the password.
   *
   * @param {string} username - Treated as an e-mail if it contains "@",
   *   otherwise as a username.
   * @param {string} password - Plain-text password; only its digest is sent.
   * @returns {*} Observable produced by `getLoginObservable` for this call.
   */
  login(username, password) {
    const id = createMessageId();
    const usernameType = username.includes("@") ? "email" : "username";
    this.sendMessage({
      msg: "method",
      method: "login",
      id,
      params: [
        {
          user: { [usernameType]: username },
          password: {
            digest: SHA256(password).toString(),
            algorithm: "sha-256"
          }
        }
      ]
    });
    return this.getLoginObservable(id);
  }

  /**
   * Sends a "login" method call that resumes a session from a token.
   *
   * @param {string} authToken - Token sent as `resume`.
   * @returns {*} Observable produced by `getLoginObservable` for this call.
   */
  loginWithAuthToken(authToken) {
    const id = createMessageId();
    this.sendMessage({
      msg: "method",
      method: "login",
      id,
      params: [{ resume: authToken }]
    });
    return this.getLoginObservable(id);
  }

  /**
   * Sends a "login" method call carrying OAuth credentials.
   *
   * @param {string} credToken - Sent as `oauth.credentialToken`.
   * @param {string} credSecret - Sent as `oauth.credentialSecret`.
   * @returns {*} Observable produced by `getLoginObservable` for this call.
   */
  loginWithOAuth(credToken, credSecret) {
    const id = createMessageId();
    this.sendMessage({
      msg: "method",
      method: "login",
      id,
      params: [
        {
          oauth: {
            credentialToken: credToken,
            credentialSecret: credSecret
          }
        }
      ]
    });
    return this.getLoginObservable(id);
  }

  /**
   * Builds the observable returned by the login methods.
   *
   * It emits (a) every message whose `id` equals the login request `id`,
   * merged with (b) messages that arrived before that response and whose
   * `id` equals the `result.id` of a successful "result" response.
   *
   * NOTE(review): (b) presumably captures the record describing the
   * logged-in user that the server pushes alongside the result - confirm.
   *
   * @param {string} id - The id used for the login request.
   * @returns {*} The merged observable described above.
   */
  getLoginObservable(id) {
    const resultObservable = this.getObservableFilteredByID(id);
    // Set as a side effect once a successful "result" message is seen.
    let resultId;

    const addedObservable = this.getObservable().pipe(
      // Collect all incoming messages until a message for this request
      // arrives; the notifier's emitted value itself is not used by buffer.
      buffer(
        resultObservable.pipe(
          map(({ msg, error, result }) => {
            if (msg === "result" && !error) {
              resultId = result.id;
              return resultId;
            }
            return undefined;
          })
        )
      ),
      // Flatten each buffered array back into individual messages.
      flatMap((bufferedMessages) => bufferedMessages),
      filter(({ id: msgId }) => resultId !== undefined && msgId === resultId),
      merge(resultObservable)
    );

    return addedObservable;
  }

  /**
   * Sends a "method" message with a fresh id.
   *
   * @param {string} method - Name of the remote method.
   * @param {...*} params - Arguments forwarded as the `params` array.
   * @returns {*} Observable of incoming messages carrying this call's id.
   */
  callMethod(method, ...params) {
    const id = createMessageId();
    this.sendMessage({
      msg: "method",
      method,
      id,
      params
    });
    return this.getObservableFilteredByID(id);
  }

  /**
   * Creates a multiplexed stream subscription: a "sub" message is produced
   * for subscribing and an "unsub" message with the same id for
   * unsubscribing.
   *
   * @param {string} streamName - Sent as `name`; incoming messages must have
   *   `collection` equal to it.
   * @param {string} streamParam - First entry of `params`; incoming messages
   *   must have `fields.eventName` equal to it.
   * @param {*} addEvent - Second entry of `params`.
   * @returns {*} Observable returned by the subject's `multiplex()`.
   */
  getSubscription(streamName, streamParam, addEvent) {
    const id = createMessageId();
    const subscription = this.webSocket.multiplex(
      () => ({
        msg: "sub",
        id,
        name: streamName,
        params: [streamParam, addEvent]
      }),
      () => ({
        msg: "unsub",
        id
      }),
      (message) =>
        typeof message.collection === "string" &&
        message.collection === streamName &&
        message.fields.eventName === streamParam
    );
    return subscription;
  }
}
|