@ -3,225 +3,243 @@
* /
import { Observable } from "rxjs" ;
import { WebSocketSubject } from 'rxjs/observable/dom/WebSocketSubject' ;
import { WebSocketSubject } from "rxjs/observable/dom/WebSocketSubject" ;
import { v4 as uuid } from "uuid" ;
import { SHA256 } from "crypto-js" ;
export class RealTimeAPI {
public url : string ;
public webSocket : WebSocketSubject < { } > ;
constructor ( param : string | WebSocketSubject < { } > ) {
switch ( typeof param ) {
case "string" :
this . url = param as string ;
this . webSocket = Observable . webSocket ( this . url ) ;
break ;
case "object" :
this . webSocket = param as WebSocketSubject < { } > ;
this . url = this . webSocket . url ;
break ;
default :
throw new Error ( ` Invalid Parameter to the Constructor, Parameter must be of Type WebSocketSubject or URL but was found of type " ${ typeof param } " ` ) ;
}
}
/ * *
* Returns the Observable to the RealTime API Socket
* /
public getObservable() {
return this . webSocket . catch ( err = > Observable . of ( err ) ) ;
}
/ * *
* Disconnect the WebSocket Connection between client and RealTime API
* /
public disconnect() {
return this . webSocket . unsubscribe ( ) ;
}
/ * *
* onMessage
* /
public onMessage ( messageHandler ? : ( ( value : { } ) = > void ) | undefined ) : void {
this . subscribe ( messageHandler , undefined , undefined ) ;
}
/ * *
* onError
* /
public onError ( errorHandler ? : ( ( error : any ) = > void ) | undefined ) : void {
this . subscribe ( undefined , errorHandler , undefined ) ;
}
/ * *
* onCompletion
* /
public onCompletion ( completionHandler ? : ( ( ) = > void ) | undefined ) : void {
this . subscribe ( undefined , undefined , completionHandler ) ;
}
/ * *
* Subscribe to the WebSocket of the RealTime API
* /
public subscribe ( messageHandler ? : ( ( value : { } ) = > void ) | undefined , errorHandler ? : ( ( error : any ) = > void ) | undefined , completionHandler ? : ( ( ) = > void ) | undefined ) {
this . getObservable ( ) . subscribe ( messageHandler , errorHandler , completionHandler ) ;
}
/ * *
* sendMessage to Rocket . Chat Server
* /
public sendMessage ( messageObject : { } ) : void {
this . webSocket . next ( JSON . stringify ( messageObject ) ) ;
}
/ * *
* getObservableFilteredByMessageType
* /
public getObservableFilteredByMessageType ( messageType : string ) {
return this . getObservable ( ) . filter ( ( message : any ) = > message . msg === messageType ) ;
}
/ * *
* getObservableFilteredByID
* /
public getObservableFilteredByID ( id : string ) {
return this . getObservable ( ) . filter ( ( message : any ) = > message . id === id ) ;
}
/ * *
* connectToServer
* /
public connectToServer() {
this . sendMessage ( { "msg" : "connect" , "version" : "1" , "support" : [ "1" , "pre2" , "pre1" ] } ) ;
return this . getObservableFilteredByMessageType ( "connected" ) ;
}
/ * *
* keepAlive , Ping and Pong to the Rocket . Chat Server to Keep the Connection Alive .
* /
public keepAlive ( ) : void {
this . getObservableFilteredByMessageType ( "ping" ) . subscribe (
message = > this . sendMessage ( { msg : "pong" } )
public url : string ;
public webSocket : WebSocketSubject < { } > ;
constructor ( param : string | WebSocketSubject < { } > ) {
switch ( typeof param ) {
case "string" :
this . url = param as string ;
this . webSocket = Observable . webSocket ( this . url ) ;
break ;
case "object" :
this . webSocket = param as WebSocketSubject < { } > ;
this . url = this . webSocket . url ;
break ;
default :
throw new Error (
` Invalid Parameter to the Constructor, Parameter must be of Type WebSocketSubject or URL but was found of type " ${ typeof param } " `
) ;
}
/ * *
* Login with Username and Password
* /
public login ( username : string , password : string ) {
let id = uuid ( ) ;
let usernameType = username . indexOf ( "@" ) !== - 1 ? "email" : "username" ;
this . sendMessage ( {
"msg" : "method" ,
"method" : "login" ,
"id" : id ,
"params" : [
{
"user" : { [ usernameType ] : username } ,
"password" : {
"digest" : SHA256 ( password ) . toString ( ) ,
"algorithm" : "sha-256"
}
}
]
} ) ;
return this . getLoginObservable ( id ) ;
}
/ * *
* Login with Authentication Token
* /
public loginWithAuthToken ( authToken : string ) {
let id = uuid ( ) ;
this . sendMessage ( {
"msg" : "method" ,
"method" : "login" ,
"id" : id ,
"params" : [
{ "resume" : authToken }
]
} ) ;
return this . getLoginObservable ( id ) ;
}
/ * *
* Login with OAuth , with Client Token and Client Secret
* /
public loginWithOAuth ( credToken : string , credSecret : string ) {
let id = uuid ( ) ;
this . sendMessage ( {
"msg" : "method" ,
"method" : "login" ,
"id" : id ,
"params" : [
{
"oauth" : {
"credentialToken" : credToken ,
"credentialSecret" : credSecret
}
}
]
} ) ;
return this . getLoginObservable ( id ) ;
}
/ * *
* getLoginObservable
* /
public getLoginObservable ( id : string ) {
let resultObservable = this . getObservableFilteredByID ( id ) ;
let resultId : string ;
let addedObservable = this . getObservable ( )
. buffer (
resultObservable . map ( ( { msg , error , result } ) = > {
if ( msg === "result" && ! error )
return resultId = result . id // Setting resultId to get Result from the buffer
} )
)
. flatMap ( x = > x ) // Flattening the Buffered Messages
. filter ( ( { id : msgId } ) = > resultId !== undefined && msgId === resultId ) //Filtering the "added" result message.
return Observable . merge ( resultObservable , addedObservable ) ; //Merging "result" and "added" messages.
}
/ * *
* Get Observalble to the Result of Method Call from Rocket . Chat Realtime API
* /
public callMethod ( method : string , . . . params : Array < { } > ) {
let id = uuid ( ) ;
this . sendMessage ( {
"msg" : "method" ,
method ,
id ,
params
} ) ;
return this . getObservableFilteredByID ( id ) ;
}
/ * *
* getSubscription
* /
public getSubscription ( streamName : string , streamParam : string , addEvent : boolean ) {
let id = uuid ( ) ;
let subscription = this . webSocket . multiplex (
( ) = > JSON . stringify ( {
"msg" : "sub" ,
"id" : id ,
"name" : streamName ,
"params" : [
streamParam ,
addEvent
]
} ) ,
( ) = > JSON . stringify ( {
"msg" : "unsub" ,
"id" : id
} ) ,
( message : any ) = > typeof message . collection === "string" && message . collection === streamName && message . fields . eventName === streamParam // Proper Filtering to be done. This is temporary filter just for the stream-room-messages subscription
) ;
return subscription ;
}
}
/ * *
* Returns the Observable to the RealTime API Socket
* /
public getObservable() {
return this . webSocket . catch ( err = > Observable . of ( err ) ) ;
}
/ * *
* Disconnect the WebSocket Connection between client and RealTime API
* /
public disconnect() {
return this . webSocket . unsubscribe ( ) ;
}
/ * *
* onMessage
* /
public onMessage ( messageHandler ? : ( ( value : { } ) = > void ) | undefined ) : void {
this . subscribe ( messageHandler , undefined , undefined ) ;
}
/ * *
* onError
* /
public onError ( errorHandler ? : ( ( error : any ) = > void ) | undefined ) : void {
this . subscribe ( undefined , errorHandler , undefined ) ;
}
/ * *
* onCompletion
* /
public onCompletion ( completionHandler ? : ( ( ) = > void ) | undefined ) : void {
this . subscribe ( undefined , undefined , completionHandler ) ;
}
/ * *
* Subscribe to the WebSocket of the RealTime API
* /
public subscribe (
messageHandler ? : ( ( value : { } ) = > void ) | undefined ,
errorHandler ? : ( ( error : any ) = > void ) | undefined ,
completionHandler ? : ( ( ) = > void ) | undefined
) {
this . getObservable ( ) . subscribe (
messageHandler ,
errorHandler ,
completionHandler
) ;
}
/ * *
* sendMessage to Rocket . Chat Server
* /
public sendMessage ( messageObject : { } ) : void {
this . webSocket . next ( JSON . stringify ( messageObject ) ) ;
}
/ * *
* getObservableFilteredByMessageType
* /
public getObservableFilteredByMessageType ( messageType : string ) {
return this . getObservable ( ) . filter (
( message : any ) = > message . msg === messageType
) ;
}
/ * *
* getObservableFilteredByID
* /
public getObservableFilteredByID ( id : string ) {
return this . getObservable ( ) . filter ( ( message : any ) = > message . id === id ) ;
}
/ * *
* connectToServer
* /
public connectToServer() {
this . sendMessage ( {
msg : "connect" ,
version : "1" ,
support : [ "1" , "pre2" , "pre1" ]
} ) ;
return this . getObservableFilteredByMessageType ( "connected" ) ;
}
/ * *
* keepAlive , Ping and Pong to the Rocket . Chat Server to Keep the Connection Alive .
* /
public keepAlive ( ) : void {
this . getObservableFilteredByMessageType ( "ping" ) . subscribe ( message = >
this . sendMessage ( { msg : "pong" } )
) ;
}
/ * *
* Login with Username and Password
* /
public login ( username : string , password : string ) {
let id = uuid ( ) ;
let usernameType = username . indexOf ( "@" ) !== - 1 ? "email" : "username" ;
this . sendMessage ( {
msg : "method" ,
method : "login" ,
id : id ,
params : [
{
user : { [ usernameType ] : username } ,
password : {
digest : SHA256 ( password ) . toString ( ) ,
algorithm : "sha-256"
}
}
]
} ) ;
return this . getLoginObservable ( id ) ;
}
/ * *
* Login with Authentication Token
* /
public loginWithAuthToken ( authToken : string ) {
let id = uuid ( ) ;
this . sendMessage ( {
msg : "method" ,
method : "login" ,
id : id ,
params : [ { resume : authToken } ]
} ) ;
return this . getLoginObservable ( id ) ;
}
/ * *
* Login with OAuth , with Client Token and Client Secret
* /
public loginWithOAuth ( credToken : string , credSecret : string ) {
let id = uuid ( ) ;
this . sendMessage ( {
msg : "method" ,
method : "login" ,
id : id ,
params : [
{
oauth : {
credentialToken : credToken ,
credentialSecret : credSecret
}
}
]
} ) ;
return this . getLoginObservable ( id ) ;
}
/ * *
* getLoginObservable
* /
public getLoginObservable ( id : string ) {
let resultObservable = this . getObservableFilteredByID ( id ) ;
let resultId : string ;
let addedObservable = this . getObservable ( )
. buffer (
resultObservable . map ( ( { msg , error , result } ) = > {
if ( msg === "result" && ! error ) return ( resultId = result . id ) ; // Setting resultId to get Result from the buffer
} )
)
. flatMap ( x = > x ) // Flattening the Buffered Messages
. filter ( ( { id : msgId } ) = > resultId !== undefined && msgId === resultId ) ; //Filtering the "added" result message.
return Observable . merge ( resultObservable , addedObservable ) ; //Merging "result" and "added" messages.
}
/ * *
* Get Observalble to the Result of Method Call from Rocket . Chat Realtime API
* /
public callMethod ( method : string , . . . params : Array < { } > ) {
let id = uuid ( ) ;
this . sendMessage ( {
msg : "method" ,
method ,
id ,
params
} ) ;
return this . getObservableFilteredByID ( id ) ;
}
/ * *
* getSubscription
* /
public getSubscription (
streamName : string ,
streamParam : string ,
addEvent : boolean
) {
let id = uuid ( ) ;
let subscription = this . webSocket . multiplex (
( ) = >
JSON . stringify ( {
msg : "sub" ,
id : id ,
name : streamName ,
params : [ streamParam , addEvent ]
} ) ,
( ) = >
JSON . stringify ( {
msg : "unsub" ,
id : id
} ) ,
( message : any ) = >
typeof message . collection === "string" &&
message . collection === streamName &&
message . fields . eventName === streamParam // Proper Filtering to be done. This is temporary filter just for the stream-room-messages subscription
) ;
return subscription ;
}
}