diff --git a/__tests__/rx-sql.test.ts b/__tests__/rx-sql.test.ts index 5911e42..f8d3d5e 100644 --- a/__tests__/rx-sql.test.ts +++ b/__tests__/rx-sql.test.ts @@ -1,7 +1,6 @@ -import { Server, WebSocket } from 'mock-socket'; +import { Server, WebSocket } from "mock-socket"; import { RealTimeAPI } from "../src/RealTimeAPI"; describe("RxSQL Tests", () => { - it("Mock Test", ()=> { - }) -}) \ No newline at end of file + it("Mock Test", () => {}); +}); diff --git a/src/RealTimeAPI.ts b/src/RealTimeAPI.ts index d0c651b..31cffdd 100644 --- a/src/RealTimeAPI.ts +++ b/src/RealTimeAPI.ts @@ -3,225 +3,243 @@ */ import { Observable } from "rxjs"; -import { WebSocketSubject } from 'rxjs/observable/dom/WebSocketSubject'; +import { WebSocketSubject } from "rxjs/observable/dom/WebSocketSubject"; import { v4 as uuid } from "uuid"; import { SHA256 } from "crypto-js"; export class RealTimeAPI { - public url: string; - public webSocket: WebSocketSubject<{}>; - - constructor(param: string | WebSocketSubject<{}>) { - switch (typeof param) { - case "string": - this.url = param as string; - this.webSocket = Observable.webSocket(this.url); - break; - case "object": - this.webSocket = param as WebSocketSubject<{}>; - this.url = this.webSocket.url; - break; - default: - throw new Error(`Invalid Parameter to the Constructor, Parameter must be of Type WebSocketSubject or URL but was found of type "${typeof param}"`); - } - } - - /** - * Returns the Observable to the RealTime API Socket - */ - public getObservable() { - return this.webSocket.catch(err => Observable.of(err)); - } - - /** - * Disconnect the WebSocket Connection between client and RealTime API - */ - public disconnect() { - return this.webSocket.unsubscribe(); - } - - /** - * onMessage - */ - public onMessage(messageHandler?: ((value: {}) => void) | undefined): void { - this.subscribe(messageHandler, undefined, undefined); - } - - - /** - * onError - */ - public onError(errorHandler?: ((error: any) => void) | undefined): void { - this.subscribe(undefined, errorHandler, undefined); - } - - /** - * onCompletion - */ - public onCompletion(completionHandler?: (() => void) | undefined): void { - this.subscribe(undefined, undefined, completionHandler); - } - - /** - * Subscribe to the WebSocket of the RealTime API - */ - public subscribe(messageHandler?: ((value: {}) => void) | undefined, errorHandler?: ((error: any) => void) | undefined, completionHandler?: (() => void) | undefined) { - this.getObservable().subscribe(messageHandler, errorHandler, completionHandler); - } - - /** - * sendMessage to Rocket.Chat Server - */ - public sendMessage(messageObject: {}): void { - this.webSocket.next(JSON.stringify(messageObject)); - } - - /** - * getObservableFilteredByMessageType - */ - public getObservableFilteredByMessageType(messageType: string) { - return this.getObservable().filter((message: any) => message.msg === messageType); - } - - /** - * getObservableFilteredByID - */ - public getObservableFilteredByID(id: string) { - return this.getObservable().filter((message: any) => message.id === id); - } - - /** - * connectToServer - */ - public connectToServer() { - this.sendMessage({ "msg": "connect", "version": "1", "support": ["1", "pre2", "pre1"] }); - return this.getObservableFilteredByMessageType("connected"); - } - - /** - * keepAlive, Ping and Pong to the Rocket.Chat Server to Keep the Connection Alive. - */ - public keepAlive(): void { - this.getObservableFilteredByMessageType("ping").subscribe( - message => this.sendMessage({ msg: "pong" }) + public url: string; + public webSocket: WebSocketSubject<{}>; + + constructor(param: string | WebSocketSubject<{}>) { + switch (typeof param) { + case "string": + this.url = param as string; + this.webSocket = Observable.webSocket(this.url); + break; + case "object": + this.webSocket = param as WebSocketSubject<{}>; + this.url = this.webSocket.url; + break; + default: + throw new Error( + `Invalid Parameter to the Constructor, Parameter must be of Type WebSocketSubject or URL but was found of type "${typeof param}"` ); } - - /** - * Login with Username and Password - */ - public login(username: string, password: string) { - let id = uuid(); - let usernameType = username.indexOf("@") !== -1 ? "email" : "username"; - this.sendMessage({ - "msg": "method", - "method": "login", - "id": id, - "params": [ - { - "user": { [usernameType]: username }, - "password": { - "digest": SHA256(password).toString(), - "algorithm": "sha-256" - } - } - ] - }); - return this.getLoginObservable(id); - } - - /** - * Login with Authentication Token - */ - public loginWithAuthToken(authToken: string) { - let id = uuid(); - this.sendMessage({ - "msg": "method", - "method": "login", - "id": id, - "params": [ - { "resume": authToken } - ] - }); - return this.getLoginObservable(id); - } - - /** - * Login with OAuth, with Client Token and Client Secret - */ - public loginWithOAuth(credToken: string, credSecret: string) { - let id = uuid(); - this.sendMessage({ - "msg": "method", - "method": "login", - "id": id, - "params": [ - { - "oauth": { - "credentialToken": credToken, - "credentialSecret": credSecret - } - } - ] - }); - return this.getLoginObservable(id); - } - - /** - * getLoginObservable - */ - public getLoginObservable(id: string) { - let resultObservable = this.getObservableFilteredByID(id); - let resultId: string; - - let addedObservable = this.getObservable() - .buffer( - resultObservable.map( ({ msg, error, result }) => { - if(msg === "result" && !error ) - return resultId = result.id // Setting resultId to get Result from the buffer - }) - ) - .flatMap( x => x) // Flattening the Buffered Messages - .filter(({ id: msgId }) => resultId !== undefined && msgId === resultId ) //Filtering the "added" result message. - - return Observable.merge(resultObservable, addedObservable); //Merging "result" and "added" messages. - } - - /** - * Get Observalble to the Result of Method Call from Rocket.Chat Realtime API - */ - public callMethod(method: string, ...params: Array<{}>) { - let id = uuid(); - this.sendMessage({ - "msg": "method", - method, - id, - params - }); - return this.getObservableFilteredByID(id); - } - - /** - * getSubscription - */ - public getSubscription(streamName: string, streamParam: string, addEvent: boolean) { - let id = uuid(); - let subscription = this.webSocket.multiplex( - () => JSON.stringify({ - "msg": "sub", - "id": id, - "name": streamName, - "params": [ - streamParam, - addEvent - ] - }), - () => JSON.stringify({ - "msg": "unsub", - "id": id - }), - (message: any) => typeof message.collection === "string" && message.collection === streamName && message.fields.eventName === streamParam // Proper Filtering to be done. This is temporary filter just for the stream-room-messages subscription - ); - return subscription; - } + } + + /** + * Returns the Observable to the RealTime API Socket + */ + public getObservable() { + return this.webSocket.catch(err => Observable.of(err)); + } + + /** + * Disconnect the WebSocket Connection between client and RealTime API + */ + public disconnect() { + return this.webSocket.unsubscribe(); + } + + /** + * onMessage + */ + public onMessage(messageHandler?: ((value: {}) => void) | undefined): void { + this.subscribe(messageHandler, undefined, undefined); + } + + /** + * onError + */ + public onError(errorHandler?: ((error: any) => void) | undefined): void { + this.subscribe(undefined, errorHandler, undefined); + } + + /** + * onCompletion + */ + public onCompletion(completionHandler?: (() => void) | undefined): void { + this.subscribe(undefined, undefined, completionHandler); + } + + /** + * Subscribe to the WebSocket of the RealTime API + */ + public subscribe( + messageHandler?: ((value: {}) => void) | undefined, + errorHandler?: ((error: any) => void) | undefined, + completionHandler?: (() => void) | undefined + ) { + this.getObservable().subscribe( + messageHandler, + errorHandler, + completionHandler + ); + } + + /** + * sendMessage to Rocket.Chat Server + */ + public sendMessage(messageObject: {}): void { + this.webSocket.next(JSON.stringify(messageObject)); + } + + /** + * getObservableFilteredByMessageType + */ + public getObservableFilteredByMessageType(messageType: string) { + return this.getObservable().filter( + (message: any) => message.msg === messageType + ); + } + + /** + * getObservableFilteredByID + */ + public getObservableFilteredByID(id: string) { + return this.getObservable().filter((message: any) => message.id === id); + } + + /** + * connectToServer + */ + public connectToServer() { + this.sendMessage({ + msg: "connect", + version: "1", + support: ["1", "pre2", "pre1"] + }); + return this.getObservableFilteredByMessageType("connected"); + } + + /** + * keepAlive, Ping and Pong to the Rocket.Chat Server to Keep the Connection Alive. + */ + public keepAlive(): void { + this.getObservableFilteredByMessageType("ping").subscribe(message => + this.sendMessage({ msg: "pong" }) + ); + } + + /** + * Login with Username and Password + */ + public login(username: string, password: string) { + let id = uuid(); + let usernameType = username.indexOf("@") !== -1 ? "email" : "username"; + this.sendMessage({ + msg: "method", + method: "login", + id: id, + params: [ + { + user: { [usernameType]: username }, + password: { + digest: SHA256(password).toString(), + algorithm: "sha-256" + } + } + ] + }); + return this.getLoginObservable(id); + } + + /** + * Login with Authentication Token + */ + public loginWithAuthToken(authToken: string) { + let id = uuid(); + this.sendMessage({ + msg: "method", + method: "login", + id: id, + params: [{ resume: authToken }] + }); + return this.getLoginObservable(id); + } + + /** + * Login with OAuth, with Client Token and Client Secret + */ + public loginWithOAuth(credToken: string, credSecret: string) { + let id = uuid(); + this.sendMessage({ + msg: "method", + method: "login", + id: id, + params: [ + { + oauth: { + credentialToken: credToken, + credentialSecret: credSecret + } + } + ] + }); + return this.getLoginObservable(id); + } + + /** + * getLoginObservable + */ + public getLoginObservable(id: string) { + let resultObservable = this.getObservableFilteredByID(id); + let resultId: string; + + let addedObservable = this.getObservable() + .buffer( + resultObservable.map(({ msg, error, result }) => { + if (msg === "result" && !error) return (resultId = result.id); // Setting resultId to get Result from the buffer + }) + ) + .flatMap(x => x) // Flattening the Buffered Messages + .filter(({ id: msgId }) => resultId !== undefined && msgId === resultId); //Filtering the "added" result message. + + return Observable.merge(resultObservable, addedObservable); //Merging "result" and "added" messages. + } + + /** + * Get Observalble to the Result of Method Call from Rocket.Chat Realtime API + */ + public callMethod(method: string, ...params: Array<{}>) { + let id = uuid(); + this.sendMessage({ + msg: "method", + method, + id, + params + }); + return this.getObservableFilteredByID(id); + } + + /** + * getSubscription + */ + public getSubscription( + streamName: string, + streamParam: string, + addEvent: boolean + ) { + let id = uuid(); + let subscription = this.webSocket.multiplex( + () => + JSON.stringify({ + msg: "sub", + id: id, + name: streamName, + params: [streamParam, addEvent] + }), + () => + JSON.stringify({ + msg: "unsub", + id: id + }), + (message: any) => + typeof message.collection === "string" && + message.collection === streamName && + message.fields.eventName === streamParam // Proper Filtering to be done. This is temporary filter just for the stream-room-messages subscription + ); + return subscription; + } } diff --git a/src/index.ts b/src/index.ts index 28ebbb7..854370d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1 +1 @@ -export { RealTimeAPI } from "./RealTimeAPI"; \ No newline at end of file +export { RealTimeAPI } from "./RealTimeAPI";