您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

507 行
12KB

  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  12. const GET_INFO = 0;
  13. const GET_PAYLOAD_LENGTH_16 = 1;
  14. const GET_PAYLOAD_LENGTH_64 = 2;
  15. const GET_MASK = 3;
  16. const GET_DATA = 4;
  17. const INFLATING = 5;
  18. /**
  19. * HyBi Receiver implementation.
  20. *
  21. * @extends stream.Writable
  22. */
  23. class Receiver extends Writable {
  24. /**
  25. * Creates a Receiver instance.
  26. *
  27. * @param {String} binaryType The type for binary data
  28. * @param {Object} extensions An object containing the negotiated extensions
  29. * @param {Boolean} isServer Specifies whether to operate in client or server
  30. * mode
  31. * @param {Number} maxPayload The maximum allowed message length
  32. */
  33. constructor(binaryType, extensions, isServer, maxPayload) {
  34. super();
  35. this._binaryType = binaryType || BINARY_TYPES[0];
  36. this[kWebSocket] = undefined;
  37. this._extensions = extensions || {};
  38. this._isServer = !!isServer;
  39. this._maxPayload = maxPayload | 0;
  40. this._bufferedBytes = 0;
  41. this._buffers = [];
  42. this._compressed = false;
  43. this._payloadLength = 0;
  44. this._mask = undefined;
  45. this._fragmented = 0;
  46. this._masked = false;
  47. this._fin = false;
  48. this._opcode = 0;
  49. this._totalPayloadLength = 0;
  50. this._messageLength = 0;
  51. this._fragments = [];
  52. this._state = GET_INFO;
  53. this._loop = false;
  54. }
  55. /**
  56. * Implements `Writable.prototype._write()`.
  57. *
  58. * @param {Buffer} chunk The chunk of data to write
  59. * @param {String} encoding The character encoding of `chunk`
  60. * @param {Function} cb Callback
  61. */
  62. _write(chunk, encoding, cb) {
  63. if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
  64. this._bufferedBytes += chunk.length;
  65. this._buffers.push(chunk);
  66. this.startLoop(cb);
  67. }
  68. /**
  69. * Consumes `n` bytes from the buffered data.
  70. *
  71. * @param {Number} n The number of bytes to consume
  72. * @return {Buffer} The consumed bytes
  73. * @private
  74. */
  75. consume(n) {
  76. this._bufferedBytes -= n;
  77. if (n === this._buffers[0].length) return this._buffers.shift();
  78. if (n < this._buffers[0].length) {
  79. const buf = this._buffers[0];
  80. this._buffers[0] = buf.slice(n);
  81. return buf.slice(0, n);
  82. }
  83. const dst = Buffer.allocUnsafe(n);
  84. do {
  85. const buf = this._buffers[0];
  86. const offset = dst.length - n;
  87. if (n >= buf.length) {
  88. dst.set(this._buffers.shift(), offset);
  89. } else {
  90. dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
  91. this._buffers[0] = buf.slice(n);
  92. }
  93. n -= buf.length;
  94. } while (n > 0);
  95. return dst;
  96. }
  97. /**
  98. * Starts the parsing loop.
  99. *
  100. * @param {Function} cb Callback
  101. * @private
  102. */
  103. startLoop(cb) {
  104. let err;
  105. this._loop = true;
  106. do {
  107. switch (this._state) {
  108. case GET_INFO:
  109. err = this.getInfo();
  110. break;
  111. case GET_PAYLOAD_LENGTH_16:
  112. err = this.getPayloadLength16();
  113. break;
  114. case GET_PAYLOAD_LENGTH_64:
  115. err = this.getPayloadLength64();
  116. break;
  117. case GET_MASK:
  118. this.getMask();
  119. break;
  120. case GET_DATA:
  121. err = this.getData(cb);
  122. break;
  123. default:
  124. // `INFLATING`
  125. this._loop = false;
  126. return;
  127. }
  128. } while (this._loop);
  129. cb(err);
  130. }
  131. /**
  132. * Reads the first two bytes of a frame.
  133. *
  134. * @return {(RangeError|undefined)} A possible error
  135. * @private
  136. */
  137. getInfo() {
  138. if (this._bufferedBytes < 2) {
  139. this._loop = false;
  140. return;
  141. }
  142. const buf = this.consume(2);
  143. if ((buf[0] & 0x30) !== 0x00) {
  144. this._loop = false;
  145. return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
  146. }
  147. const compressed = (buf[0] & 0x40) === 0x40;
  148. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  149. this._loop = false;
  150. return error(RangeError, 'RSV1 must be clear', true, 1002);
  151. }
  152. this._fin = (buf[0] & 0x80) === 0x80;
  153. this._opcode = buf[0] & 0x0f;
  154. this._payloadLength = buf[1] & 0x7f;
  155. if (this._opcode === 0x00) {
  156. if (compressed) {
  157. this._loop = false;
  158. return error(RangeError, 'RSV1 must be clear', true, 1002);
  159. }
  160. if (!this._fragmented) {
  161. this._loop = false;
  162. return error(RangeError, 'invalid opcode 0', true, 1002);
  163. }
  164. this._opcode = this._fragmented;
  165. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  166. if (this._fragmented) {
  167. this._loop = false;
  168. return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
  169. }
  170. this._compressed = compressed;
  171. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  172. if (!this._fin) {
  173. this._loop = false;
  174. return error(RangeError, 'FIN must be set', true, 1002);
  175. }
  176. if (compressed) {
  177. this._loop = false;
  178. return error(RangeError, 'RSV1 must be clear', true, 1002);
  179. }
  180. if (this._payloadLength > 0x7d) {
  181. this._loop = false;
  182. return error(
  183. RangeError,
  184. `invalid payload length ${this._payloadLength}`,
  185. true,
  186. 1002
  187. );
  188. }
  189. } else {
  190. this._loop = false;
  191. return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
  192. }
  193. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  194. this._masked = (buf[1] & 0x80) === 0x80;
  195. if (this._isServer) {
  196. if (!this._masked) {
  197. this._loop = false;
  198. return error(RangeError, 'MASK must be set', true, 1002);
  199. }
  200. } else if (this._masked) {
  201. this._loop = false;
  202. return error(RangeError, 'MASK must be clear', true, 1002);
  203. }
  204. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  205. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  206. else return this.haveLength();
  207. }
  208. /**
  209. * Gets extended payload length (7+16).
  210. *
  211. * @return {(RangeError|undefined)} A possible error
  212. * @private
  213. */
  214. getPayloadLength16() {
  215. if (this._bufferedBytes < 2) {
  216. this._loop = false;
  217. return;
  218. }
  219. this._payloadLength = this.consume(2).readUInt16BE(0);
  220. return this.haveLength();
  221. }
  222. /**
  223. * Gets extended payload length (7+64).
  224. *
  225. * @return {(RangeError|undefined)} A possible error
  226. * @private
  227. */
  228. getPayloadLength64() {
  229. if (this._bufferedBytes < 8) {
  230. this._loop = false;
  231. return;
  232. }
  233. const buf = this.consume(8);
  234. const num = buf.readUInt32BE(0);
  235. //
  236. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  237. // if payload length is greater than this number.
  238. //
  239. if (num > Math.pow(2, 53 - 32) - 1) {
  240. this._loop = false;
  241. return error(
  242. RangeError,
  243. 'Unsupported WebSocket frame: payload length > 2^53 - 1',
  244. false,
  245. 1009
  246. );
  247. }
  248. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
  249. return this.haveLength();
  250. }
  251. /**
  252. * Payload length has been read.
  253. *
  254. * @return {(RangeError|undefined)} A possible error
  255. * @private
  256. */
  257. haveLength() {
  258. if (this._payloadLength && this._opcode < 0x08) {
  259. this._totalPayloadLength += this._payloadLength;
  260. if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
  261. this._loop = false;
  262. return error(RangeError, 'Max payload size exceeded', false, 1009);
  263. }
  264. }
  265. if (this._masked) this._state = GET_MASK;
  266. else this._state = GET_DATA;
  267. }
  268. /**
  269. * Reads mask bytes.
  270. *
  271. * @private
  272. */
  273. getMask() {
  274. if (this._bufferedBytes < 4) {
  275. this._loop = false;
  276. return;
  277. }
  278. this._mask = this.consume(4);
  279. this._state = GET_DATA;
  280. }
  281. /**
  282. * Reads data bytes.
  283. *
  284. * @param {Function} cb Callback
  285. * @return {(Error|RangeError|undefined)} A possible error
  286. * @private
  287. */
  288. getData(cb) {
  289. let data = EMPTY_BUFFER;
  290. if (this._payloadLength) {
  291. if (this._bufferedBytes < this._payloadLength) {
  292. this._loop = false;
  293. return;
  294. }
  295. data = this.consume(this._payloadLength);
  296. if (this._masked) unmask(data, this._mask);
  297. }
  298. if (this._opcode > 0x07) return this.controlMessage(data);
  299. if (this._compressed) {
  300. this._state = INFLATING;
  301. this.decompress(data, cb);
  302. return;
  303. }
  304. if (data.length) {
  305. //
  306. // This message is not compressed so its lenght is the sum of the payload
  307. // length of all fragments.
  308. //
  309. this._messageLength = this._totalPayloadLength;
  310. this._fragments.push(data);
  311. }
  312. return this.dataMessage();
  313. }
  314. /**
  315. * Decompresses data.
  316. *
  317. * @param {Buffer} data Compressed data
  318. * @param {Function} cb Callback
  319. * @private
  320. */
  321. decompress(data, cb) {
  322. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  323. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  324. if (err) return cb(err);
  325. if (buf.length) {
  326. this._messageLength += buf.length;
  327. if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
  328. return cb(
  329. error(RangeError, 'Max payload size exceeded', false, 1009)
  330. );
  331. }
  332. this._fragments.push(buf);
  333. }
  334. const er = this.dataMessage();
  335. if (er) return cb(er);
  336. this.startLoop(cb);
  337. });
  338. }
  339. /**
  340. * Handles a data message.
  341. *
  342. * @return {(Error|undefined)} A possible error
  343. * @private
  344. */
  345. dataMessage() {
  346. if (this._fin) {
  347. const messageLength = this._messageLength;
  348. const fragments = this._fragments;
  349. this._totalPayloadLength = 0;
  350. this._messageLength = 0;
  351. this._fragmented = 0;
  352. this._fragments = [];
  353. if (this._opcode === 2) {
  354. let data;
  355. if (this._binaryType === 'nodebuffer') {
  356. data = concat(fragments, messageLength);
  357. } else if (this._binaryType === 'arraybuffer') {
  358. data = toArrayBuffer(concat(fragments, messageLength));
  359. } else {
  360. data = fragments;
  361. }
  362. this.emit('message', data);
  363. } else {
  364. const buf = concat(fragments, messageLength);
  365. if (!isValidUTF8(buf)) {
  366. this._loop = false;
  367. return error(Error, 'invalid UTF-8 sequence', true, 1007);
  368. }
  369. this.emit('message', buf.toString());
  370. }
  371. }
  372. this._state = GET_INFO;
  373. }
  374. /**
  375. * Handles a control message.
  376. *
  377. * @param {Buffer} data Data to handle
  378. * @return {(Error|RangeError|undefined)} A possible error
  379. * @private
  380. */
  381. controlMessage(data) {
  382. if (this._opcode === 0x08) {
  383. this._loop = false;
  384. if (data.length === 0) {
  385. this.emit('conclude', 1005, '');
  386. this.end();
  387. } else if (data.length === 1) {
  388. return error(RangeError, 'invalid payload length 1', true, 1002);
  389. } else {
  390. const code = data.readUInt16BE(0);
  391. if (!isValidStatusCode(code)) {
  392. return error(RangeError, `invalid status code ${code}`, true, 1002);
  393. }
  394. const buf = data.slice(2);
  395. if (!isValidUTF8(buf)) {
  396. return error(Error, 'invalid UTF-8 sequence', true, 1007);
  397. }
  398. this.emit('conclude', code, buf.toString());
  399. this.end();
  400. }
  401. } else if (this._opcode === 0x09) {
  402. this.emit('ping', data);
  403. } else {
  404. this.emit('pong', data);
  405. }
  406. this._state = GET_INFO;
  407. }
  408. }
  409. module.exports = Receiver;
  410. /**
  411. * Builds an error object.
  412. *
  413. * @param {(Error|RangeError)} ErrorCtor The error constructor
  414. * @param {String} message The error message
  415. * @param {Boolean} prefix Specifies whether or not to add a default prefix to
  416. * `message`
  417. * @param {Number} statusCode The status code
  418. * @return {(Error|RangeError)} The error
  419. * @private
  420. */
  421. function error(ErrorCtor, message, prefix, statusCode) {
  422. const err = new ErrorCtor(
  423. prefix ? `Invalid WebSocket frame: ${message}` : message
  424. );
  425. Error.captureStackTrace(err, error);
  426. err[kStatusCode] = statusCode;
  427. return err;
  428. }