您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

909 行
24KB

  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const https = require('https');
  4. const http = require('http');
  5. const net = require('net');
  6. const tls = require('tls');
  7. const { randomBytes, createHash } = require('crypto');
  8. const { URL } = require('url');
  9. const PerMessageDeflate = require('./permessage-deflate');
  10. const Receiver = require('./receiver');
  11. const Sender = require('./sender');
  12. const {
  13. BINARY_TYPES,
  14. EMPTY_BUFFER,
  15. GUID,
  16. kStatusCode,
  17. kWebSocket,
  18. NOOP
  19. } = require('./constants');
  20. const { addEventListener, removeEventListener } = require('./event-target');
  21. const { format, parse } = require('./extension');
  22. const { toBuffer } = require('./buffer-util');
  23. const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
  24. const protocolVersions = [8, 13];
  25. const closeTimeout = 30 * 1000;
  26. /**
  27. * Class representing a WebSocket.
  28. *
  29. * @extends EventEmitter
  30. */
  31. class WebSocket extends EventEmitter {
  32. /**
  33. * Create a new `WebSocket`.
  34. *
  35. * @param {(String|url.URL)} address The URL to which to connect
  36. * @param {(String|String[])} protocols The subprotocols
  37. * @param {Object} options Connection options
  38. */
  39. constructor(address, protocols, options) {
  40. super();
  41. this.readyState = WebSocket.CONNECTING;
  42. this.protocol = '';
  43. this._binaryType = BINARY_TYPES[0];
  44. this._closeFrameReceived = false;
  45. this._closeFrameSent = false;
  46. this._closeMessage = '';
  47. this._closeTimer = null;
  48. this._closeCode = 1006;
  49. this._extensions = {};
  50. this._receiver = null;
  51. this._sender = null;
  52. this._socket = null;
  53. if (address !== null) {
  54. this._bufferedAmount = 0;
  55. this._isServer = false;
  56. this._redirects = 0;
  57. if (Array.isArray(protocols)) {
  58. protocols = protocols.join(', ');
  59. } else if (typeof protocols === 'object' && protocols !== null) {
  60. options = protocols;
  61. protocols = undefined;
  62. }
  63. initAsClient(this, address, protocols, options);
  64. } else {
  65. this._isServer = true;
  66. }
  67. }
  68. get CONNECTING() {
  69. return WebSocket.CONNECTING;
  70. }
  71. get CLOSING() {
  72. return WebSocket.CLOSING;
  73. }
  74. get CLOSED() {
  75. return WebSocket.CLOSED;
  76. }
  77. get OPEN() {
  78. return WebSocket.OPEN;
  79. }
  80. /**
  81. * This deviates from the WHATWG interface since ws doesn't support the
  82. * required default "blob" type (instead we define a custom "nodebuffer"
  83. * type).
  84. *
  85. * @type {String}
  86. */
  87. get binaryType() {
  88. return this._binaryType;
  89. }
  90. set binaryType(type) {
  91. if (!BINARY_TYPES.includes(type)) return;
  92. this._binaryType = type;
  93. //
  94. // Allow to change `binaryType` on the fly.
  95. //
  96. if (this._receiver) this._receiver._binaryType = type;
  97. }
  98. /**
  99. * @type {Number}
  100. */
  101. get bufferedAmount() {
  102. if (!this._socket) return this._bufferedAmount;
  103. //
  104. // `socket.bufferSize` is `undefined` if the socket is closed.
  105. //
  106. return (this._socket.bufferSize || 0) + this._sender._bufferedBytes;
  107. }
  108. /**
  109. * @type {String}
  110. */
  111. get extensions() {
  112. return Object.keys(this._extensions).join();
  113. }
  114. /**
  115. * Set up the socket and the internal resources.
  116. *
  117. * @param {net.Socket} socket The network socket between the server and client
  118. * @param {Buffer} head The first packet of the upgraded stream
  119. * @param {Number} maxPayload The maximum allowed message size
  120. * @private
  121. */
  122. setSocket(socket, head, maxPayload) {
  123. const receiver = new Receiver(
  124. this._binaryType,
  125. this._extensions,
  126. this._isServer,
  127. maxPayload
  128. );
  129. this._sender = new Sender(socket, this._extensions);
  130. this._receiver = receiver;
  131. this._socket = socket;
  132. receiver[kWebSocket] = this;
  133. socket[kWebSocket] = this;
  134. receiver.on('conclude', receiverOnConclude);
  135. receiver.on('drain', receiverOnDrain);
  136. receiver.on('error', receiverOnError);
  137. receiver.on('message', receiverOnMessage);
  138. receiver.on('ping', receiverOnPing);
  139. receiver.on('pong', receiverOnPong);
  140. socket.setTimeout(0);
  141. socket.setNoDelay();
  142. if (head.length > 0) socket.unshift(head);
  143. socket.on('close', socketOnClose);
  144. socket.on('data', socketOnData);
  145. socket.on('end', socketOnEnd);
  146. socket.on('error', socketOnError);
  147. this.readyState = WebSocket.OPEN;
  148. this.emit('open');
  149. }
  150. /**
  151. * Emit the `'close'` event.
  152. *
  153. * @private
  154. */
  155. emitClose() {
  156. if (!this._socket) {
  157. this.readyState = WebSocket.CLOSED;
  158. this.emit('close', this._closeCode, this._closeMessage);
  159. return;
  160. }
  161. if (this._extensions[PerMessageDeflate.extensionName]) {
  162. this._extensions[PerMessageDeflate.extensionName].cleanup();
  163. }
  164. this._receiver.removeAllListeners();
  165. this.readyState = WebSocket.CLOSED;
  166. this.emit('close', this._closeCode, this._closeMessage);
  167. }
  168. /**
  169. * Start a closing handshake.
  170. *
  171. * +----------+ +-----------+ +----------+
  172. * - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
  173. * | +----------+ +-----------+ +----------+ |
  174. * +----------+ +-----------+ |
  175. * CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING
  176. * +----------+ +-----------+ |
  177. * | | | +---+ |
  178. * +------------------------+-->|fin| - - - -
  179. * | +---+ | +---+
  180. * - - - - -|fin|<---------------------+
  181. * +---+
  182. *
  183. * @param {Number} code Status code explaining why the connection is closing
  184. * @param {String} data A string explaining why the connection is closing
  185. * @public
  186. */
  187. close(code, data) {
  188. if (this.readyState === WebSocket.CLOSED) return;
  189. if (this.readyState === WebSocket.CONNECTING) {
  190. const msg = 'WebSocket was closed before the connection was established';
  191. return abortHandshake(this, this._req, msg);
  192. }
  193. if (this.readyState === WebSocket.CLOSING) {
  194. if (this._closeFrameSent && this._closeFrameReceived) this._socket.end();
  195. return;
  196. }
  197. this.readyState = WebSocket.CLOSING;
  198. this._sender.close(code, data, !this._isServer, (err) => {
  199. //
  200. // This error is handled by the `'error'` listener on the socket. We only
  201. // want to know if the close frame has been sent here.
  202. //
  203. if (err) return;
  204. this._closeFrameSent = true;
  205. if (this._closeFrameReceived) this._socket.end();
  206. });
  207. //
  208. // Specify a timeout for the closing handshake to complete.
  209. //
  210. this._closeTimer = setTimeout(
  211. this._socket.destroy.bind(this._socket),
  212. closeTimeout
  213. );
  214. }
  215. /**
  216. * Send a ping.
  217. *
  218. * @param {*} data The data to send
  219. * @param {Boolean} mask Indicates whether or not to mask `data`
  220. * @param {Function} cb Callback which is executed when the ping is sent
  221. * @public
  222. */
  223. ping(data, mask, cb) {
  224. if (this.readyState === WebSocket.CONNECTING) {
  225. throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
  226. }
  227. if (typeof data === 'function') {
  228. cb = data;
  229. data = mask = undefined;
  230. } else if (typeof mask === 'function') {
  231. cb = mask;
  232. mask = undefined;
  233. }
  234. if (typeof data === 'number') data = data.toString();
  235. if (this.readyState !== WebSocket.OPEN) {
  236. sendAfterClose(this, data, cb);
  237. return;
  238. }
  239. if (mask === undefined) mask = !this._isServer;
  240. this._sender.ping(data || EMPTY_BUFFER, mask, cb);
  241. }
  242. /**
  243. * Send a pong.
  244. *
  245. * @param {*} data The data to send
  246. * @param {Boolean} mask Indicates whether or not to mask `data`
  247. * @param {Function} cb Callback which is executed when the pong is sent
  248. * @public
  249. */
  250. pong(data, mask, cb) {
  251. if (this.readyState === WebSocket.CONNECTING) {
  252. throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
  253. }
  254. if (typeof data === 'function') {
  255. cb = data;
  256. data = mask = undefined;
  257. } else if (typeof mask === 'function') {
  258. cb = mask;
  259. mask = undefined;
  260. }
  261. if (typeof data === 'number') data = data.toString();
  262. if (this.readyState !== WebSocket.OPEN) {
  263. sendAfterClose(this, data, cb);
  264. return;
  265. }
  266. if (mask === undefined) mask = !this._isServer;
  267. this._sender.pong(data || EMPTY_BUFFER, mask, cb);
  268. }
  269. /**
  270. * Send a data message.
  271. *
  272. * @param {*} data The message to send
  273. * @param {Object} options Options object
  274. * @param {Boolean} options.compress Specifies whether or not to compress
  275. * `data`
  276. * @param {Boolean} options.binary Specifies whether `data` is binary or text
  277. * @param {Boolean} options.fin Specifies whether the fragment is the last one
  278. * @param {Boolean} options.mask Specifies whether or not to mask `data`
  279. * @param {Function} cb Callback which is executed when data is written out
  280. * @public
  281. */
  282. send(data, options, cb) {
  283. if (this.readyState === WebSocket.CONNECTING) {
  284. throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
  285. }
  286. if (typeof options === 'function') {
  287. cb = options;
  288. options = {};
  289. }
  290. if (typeof data === 'number') data = data.toString();
  291. if (this.readyState !== WebSocket.OPEN) {
  292. sendAfterClose(this, data, cb);
  293. return;
  294. }
  295. const opts = {
  296. binary: typeof data !== 'string',
  297. mask: !this._isServer,
  298. compress: true,
  299. fin: true,
  300. ...options
  301. };
  302. if (!this._extensions[PerMessageDeflate.extensionName]) {
  303. opts.compress = false;
  304. }
  305. this._sender.send(data || EMPTY_BUFFER, opts, cb);
  306. }
  307. /**
  308. * Forcibly close the connection.
  309. *
  310. * @public
  311. */
  312. terminate() {
  313. if (this.readyState === WebSocket.CLOSED) return;
  314. if (this.readyState === WebSocket.CONNECTING) {
  315. const msg = 'WebSocket was closed before the connection was established';
  316. return abortHandshake(this, this._req, msg);
  317. }
  318. if (this._socket) {
  319. this.readyState = WebSocket.CLOSING;
  320. this._socket.destroy();
  321. }
  322. }
  323. }
  324. readyStates.forEach((readyState, i) => {
  325. WebSocket[readyState] = i;
  326. });
  327. //
  328. // Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
  329. // See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
  330. //
  331. ['open', 'error', 'close', 'message'].forEach((method) => {
  332. Object.defineProperty(WebSocket.prototype, `on${method}`, {
  333. /**
  334. * Return the listener of the event.
  335. *
  336. * @return {(Function|undefined)} The event listener or `undefined`
  337. * @public
  338. */
  339. get() {
  340. const listeners = this.listeners(method);
  341. for (let i = 0; i < listeners.length; i++) {
  342. if (listeners[i]._listener) return listeners[i]._listener;
  343. }
  344. return undefined;
  345. },
  346. /**
  347. * Add a listener for the event.
  348. *
  349. * @param {Function} listener The listener to add
  350. * @public
  351. */
  352. set(listener) {
  353. const listeners = this.listeners(method);
  354. for (let i = 0; i < listeners.length; i++) {
  355. //
  356. // Remove only the listeners added via `addEventListener`.
  357. //
  358. if (listeners[i]._listener) this.removeListener(method, listeners[i]);
  359. }
  360. this.addEventListener(method, listener);
  361. }
  362. });
  363. });
  364. WebSocket.prototype.addEventListener = addEventListener;
  365. WebSocket.prototype.removeEventListener = removeEventListener;
  366. module.exports = WebSocket;
  367. /**
  368. * Initialize a WebSocket client.
  369. *
  370. * @param {WebSocket} websocket The client to initialize
  371. * @param {(String|url.URL)} address The URL to which to connect
  372. * @param {String} protocols The subprotocols
  373. * @param {Object} options Connection options
  374. * @param {(Boolean|Object)} options.perMessageDeflate Enable/disable
  375. * permessage-deflate
  376. * @param {Number} options.handshakeTimeout Timeout in milliseconds for the
  377. * handshake request
  378. * @param {Number} options.protocolVersion Value of the `Sec-WebSocket-Version`
  379. * header
  380. * @param {String} options.origin Value of the `Origin` or
  381. * `Sec-WebSocket-Origin` header
  382. * @param {Number} options.maxPayload The maximum allowed message size
  383. * @param {Boolean} options.followRedirects Whether or not to follow redirects
  384. * @param {Number} options.maxRedirects The maximum number of redirects allowed
  385. * @private
  386. */
  387. function initAsClient(websocket, address, protocols, options) {
  388. const opts = {
  389. protocolVersion: protocolVersions[1],
  390. maxPayload: 100 * 1024 * 1024,
  391. perMessageDeflate: true,
  392. followRedirects: false,
  393. maxRedirects: 10,
  394. ...options,
  395. createConnection: undefined,
  396. socketPath: undefined,
  397. hostname: undefined,
  398. protocol: undefined,
  399. timeout: undefined,
  400. method: undefined,
  401. host: undefined,
  402. path: undefined,
  403. port: undefined
  404. };
  405. if (!protocolVersions.includes(opts.protocolVersion)) {
  406. throw new RangeError(
  407. `Unsupported protocol version: ${opts.protocolVersion} ` +
  408. `(supported versions: ${protocolVersions.join(', ')})`
  409. );
  410. }
  411. let parsedUrl;
  412. if (address instanceof URL) {
  413. parsedUrl = address;
  414. websocket.url = address.href;
  415. } else {
  416. parsedUrl = new URL(address);
  417. websocket.url = address;
  418. }
  419. const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
  420. if (!parsedUrl.host && (!isUnixSocket || !parsedUrl.pathname)) {
  421. throw new Error(`Invalid URL: ${websocket.url}`);
  422. }
  423. const isSecure =
  424. parsedUrl.protocol === 'wss:' || parsedUrl.protocol === 'https:';
  425. const defaultPort = isSecure ? 443 : 80;
  426. const key = randomBytes(16).toString('base64');
  427. const get = isSecure ? https.get : http.get;
  428. let perMessageDeflate;
  429. opts.createConnection = isSecure ? tlsConnect : netConnect;
  430. opts.defaultPort = opts.defaultPort || defaultPort;
  431. opts.port = parsedUrl.port || defaultPort;
  432. opts.host = parsedUrl.hostname.startsWith('[')
  433. ? parsedUrl.hostname.slice(1, -1)
  434. : parsedUrl.hostname;
  435. opts.headers = {
  436. 'Sec-WebSocket-Version': opts.protocolVersion,
  437. 'Sec-WebSocket-Key': key,
  438. Connection: 'Upgrade',
  439. Upgrade: 'websocket',
  440. ...opts.headers
  441. };
  442. opts.path = parsedUrl.pathname + parsedUrl.search;
  443. opts.timeout = opts.handshakeTimeout;
  444. if (opts.perMessageDeflate) {
  445. perMessageDeflate = new PerMessageDeflate(
  446. opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
  447. false,
  448. opts.maxPayload
  449. );
  450. opts.headers['Sec-WebSocket-Extensions'] = format({
  451. [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
  452. });
  453. }
  454. if (protocols) {
  455. opts.headers['Sec-WebSocket-Protocol'] = protocols;
  456. }
  457. if (opts.origin) {
  458. if (opts.protocolVersion < 13) {
  459. opts.headers['Sec-WebSocket-Origin'] = opts.origin;
  460. } else {
  461. opts.headers.Origin = opts.origin;
  462. }
  463. }
  464. if (parsedUrl.username || parsedUrl.password) {
  465. opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  466. }
  467. if (isUnixSocket) {
  468. const parts = opts.path.split(':');
  469. opts.socketPath = parts[0];
  470. opts.path = parts[1];
  471. }
  472. let req = (websocket._req = get(opts));
  473. if (opts.timeout) {
  474. req.on('timeout', () => {
  475. abortHandshake(websocket, req, 'Opening handshake has timed out');
  476. });
  477. }
  478. req.on('error', (err) => {
  479. if (websocket._req.aborted) return;
  480. req = websocket._req = null;
  481. websocket.readyState = WebSocket.CLOSING;
  482. websocket.emit('error', err);
  483. websocket.emitClose();
  484. });
  485. req.on('response', (res) => {
  486. const location = res.headers.location;
  487. const statusCode = res.statusCode;
  488. if (
  489. location &&
  490. opts.followRedirects &&
  491. statusCode >= 300 &&
  492. statusCode < 400
  493. ) {
  494. if (++websocket._redirects > opts.maxRedirects) {
  495. abortHandshake(websocket, req, 'Maximum redirects exceeded');
  496. return;
  497. }
  498. req.abort();
  499. const addr = new URL(location, address);
  500. initAsClient(websocket, addr, protocols, options);
  501. } else if (!websocket.emit('unexpected-response', req, res)) {
  502. abortHandshake(
  503. websocket,
  504. req,
  505. `Unexpected server response: ${res.statusCode}`
  506. );
  507. }
  508. });
  509. req.on('upgrade', (res, socket, head) => {
  510. websocket.emit('upgrade', res);
  511. //
  512. // The user may have closed the connection from a listener of the `upgrade`
  513. // event.
  514. //
  515. if (websocket.readyState !== WebSocket.CONNECTING) return;
  516. req = websocket._req = null;
  517. const digest = createHash('sha1')
  518. .update(key + GUID)
  519. .digest('base64');
  520. if (res.headers['sec-websocket-accept'] !== digest) {
  521. abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
  522. return;
  523. }
  524. const serverProt = res.headers['sec-websocket-protocol'];
  525. const protList = (protocols || '').split(/, */);
  526. let protError;
  527. if (!protocols && serverProt) {
  528. protError = 'Server sent a subprotocol but none was requested';
  529. } else if (protocols && !serverProt) {
  530. protError = 'Server sent no subprotocol';
  531. } else if (serverProt && !protList.includes(serverProt)) {
  532. protError = 'Server sent an invalid subprotocol';
  533. }
  534. if (protError) {
  535. abortHandshake(websocket, socket, protError);
  536. return;
  537. }
  538. if (serverProt) websocket.protocol = serverProt;
  539. if (perMessageDeflate) {
  540. try {
  541. const extensions = parse(res.headers['sec-websocket-extensions']);
  542. if (extensions[PerMessageDeflate.extensionName]) {
  543. perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
  544. websocket._extensions[
  545. PerMessageDeflate.extensionName
  546. ] = perMessageDeflate;
  547. }
  548. } catch (err) {
  549. abortHandshake(
  550. websocket,
  551. socket,
  552. 'Invalid Sec-WebSocket-Extensions header'
  553. );
  554. return;
  555. }
  556. }
  557. websocket.setSocket(socket, head, opts.maxPayload);
  558. });
  559. }
  560. /**
  561. * Create a `net.Socket` and initiate a connection.
  562. *
  563. * @param {Object} options Connection options
  564. * @return {net.Socket} The newly created socket used to start the connection
  565. * @private
  566. */
  567. function netConnect(options) {
  568. options.path = options.socketPath;
  569. return net.connect(options);
  570. }
  571. /**
  572. * Create a `tls.TLSSocket` and initiate a connection.
  573. *
  574. * @param {Object} options Connection options
  575. * @return {tls.TLSSocket} The newly created socket used to start the connection
  576. * @private
  577. */
  578. function tlsConnect(options) {
  579. options.path = undefined;
  580. if (!options.servername && options.servername !== '') {
  581. options.servername = options.host;
  582. }
  583. return tls.connect(options);
  584. }
  585. /**
  586. * Abort the handshake and emit an error.
  587. *
  588. * @param {WebSocket} websocket The WebSocket instance
  589. * @param {(http.ClientRequest|net.Socket)} stream The request to abort or the
  590. * socket to destroy
  591. * @param {String} message The error message
  592. * @private
  593. */
  594. function abortHandshake(websocket, stream, message) {
  595. websocket.readyState = WebSocket.CLOSING;
  596. const err = new Error(message);
  597. Error.captureStackTrace(err, abortHandshake);
  598. if (stream.setHeader) {
  599. stream.abort();
  600. stream.once('abort', websocket.emitClose.bind(websocket));
  601. websocket.emit('error', err);
  602. } else {
  603. stream.destroy(err);
  604. stream.once('error', websocket.emit.bind(websocket, 'error'));
  605. stream.once('close', websocket.emitClose.bind(websocket));
  606. }
  607. }
  608. /**
  609. * Handle cases where the `ping()`, `pong()`, or `send()` methods are called
  610. * when the `readyState` attribute is `CLOSING` or `CLOSED`.
  611. *
  612. * @param {WebSocket} websocket The WebSocket instance
  613. * @param {*} data The data to send
  614. * @param {Function} cb Callback
  615. * @private
  616. */
  617. function sendAfterClose(websocket, data, cb) {
  618. if (data) {
  619. const length = toBuffer(data).length;
  620. //
  621. // The `_bufferedAmount` property is used only when the peer is a client and
  622. // the opening handshake fails. Under these circumstances, in fact, the
  623. // `setSocket()` method is not called, so the `_socket` and `_sender`
  624. // properties are set to `null`.
  625. //
  626. if (websocket._socket) websocket._sender._bufferedBytes += length;
  627. else websocket._bufferedAmount += length;
  628. }
  629. if (cb) {
  630. const err = new Error(
  631. `WebSocket is not open: readyState ${websocket.readyState} ` +
  632. `(${readyStates[websocket.readyState]})`
  633. );
  634. cb(err);
  635. }
  636. }
  637. /**
  638. * The listener of the `Receiver` `'conclude'` event.
  639. *
  640. * @param {Number} code The status code
  641. * @param {String} reason The reason for closing
  642. * @private
  643. */
  644. function receiverOnConclude(code, reason) {
  645. const websocket = this[kWebSocket];
  646. websocket._socket.removeListener('data', socketOnData);
  647. websocket._socket.resume();
  648. websocket._closeFrameReceived = true;
  649. websocket._closeMessage = reason;
  650. websocket._closeCode = code;
  651. if (code === 1005) websocket.close();
  652. else websocket.close(code, reason);
  653. }
  654. /**
  655. * The listener of the `Receiver` `'drain'` event.
  656. *
  657. * @private
  658. */
  659. function receiverOnDrain() {
  660. this[kWebSocket]._socket.resume();
  661. }
  662. /**
  663. * The listener of the `Receiver` `'error'` event.
  664. *
  665. * @param {(RangeError|Error)} err The emitted error
  666. * @private
  667. */
  668. function receiverOnError(err) {
  669. const websocket = this[kWebSocket];
  670. websocket._socket.removeListener('data', socketOnData);
  671. websocket.readyState = WebSocket.CLOSING;
  672. websocket._closeCode = err[kStatusCode];
  673. websocket.emit('error', err);
  674. websocket._socket.destroy();
  675. }
  676. /**
  677. * The listener of the `Receiver` `'finish'` event.
  678. *
  679. * @private
  680. */
  681. function receiverOnFinish() {
  682. this[kWebSocket].emitClose();
  683. }
  684. /**
  685. * The listener of the `Receiver` `'message'` event.
  686. *
  687. * @param {(String|Buffer|ArrayBuffer|Buffer[])} data The message
  688. * @private
  689. */
  690. function receiverOnMessage(data) {
  691. this[kWebSocket].emit('message', data);
  692. }
  693. /**
  694. * The listener of the `Receiver` `'ping'` event.
  695. *
  696. * @param {Buffer} data The data included in the ping frame
  697. * @private
  698. */
  699. function receiverOnPing(data) {
  700. const websocket = this[kWebSocket];
  701. websocket.pong(data, !websocket._isServer, NOOP);
  702. websocket.emit('ping', data);
  703. }
  704. /**
  705. * The listener of the `Receiver` `'pong'` event.
  706. *
  707. * @param {Buffer} data The data included in the pong frame
  708. * @private
  709. */
  710. function receiverOnPong(data) {
  711. this[kWebSocket].emit('pong', data);
  712. }
  713. /**
  714. * The listener of the `net.Socket` `'close'` event.
  715. *
  716. * @private
  717. */
  718. function socketOnClose() {
  719. const websocket = this[kWebSocket];
  720. this.removeListener('close', socketOnClose);
  721. this.removeListener('end', socketOnEnd);
  722. websocket.readyState = WebSocket.CLOSING;
  723. //
  724. // The close frame might not have been received or the `'end'` event emitted,
  725. // for example, if the socket was destroyed due to an error. Ensure that the
  726. // `receiver` stream is closed after writing any remaining buffered data to
  727. // it. If the readable side of the socket is in flowing mode then there is no
  728. // buffered data as everything has been already written and `readable.read()`
  729. // will return `null`. If instead, the socket is paused, any possible buffered
  730. // data will be read as a single chunk and emitted synchronously in a single
  731. // `'data'` event.
  732. //
  733. websocket._socket.read();
  734. websocket._receiver.end();
  735. this.removeListener('data', socketOnData);
  736. this[kWebSocket] = undefined;
  737. clearTimeout(websocket._closeTimer);
  738. if (
  739. websocket._receiver._writableState.finished ||
  740. websocket._receiver._writableState.errorEmitted
  741. ) {
  742. websocket.emitClose();
  743. } else {
  744. websocket._receiver.on('error', receiverOnFinish);
  745. websocket._receiver.on('finish', receiverOnFinish);
  746. }
  747. }
  748. /**
  749. * The listener of the `net.Socket` `'data'` event.
  750. *
  751. * @param {Buffer} chunk A chunk of data
  752. * @private
  753. */
  754. function socketOnData(chunk) {
  755. if (!this[kWebSocket]._receiver.write(chunk)) {
  756. this.pause();
  757. }
  758. }
  759. /**
  760. * The listener of the `net.Socket` `'end'` event.
  761. *
  762. * @private
  763. */
  764. function socketOnEnd() {
  765. const websocket = this[kWebSocket];
  766. websocket.readyState = WebSocket.CLOSING;
  767. websocket._receiver.end();
  768. this.end();
  769. }
  770. /**
  771. * The listener of the `net.Socket` `'error'` event.
  772. *
  773. * @private
  774. */
  775. function socketOnError() {
  776. const websocket = this[kWebSocket];
  777. this.removeListener('error', socketOnError);
  778. this.on('error', NOOP);
  779. if (websocket) {
  780. websocket.readyState = WebSocket.CLOSING;
  781. this.destroy();
  782. }
  783. }