@@ -72,6 +72,41 @@ export class WebSocketHandler implements WebSocketInterface {
7272 return true ;
7373 }
7474
75+ public static async processData (
76+ data : string | Buffer ,
77+ ws : WebSocket | null ,
78+ createWS : ( ) => Promise < WebSocket > ,
79+ streamNum : number = 0 ,
80+ retryCount : number = 3 ,
81+ ) : Promise < WebSocket | null > {
82+ const buff = Buffer . alloc ( data . length + 1 ) ;
83+
84+ buff . writeInt8 ( streamNum , 0 ) ;
85+ if ( data instanceof Buffer ) {
86+ data . copy ( buff , 1 ) ;
87+ } else {
88+ buff . write ( data , 1 ) ;
89+ }
90+
91+ let i = 0 ;
92+ for ( ; i < retryCount ; ++ i ) {
93+ if ( ws !== null && ws . readyState === WebSocket . OPEN ) {
94+ ws . send ( buff ) ;
95+ break ;
96+ } else {
97+ ws = await createWS ( ) ;
98+ }
99+ }
100+
101+ // This throw doesn't go anywhere.
102+ // TODO: Figure out the right way to return an error.
103+ if ( i >= retryCount ) {
104+ throw new Error ( "can't send data to ws" ) ;
105+ }
106+
107+ return ws ;
108+ }
109+
75110 public static restartableHandleStandardInput (
76111 createWS : ( ) => Promise < WebSocket > ,
77112 stdin : stream . Readable | any ,
@@ -85,33 +120,10 @@ export class WebSocketHandler implements WebSocketInterface {
85120 let queue : Promise < void > = Promise . resolve ( ) ;
86121 let ws : WebSocket | null = null ;
87122
88- async function processData ( data ) : Promise < void > {
89- const buff = Buffer . alloc ( data . length + 1 ) ;
90-
91- buff . writeInt8 ( streamNum , 0 ) ;
92- if ( data instanceof Buffer ) {
93- data . copy ( buff , 1 ) ;
94- } else {
95- buff . write ( data , 1 ) ;
96- }
97-
98- let i = 0 ;
99- for ( ; i < retryCount ; ++ i ) {
100- if ( ws !== null && ws . readyState === WebSocket . OPEN ) {
101- ws . send ( buff ) ;
102- break ;
103- } else {
104- ws = await createWS ( ) ;
105- }
106- }
107-
108- if ( i >= retryCount ) {
109- throw new Error ( "can't send data to ws" ) ;
110- }
111- }
112-
113123 stdin . on ( 'data' , ( data ) => {
114- queue = queue . then ( ( ) => processData ( data ) ) ;
124+ queue = queue . then ( async ( ) => {
125+ ws = await WebSocketHandler . processData ( data , ws , createWS , streamNum , retryCount ) ;
126+ } ) ;
115127 } ) ;
116128
117129 stdin . on ( 'end' , ( ) => {
0 commit comments