88 * You may obtain a copy of the License at
99 * http://www.apache.org/licenses/LICENSE-2.0
1010 *
11- * Unless required by applicable law or agreed to in writing, software
12- * distributed under the License is distributed on an "AS IS" BASIS,
13- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14- * See the License for the specific language governing permissions and
15- * limitations under the License.
11+ * Unless required by applicable law or agreed to in writing,
12+ * software distributed under the License is distributed on an "AS
13+ * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+ * express or implied. See the License for the specific language
15+ * governing permissions and limitations under the License.
1616 */
1717
1818"use strict" ;
@@ -48,9 +48,11 @@ class Publisher extends EventEmitter {
4848 }
4949
5050 /**
51- * throttleMs interacts with queueSize to determine when to send messages.
51+ * throttleMs interacts with queueSize to determine when to send
52+ * messages.
5253 * < 0 : send immediately - no interaction with queue
53- * >= 0 : place event at end of event queue to publish message after minimum delay (MS)
54+ * >= 0 : place event at end of event queue to publish message
55+ after minimum delay (MS)
5456 */
5557 if ( options . hasOwnProperty ( 'throttleMs' ) ) {
5658 this . _throttleMs = options . throttleMs ;
@@ -118,7 +120,8 @@ class Publisher extends EventEmitter {
118120 }
119121
120122 /**
121- * Schedule the msg for publishing - or publish immediately if we're supposed to
123+ * Schedule the msg for publishing - or publish immediately if we're
124+ * supposed to
122125 * @param msg {object} object type matching this._type
123126 * @param [throttleMs] {number} optional override for publisher setting
124127 */
@@ -149,7 +152,8 @@ class Publisher extends EventEmitter {
149152 if ( this . _pubTimeout === null ) {
150153 let now = Date . now ( ) ;
151154 if ( this . _pubTime !== null ) {
152- // check how long to throttle for based on the last time we published
155+ // check how long to throttle for based on the last time we
156+ // published
153157 if ( now - this . _pubTime > throttleMs ) {
154158 throttleMs = 0 ;
155159 }
@@ -180,11 +184,16 @@ class Publisher extends EventEmitter {
180184 try {
181185 let bufferInfo = { buffer : [ ] , length : 0 } ;
182186 // serialize pushes buffers onto buffInfo.buffer in order
183- // concat them, and preprend the byte length to the message before sending
187+ // concat them, and preprend the byte length to the message
188+ // before sending
189+ // console.log("_publish", this._messageHandler, msg);
184190 bufferInfo = this . _messageHandler . serialize ( msg , bufferInfo ) ;
191+ // bufferInfo = msg.serialize();
192+ // console.log("_publish", bufferInfo);
185193
186194 // prepend byte length to message
187- let serialized = Serialize ( Buffer . concat ( bufferInfo . buffer , bufferInfo . length ) ) ;
195+ let serialized = Serialize (
196+ Buffer . concat ( bufferInfo . buffer , bufferInfo . length ) ) ;
188197 Object . keys ( this . _subClients ) . forEach ( ( client ) => {
189198 this . _subClients [ client ] . write ( serialized ) ;
190199 } ) ;
@@ -197,7 +206,7 @@ class Publisher extends EventEmitter {
197206 }
198207 }
199208 catch ( err ) {
200- this . _log . warn ( 'Error when publishing message ' + err ) ;
209+ this . _log . warn ( 'Error when publishing message ' , err . stack ) ;
201210 }
202211 } ) ;
203212
@@ -212,13 +221,16 @@ class Publisher extends EventEmitter {
212221 NetworkUtils . getFreePort ( )
213222 . then ( ( port ) => {
214223 let server = net . createServer ( ( subscriber ) => {
215- let subName = subscriber . remoteAddress + ":" + subscriber . remotePort ;
224+ let subName = subscriber . remoteAddress + ":"
225+ + subscriber . remotePort ;
216226 subscriber . name = subName ;
217- this . _log . debug ( 'Publisher ' + this . getTopic ( ) + ' got connection from ' + subName ) ;
227+ this . _log . debug ( 'Publisher ' + this . getTopic ( )
228+ + ' got connection from ' + subName ) ;
218229
219- // subscriber will send us tcpros handshake before we can start publishing
220- // to it.
221- subscriber . $handshake = this . _handleHandshake . bind ( this , subscriber ) ;
230+ // subscriber will send us tcpros handshake before we can
231+ // start publishing to it.
232+ subscriber . $handshake =
233+ this . _handleHandshake . bind ( this , subscriber ) ;
222234
223235 // handshake will be TCPROS encoded, so use a DeserializeStream to
224236 // handle any chunking
@@ -233,7 +245,8 @@ class Publisher extends EventEmitter {
233245 }
234246
235247 subscriber . on ( 'close' , ( ) => {
236- this . _log . info ( 'Publisher ' + this . getTopic ( ) + ' client ' + subscriber . name + ' disconnected!' ) ;
248+ this . _log . info ( 'Publisher ' + this . getTopic ( ) + ' client '
249+ + subscriber . name + ' disconnected!' ) ;
237250 delete this . _subClients [ subscriber . name ] ;
238251 } ) ;
239252
@@ -271,15 +284,24 @@ class Publisher extends EventEmitter {
271284 _handleHandshake ( subscriber , data ) {
272285 if ( ! subscriber . $initialized ) {
273286 let header = TcprosUtils . parseSubHeader ( data ) ;
274- let valid = TcprosUtils . validateSubHeader ( header , this . getTopic ( ) , this . getType ( ) , this . _messageHandler . md5sum ( ) ) ;
287+ let valid = TcprosUtils . validateSubHeader (
288+ header , this . getTopic ( ) , this . getType ( ) ,
289+ this . _messageHandler . md5sum ( ) ) ;
275290 if ( valid !== null ) {
276- this . _log . error ( 'Unable to validate connection header ' + JSON . stringify ( header ) ) ;
291+ this . _log . error ( 'Unable to validate connection header '
292+ + JSON . stringify ( header ) ) ;
277293 subscriber . write ( Serialize ( valid ) ) ;
278294 return ;
279295 }
280- this . _log . debug ( 'Pub ' + this . getTopic ( ) + ' got connection header ' + JSON . stringify ( header ) ) ;
281-
282- let respHeader = TcprosUtils . createPubHeader ( this . _nodeHandle . getNodeName ( ) , this . _messageHandler . md5sum ( ) , this . getType ( ) , this . getLatching ( ) ) ;
296+ this . _log . debug ( 'Pub ' + this . getTopic ( )
297+ + ' got connection header ' + JSON . stringify ( header ) ) ;
298+
299+ let respHeader =
300+ TcprosUtils . createPubHeader (
301+ this . _nodeHandle . getNodeName ( ) ,
302+ this . _messageHandler . md5sum ( ) ,
303+ this . getType ( ) ,
304+ this . getLatching ( ) ) ;
283305 subscriber . write ( respHeader ) ;
284306
285307 if ( this . _lastSentMsg !== null ) {
@@ -293,7 +315,8 @@ class Publisher extends EventEmitter {
293315 this . emit ( 'connection' , subscriber . name ) ;
294316 }
295317 else {
296- this . _log . error ( 'Got message from subscriber after handshake - what gives!!' ) ;
318+ this . _log . error (
319+ 'Got message from subscriber after handshake - what gives!!' ) ;
297320 }
298321 }
299322
@@ -310,7 +333,8 @@ class Publisher extends EventEmitter {
310333 }
311334 } )
312335 . catch ( ( err , resp ) => {
313- this . _log . error ( 'reg pub err ' + err + ' resp: ' + JSON . stringify ( resp ) ) ;
336+ this . _log . error ( 'reg pub err ' + err + ' resp: '
337+ + JSON . stringify ( resp ) ) ;
314338 } )
315339 }
316340
0 commit comments