From 44298c9fa2f7e07a63f7044ec7c9441dcd99a533 Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Fri, 21 Feb 2025 03:46:53 +0100 Subject: [PATCH] http: support http proxy for fetch under NODE_USE_ENV_PROXY --- deps/undici/src/index-fetch.js | 4 + deps/undici/undici.js | 1236 +++++++++++++++++++++++- lib/internal/process/pre_execution.js | 12 + test/fixtures/fetch-and-log.mjs | 4 + test/fixtures/proxy-handler.js | 22 + test/parallel/test-http-proxy-fetch.js | 64 ++ 6 files changed, 1338 insertions(+), 4 deletions(-) create mode 100644 test/fixtures/fetch-and-log.mjs create mode 100644 test/fixtures/proxy-handler.js create mode 100644 test/parallel/test-http-proxy-fetch.js diff --git a/deps/undici/src/index-fetch.js b/deps/undici/src/index-fetch.js index 01df32d2fb4ded..77eb2279cf8326 100644 --- a/deps/undici/src/index-fetch.js +++ b/deps/undici/src/index-fetch.js @@ -26,6 +26,10 @@ module.exports.createFastMessageEvent = createFastMessageEvent module.exports.EventSource = require('./lib/web/eventsource/eventsource').EventSource +const api = require('./lib/api') +const Dispatcher = require('./lib/dispatcher/dispatcher') +Object.assign(Dispatcher.prototype, api) + // Expose the fetch implementation to be enabled in Node.js core via a flag module.exports.EnvHttpProxyAgent = EnvHttpProxyAgent module.exports.getGlobalDispatcher = getGlobalDispatcher diff --git a/deps/undici/undici.js b/deps/undici/undici.js index 5b8ec925ed0be0..497b5b629a3e53 100644 --- a/deps/undici/undici.js +++ b/deps/undici/undici.js @@ -461,7 +461,7 @@ var require_dispatcher = __commonJS({ var EventEmitter = require("node:events"); var WrapHandler = require_wrap_handler(); var wrapInterceptor = /* @__PURE__ */ __name((dispatch) => (opts, handler) => dispatch(opts, WrapHandler.wrap(handler)), "wrapInterceptor"); - var Dispatcher = class extends EventEmitter { + var Dispatcher2 = class extends EventEmitter { static { __name(this, "Dispatcher"); } @@ -495,7 +495,7 @@ var require_dispatcher = __commonJS({ }); } }; - module2.exports = Dispatcher; + module2.exports = Dispatcher2; } }); @@ -1425,7 +1425,7 @@ var require_unwrap_handler = __commonJS({ var require_dispatcher_base = __commonJS({ "lib/dispatcher/dispatcher-base.js"(exports2, module2) { "use strict"; - var Dispatcher = require_dispatcher(); + var Dispatcher2 = require_dispatcher(); var UnwrapHandler = require_unwrap_handler(); var { ClientDestroyedError, @@ -1435,7 +1435,7 @@ var require_dispatcher_base = __commonJS({ var { kDestroy, kClose, kClosed, kDestroyed, kDispatch } = require_symbols(); var kOnDestroyed = Symbol("onDestroyed"); var kOnClosed = Symbol("onClosed"); - var DispatcherBase = class extends Dispatcher { + var DispatcherBase = class extends Dispatcher2 { static { __name(this, "DispatcherBase"); } @@ -14120,6 +14120,1231 @@ var require_eventsource = __commonJS({ } }); +// lib/api/readable.js +var require_readable = __commonJS({ + "lib/api/readable.js"(exports2, module2) { + "use strict"; + var assert = require("node:assert"); + var { Readable } = require("node:stream"); + var { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require_errors(); + var util = require_util(); + var { ReadableStreamFrom } = require_util(); + var kConsume = Symbol("kConsume"); + var kReading = Symbol("kReading"); + var kBody = Symbol("kBody"); + var kAbort = Symbol("kAbort"); + var kContentType = Symbol("kContentType"); + var kContentLength = Symbol("kContentLength"); + var kUsed = Symbol("kUsed"); + var kBytesRead = Symbol("kBytesRead"); + var noop = /* @__PURE__ */ __name(() => { + }, "noop"); + var BodyReadable = class extends Readable { + static { + __name(this, "BodyReadable"); + } + /** + * @param {object} opts + * @param {(this: Readable, size: number) => void} opts.resume + * @param {() => (void | null)} opts.abort + * @param {string} [opts.contentType = ''] + * @param {number} [opts.contentLength] + * @param {number} [opts.highWaterMark = 64 * 1024] + */ + constructor({ + resume, + abort, + contentType = "", + contentLength, + highWaterMark = 64 * 1024 + // Same as nodejs fs streams. + }) { + super({ + autoDestroy: true, + read: resume, + highWaterMark + }); + this._readableState.dataEmitted = false; + this[kAbort] = abort; + this[kConsume] = null; + this[kBytesRead] = 0; + this[kBody] = null; + this[kUsed] = false; + this[kContentType] = contentType; + this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null; + this[kReading] = false; + } + /** + * @param {Error|null} err + * @param {(error:(Error|null)) => void} callback + * @returns {void} + */ + _destroy(err, callback) { + if (!err && !this._readableState.endEmitted) { + err = new RequestAbortedError(); + } + if (err) { + this[kAbort](); + } + if (!this[kUsed]) { + setImmediate(() => { + callback(err); + }); + } else { + callback(err); + } + } + /** + * @param {string} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + on(event, listener) { + if (event === "data" || event === "readable") { + this[kReading] = true; + this[kUsed] = true; + } + return super.on(event, listener); + } + /** + * @param {string} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + addListener(event, listener) { + return this.on(event, listener); + } + /** + * @param {string|symbol} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + off(event, listener) { + const ret = super.off(event, listener); + if (event === "data" || event === "readable") { + this[kReading] = this.listenerCount("data") > 0 || this.listenerCount("readable") > 0; + } + return ret; + } + /** + * @param {string|symbol} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + removeListener(event, listener) { + return this.off(event, listener); + } + /** + * @param {Buffer|null} chunk + * @returns {boolean} + */ + push(chunk) { + this[kBytesRead] += chunk ? chunk.length : 0; + if (this[kConsume] && chunk !== null) { + consumePush(this[kConsume], chunk); + return this[kReading] ? super.push(chunk) : true; + } + return super.push(chunk); + } + /** + * Consumes and returns the body as a string. + * + * @see https://fetch.spec.whatwg.org/#dom-body-text + * @returns {Promise} + */ + text() { + return consume(this, "text"); + } + /** + * Consumes and returns the body as a JavaScript Object. + * + * @see https://fetch.spec.whatwg.org/#dom-body-json + * @returns {Promise} + */ + json() { + return consume(this, "json"); + } + /** + * Consumes and returns the body as a Blob + * + * @see https://fetch.spec.whatwg.org/#dom-body-blob + * @returns {Promise} + */ + blob() { + return consume(this, "blob"); + } + /** + * Consumes and returns the body as an Uint8Array. + * + * @see https://fetch.spec.whatwg.org/#dom-body-bytes + * @returns {Promise} + */ + bytes() { + return consume(this, "bytes"); + } + /** + * Consumes and returns the body as an ArrayBuffer. + * + * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer + * @returns {Promise} + */ + arrayBuffer() { + return consume(this, "arrayBuffer"); + } + /** + * Not implemented + * + * @see https://fetch.spec.whatwg.org/#dom-body-formdata + * @throws {NotSupportedError} + */ + async formData() { + throw new NotSupportedError(); + } + /** + * Returns true if the body is not null and the body has been consumed. + * Otherwise, returns false. + * + * @see https://fetch.spec.whatwg.org/#dom-body-bodyused + * @readonly + * @returns {boolean} + */ + get bodyUsed() { + return util.isDisturbed(this); + } + /** + * @see https://fetch.spec.whatwg.org/#dom-body-body + * @readonly + * @returns {ReadableStream} + */ + get body() { + if (!this[kBody]) { + this[kBody] = ReadableStreamFrom(this); + if (this[kConsume]) { + this[kBody].getReader(); + assert(this[kBody].locked); + } + } + return this[kBody]; + } + /** + * Dumps the response body by reading `limit` number of bytes. + * @param {object} opts + * @param {number} [opts.limit = 131072] Number of bytes to read. + * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump. + * @returns {Promise} + */ + async dump(opts) { + const signal = opts?.signal; + if (signal != null && (typeof signal !== "object" || !("aborted" in signal))) { + throw new InvalidArgumentError("signal must be an AbortSignal"); + } + const limit = opts?.limit && Number.isFinite(opts.limit) ? opts.limit : 128 * 1024; + signal?.throwIfAborted(); + if (this._readableState.closeEmitted) { + return null; + } + return await new Promise((resolve, reject) => { + if (this[kContentLength] && this[kContentLength] > limit || this[kBytesRead] > limit) { + this.destroy(new AbortError()); + } + if (signal) { + const onAbort = /* @__PURE__ */ __name(() => { + this.destroy(signal.reason ?? new AbortError()); + }, "onAbort"); + signal.addEventListener("abort", onAbort); + this.on("close", function() { + signal.removeEventListener("abort", onAbort); + if (signal.aborted) { + reject(signal.reason ?? new AbortError()); + } else { + resolve(null); + } + }); + } else { + this.on("close", resolve); + } + this.on("error", noop).on("data", () => { + if (this[kBytesRead] > limit) { + this.destroy(); + } + }).resume(); + }); + } + /** + * @param {BufferEncoding} encoding + * @returns {this} + */ + setEncoding(encoding) { + if (Buffer.isEncoding(encoding)) { + this._readableState.encoding = encoding; + } + return this; + } + }; + function isLocked(bodyReadable) { + return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null; + } + __name(isLocked, "isLocked"); + function isUnusable(bodyReadable) { + return util.isDisturbed(bodyReadable) || isLocked(bodyReadable); + } + __name(isUnusable, "isUnusable"); + function consume(stream, type) { + assert(!stream[kConsume]); + return new Promise((resolve, reject) => { + if (isUnusable(stream)) { + const rState = stream._readableState; + if (rState.destroyed && rState.closeEmitted === false) { + stream.on("error", (err) => { + reject(err); + }).on("close", () => { + reject(new TypeError("unusable")); + }); + } else { + reject(rState.errored ?? new TypeError("unusable")); + } + } else { + queueMicrotask(() => { + stream[kConsume] = { + type, + stream, + resolve, + reject, + length: 0, + body: [] + }; + stream.on("error", function(err) { + consumeFinish(this[kConsume], err); + }).on("close", function() { + if (this[kConsume].body !== null) { + consumeFinish(this[kConsume], new RequestAbortedError()); + } + }); + consumeStart(stream[kConsume]); + }); + } + }); + } + __name(consume, "consume"); + function consumeStart(consume2) { + if (consume2.body === null) { + return; + } + const { _readableState: state } = consume2.stream; + if (state.bufferIndex) { + const start = state.bufferIndex; + const end = state.buffer.length; + for (let n = start; n < end; n++) { + consumePush(consume2, state.buffer[n]); + } + } else { + for (const chunk of state.buffer) { + consumePush(consume2, chunk); + } + } + if (state.endEmitted) { + consumeEnd(this[kConsume], this._readableState.encoding); + } else { + consume2.stream.on("end", function() { + consumeEnd(this[kConsume], this._readableState.encoding); + }); + } + consume2.stream.resume(); + while (consume2.stream.read() != null) { + } + } + __name(consumeStart, "consumeStart"); + function chunksDecode(chunks, length, encoding) { + if (chunks.length === 0 || length === 0) { + return ""; + } + const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length); + const bufferLength = buffer.length; + const start = bufferLength > 2 && buffer[0] === 239 && buffer[1] === 187 && buffer[2] === 191 ? 3 : 0; + if (!encoding || encoding === "utf8" || encoding === "utf-8") { + return buffer.utf8Slice(start, bufferLength); + } else { + return buffer.subarray(start, bufferLength).toString(encoding); + } + } + __name(chunksDecode, "chunksDecode"); + function chunksConcat(chunks, length) { + if (chunks.length === 0 || length === 0) { + return new Uint8Array(0); + } + if (chunks.length === 1) { + return new Uint8Array(chunks[0]); + } + const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer); + let offset = 0; + for (let i = 0; i < chunks.length; ++i) { + const chunk = chunks[i]; + buffer.set(chunk, offset); + offset += chunk.length; + } + return buffer; + } + __name(chunksConcat, "chunksConcat"); + function consumeEnd(consume2, encoding) { + const { type, body, resolve, stream, length } = consume2; + try { + if (type === "text") { + resolve(chunksDecode(body, length, encoding)); + } else if (type === "json") { + resolve(JSON.parse(chunksDecode(body, length, encoding))); + } else if (type === "arrayBuffer") { + resolve(chunksConcat(body, length).buffer); + } else if (type === "blob") { + resolve(new Blob(body, { type: stream[kContentType] })); + } else if (type === "bytes") { + resolve(chunksConcat(body, length)); + } + consumeFinish(consume2); + } catch (err) { + stream.destroy(err); + } + } + __name(consumeEnd, "consumeEnd"); + function consumePush(consume2, chunk) { + consume2.length += chunk.length; + consume2.body.push(chunk); + } + __name(consumePush, "consumePush"); + function consumeFinish(consume2, err) { + if (consume2.body === null) { + return; + } + if (err) { + consume2.reject(err); + } else { + consume2.resolve(); + } + consume2.type = null; + consume2.stream = null; + consume2.resolve = null; + consume2.reject = null; + consume2.length = 0; + consume2.body = null; + } + __name(consumeFinish, "consumeFinish"); + module2.exports = { + Readable: BodyReadable, + chunksDecode + }; + } +}); + +// lib/api/api-request.js +var require_api_request = __commonJS({ + "lib/api/api-request.js"(exports2, module2) { + "use strict"; + var assert = require("node:assert"); + var { AsyncResource } = require("node:async_hooks"); + var { Readable } = require_readable(); + var { InvalidArgumentError, RequestAbortedError } = require_errors(); + var util = require_util(); + function noop() { + } + __name(noop, "noop"); + var RequestHandler = class extends AsyncResource { + static { + __name(this, "RequestHandler"); + } + constructor(opts, callback) { + if (!opts || typeof opts !== "object") { + throw new InvalidArgumentError("invalid opts"); + } + const { signal, method, opaque, body, onInfo, responseHeaders, highWaterMark } = opts; + try { + if (typeof callback !== "function") { + throw new InvalidArgumentError("invalid callback"); + } + if (highWaterMark && (typeof highWaterMark !== "number" || highWaterMark < 0)) { + throw new InvalidArgumentError("invalid highWaterMark"); + } + if (signal && typeof signal.on !== "function" && typeof signal.addEventListener !== "function") { + throw new InvalidArgumentError("signal must be an EventEmitter or EventTarget"); + } + if (method === "CONNECT") { + throw new InvalidArgumentError("invalid method"); + } + if (onInfo && typeof onInfo !== "function") { + throw new InvalidArgumentError("invalid onInfo callback"); + } + super("UNDICI_REQUEST"); + } catch (err) { + if (util.isStream(body)) { + util.destroy(body.on("error", noop), err); + } + throw err; + } + this.method = method; + this.responseHeaders = responseHeaders || null; + this.opaque = opaque || null; + this.callback = callback; + this.res = null; + this.abort = null; + this.body = body; + this.trailers = {}; + this.context = null; + this.onInfo = onInfo || null; + this.highWaterMark = highWaterMark; + this.reason = null; + this.removeAbortListener = null; + if (signal?.aborted) { + this.reason = signal.reason ?? new RequestAbortedError(); + } else if (signal) { + this.removeAbortListener = util.addAbortListener(signal, () => { + this.reason = signal.reason ?? new RequestAbortedError(); + if (this.res) { + util.destroy(this.res.on("error", noop), this.reason); + } else if (this.abort) { + this.abort(this.reason); + } + }); + } + } + onConnect(abort, context) { + if (this.reason) { + abort(this.reason); + return; + } + assert(this.callback); + this.abort = abort; + this.context = context; + } + onHeaders(statusCode, rawHeaders, resume, statusMessage) { + const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this; + const headers = responseHeaders === "raw" ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders); + if (statusCode < 200) { + if (this.onInfo) { + this.onInfo({ statusCode, headers }); + } + return; + } + const parsedHeaders = responseHeaders === "raw" ? util.parseHeaders(rawHeaders) : headers; + const contentType = parsedHeaders["content-type"]; + const contentLength = parsedHeaders["content-length"]; + const res = new Readable({ + resume, + abort, + contentType, + contentLength: this.method !== "HEAD" && contentLength ? Number(contentLength) : null, + highWaterMark + }); + if (this.removeAbortListener) { + res.on("close", this.removeAbortListener); + this.removeAbortListener = null; + } + this.callback = null; + this.res = res; + if (callback !== null) { + this.runInAsyncScope(callback, null, null, { + statusCode, + headers, + trailers: this.trailers, + opaque, + body: res, + context + }); + } + } + onData(chunk) { + return this.res.push(chunk); + } + onComplete(trailers) { + util.parseHeaders(trailers, this.trailers); + this.res.push(null); + } + onError(err) { + const { res, callback, body, opaque } = this; + if (callback) { + this.callback = null; + queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }); + }); + } + if (res) { + this.res = null; + queueMicrotask(() => { + util.destroy(res.on("error", noop), err); + }); + } + if (body) { + this.body = null; + if (util.isStream(body)) { + body.on("error", noop); + util.destroy(body, err); + } + } + if (this.removeAbortListener) { + this.removeAbortListener(); + this.removeAbortListener = null; + } + } + }; + function request(opts, callback) { + if (callback === void 0) { + return new Promise((resolve, reject) => { + request.call(this, opts, (err, data) => { + return err ? reject(err) : resolve(data); + }); + }); + } + try { + const handler = new RequestHandler(opts, callback); + this.dispatch(opts, handler); + } catch (err) { + if (typeof callback !== "function") { + throw err; + } + const opaque = opts?.opaque; + queueMicrotask(() => callback(err, { opaque })); + } + } + __name(request, "request"); + module2.exports = request; + module2.exports.RequestHandler = RequestHandler; + } +}); + +// lib/api/abort-signal.js +var require_abort_signal = __commonJS({ + "lib/api/abort-signal.js"(exports2, module2) { + "use strict"; + var { addAbortListener } = require_util(); + var { RequestAbortedError } = require_errors(); + var kListener = Symbol("kListener"); + var kSignal = Symbol("kSignal"); + function abort(self) { + if (self.abort) { + self.abort(self[kSignal]?.reason); + } else { + self.reason = self[kSignal]?.reason ?? new RequestAbortedError(); + } + removeSignal(self); + } + __name(abort, "abort"); + function addSignal(self, signal) { + self.reason = null; + self[kSignal] = null; + self[kListener] = null; + if (!signal) { + return; + } + if (signal.aborted) { + abort(self); + return; + } + self[kSignal] = signal; + self[kListener] = () => { + abort(self); + }; + addAbortListener(self[kSignal], self[kListener]); + } + __name(addSignal, "addSignal"); + function removeSignal(self) { + if (!self[kSignal]) { + return; + } + if ("removeEventListener" in self[kSignal]) { + self[kSignal].removeEventListener("abort", self[kListener]); + } else { + self[kSignal].removeListener("abort", self[kListener]); + } + self[kSignal] = null; + self[kListener] = null; + } + __name(removeSignal, "removeSignal"); + module2.exports = { + addSignal, + removeSignal + }; + } +}); + +// lib/api/api-stream.js +var require_api_stream = __commonJS({ + "lib/api/api-stream.js"(exports2, module2) { + "use strict"; + var assert = require("node:assert"); + var { finished } = require("node:stream"); + var { AsyncResource } = require("node:async_hooks"); + var { InvalidArgumentError, InvalidReturnValueError } = require_errors(); + var util = require_util(); + var { addSignal, removeSignal } = require_abort_signal(); + function noop() { + } + __name(noop, "noop"); + var StreamHandler = class extends AsyncResource { + static { + __name(this, "StreamHandler"); + } + constructor(opts, factory, callback) { + if (!opts || typeof opts !== "object") { + throw new InvalidArgumentError("invalid opts"); + } + const { signal, method, opaque, body, onInfo, responseHeaders } = opts; + try { + if (typeof callback !== "function") { + throw new InvalidArgumentError("invalid callback"); + } + if (typeof factory !== "function") { + throw new InvalidArgumentError("invalid factory"); + } + if (signal && typeof signal.on !== "function" && typeof signal.addEventListener !== "function") { + throw new InvalidArgumentError("signal must be an EventEmitter or EventTarget"); + } + if (method === "CONNECT") { + throw new InvalidArgumentError("invalid method"); + } + if (onInfo && typeof onInfo !== "function") { + throw new InvalidArgumentError("invalid onInfo callback"); + } + super("UNDICI_STREAM"); + } catch (err) { + if (util.isStream(body)) { + util.destroy(body.on("error", noop), err); + } + throw err; + } + this.responseHeaders = responseHeaders || null; + this.opaque = opaque || null; + this.factory = factory; + this.callback = callback; + this.res = null; + this.abort = null; + this.context = null; + this.trailers = null; + this.body = body; + this.onInfo = onInfo || null; + if (util.isStream(body)) { + body.on("error", (err) => { + this.onError(err); + }); + } + addSignal(this, signal); + } + onConnect(abort, context) { + if (this.reason) { + abort(this.reason); + return; + } + assert(this.callback); + this.abort = abort; + this.context = context; + } + onHeaders(statusCode, rawHeaders, resume, statusMessage) { + const { factory, opaque, context, responseHeaders } = this; + const headers = responseHeaders === "raw" ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders); + if (statusCode < 200) { + if (this.onInfo) { + this.onInfo({ statusCode, headers }); + } + return; + } + this.factory = null; + if (factory === null) { + return; + } + const res = this.runInAsyncScope(factory, null, { + statusCode, + headers, + opaque, + context + }); + if (!res || typeof res.write !== "function" || typeof res.end !== "function" || typeof res.on !== "function") { + throw new InvalidReturnValueError("expected Writable"); + } + finished(res, { readable: false }, (err) => { + const { callback, res: res2, opaque: opaque2, trailers, abort } = this; + this.res = null; + if (err || !res2.readable) { + util.destroy(res2, err); + } + this.callback = null; + this.runInAsyncScope(callback, null, err || null, { opaque: opaque2, trailers }); + if (err) { + abort(); + } + }); + res.on("drain", resume); + this.res = res; + const needDrain = res.writableNeedDrain !== void 0 ? res.writableNeedDrain : res._writableState?.needDrain; + return needDrain !== true; + } + onData(chunk) { + const { res } = this; + return res ? res.write(chunk) : true; + } + onComplete(trailers) { + const { res } = this; + removeSignal(this); + if (!res) { + return; + } + this.trailers = util.parseHeaders(trailers); + res.end(); + } + onError(err) { + const { res, callback, opaque, body } = this; + removeSignal(this); + this.factory = null; + if (res) { + this.res = null; + util.destroy(res, err); + } else if (callback) { + this.callback = null; + queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }); + }); + } + if (body) { + this.body = null; + util.destroy(body, err); + } + } + }; + function stream(opts, factory, callback) { + if (callback === void 0) { + return new Promise((resolve, reject) => { + stream.call(this, opts, factory, (err, data) => { + return err ? reject(err) : resolve(data); + }); + }); + } + try { + const handler = new StreamHandler(opts, factory, callback); + this.dispatch(opts, handler); + } catch (err) { + if (typeof callback !== "function") { + throw err; + } + const opaque = opts?.opaque; + queueMicrotask(() => callback(err, { opaque })); + } + } + __name(stream, "stream"); + module2.exports = stream; + } +}); + +// lib/api/api-pipeline.js +var require_api_pipeline = __commonJS({ + "lib/api/api-pipeline.js"(exports2, module2) { + "use strict"; + var { + Readable, + Duplex, + PassThrough + } = require("node:stream"); + var assert = require("node:assert"); + var { AsyncResource } = require("node:async_hooks"); + var { + InvalidArgumentError, + InvalidReturnValueError, + RequestAbortedError + } = require_errors(); + var util = require_util(); + var { addSignal, removeSignal } = require_abort_signal(); + function noop() { + } + __name(noop, "noop"); + var kResume = Symbol("resume"); + var PipelineRequest = class extends Readable { + static { + __name(this, "PipelineRequest"); + } + constructor() { + super({ autoDestroy: true }); + this[kResume] = null; + } + _read() { + const { [kResume]: resume } = this; + if (resume) { + this[kResume] = null; + resume(); + } + } + _destroy(err, callback) { + this._read(); + callback(err); + } + }; + var PipelineResponse = class extends Readable { + static { + __name(this, "PipelineResponse"); + } + constructor(resume) { + super({ autoDestroy: true }); + this[kResume] = resume; + } + _read() { + this[kResume](); + } + _destroy(err, callback) { + if (!err && !this._readableState.endEmitted) { + err = new RequestAbortedError(); + } + callback(err); + } + }; + var PipelineHandler = class extends AsyncResource { + static { + __name(this, "PipelineHandler"); + } + constructor(opts, handler) { + if (!opts || typeof opts !== "object") { + throw new InvalidArgumentError("invalid opts"); + } + if (typeof handler !== "function") { + throw new InvalidArgumentError("invalid handler"); + } + const { signal, method, opaque, onInfo, responseHeaders } = opts; + if (signal && typeof signal.on !== "function" && typeof signal.addEventListener !== "function") { + throw new InvalidArgumentError("signal must be an EventEmitter or EventTarget"); + } + if (method === "CONNECT") { + throw new InvalidArgumentError("invalid method"); + } + if (onInfo && typeof onInfo !== "function") { + throw new InvalidArgumentError("invalid onInfo callback"); + } + super("UNDICI_PIPELINE"); + this.opaque = opaque || null; + this.responseHeaders = responseHeaders || null; + this.handler = handler; + this.abort = null; + this.context = null; + this.onInfo = onInfo || null; + this.req = new PipelineRequest().on("error", noop); + this.ret = new Duplex({ + readableObjectMode: opts.objectMode, + autoDestroy: true, + read: /* @__PURE__ */ __name(() => { + const { body } = this; + if (body?.resume) { + body.resume(); + } + }, "read"), + write: /* @__PURE__ */ __name((chunk, encoding, callback) => { + const { req } = this; + if (req.push(chunk, encoding) || req._readableState.destroyed) { + callback(); + } else { + req[kResume] = callback; + } + }, "write"), + destroy: /* @__PURE__ */ __name((err, callback) => { + const { body, req, res, ret, abort } = this; + if (!err && !ret._readableState.endEmitted) { + err = new RequestAbortedError(); + } + if (abort && err) { + abort(); + } + util.destroy(body, err); + util.destroy(req, err); + util.destroy(res, err); + removeSignal(this); + callback(err); + }, "destroy") + }).on("prefinish", () => { + const { req } = this; + req.push(null); + }); + this.res = null; + addSignal(this, signal); + } + onConnect(abort, context) { + const { res } = this; + if (this.reason) { + abort(this.reason); + return; + } + assert(!res, "pipeline cannot be retried"); + this.abort = abort; + this.context = context; + } + onHeaders(statusCode, rawHeaders, resume) { + const { opaque, handler, context } = this; + if (statusCode < 200) { + if (this.onInfo) { + const headers = this.responseHeaders === "raw" ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders); + this.onInfo({ statusCode, headers }); + } + return; + } + this.res = new PipelineResponse(resume); + let body; + try { + this.handler = null; + const headers = this.responseHeaders === "raw" ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders); + body = this.runInAsyncScope(handler, null, { + statusCode, + headers, + opaque, + body: this.res, + context + }); + } catch (err) { + this.res.on("error", noop); + throw err; + } + if (!body || typeof body.on !== "function") { + throw new InvalidReturnValueError("expected Readable"); + } + body.on("data", (chunk) => { + const { ret, body: body2 } = this; + if (!ret.push(chunk) && body2.pause) { + body2.pause(); + } + }).on("error", (err) => { + const { ret } = this; + util.destroy(ret, err); + }).on("end", () => { + const { ret } = this; + ret.push(null); + }).on("close", () => { + const { ret } = this; + if (!ret._readableState.ended) { + util.destroy(ret, new RequestAbortedError()); + } + }); + this.body = body; + } + onData(chunk) { + const { res } = this; + return res.push(chunk); + } + onComplete(trailers) { + const { res } = this; + res.push(null); + } + onError(err) { + const { ret } = this; + this.handler = null; + util.destroy(ret, err); + } + }; + function pipeline(opts, handler) { + try { + const pipelineHandler = new PipelineHandler(opts, handler); + this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler); + return pipelineHandler.ret; + } catch (err) { + return new PassThrough().destroy(err); + } + } + __name(pipeline, "pipeline"); + module2.exports = pipeline; + } +}); + +// lib/api/api-upgrade.js +var require_api_upgrade = __commonJS({ + "lib/api/api-upgrade.js"(exports2, module2) { + "use strict"; + var { InvalidArgumentError, SocketError } = require_errors(); + var { AsyncResource } = require("node:async_hooks"); + var assert = require("node:assert"); + var util = require_util(); + var { addSignal, removeSignal } = require_abort_signal(); + var UpgradeHandler = class extends AsyncResource { + static { + __name(this, "UpgradeHandler"); + } + constructor(opts, callback) { + if (!opts || typeof opts !== "object") { + throw new InvalidArgumentError("invalid opts"); + } + if (typeof callback !== "function") { + throw new InvalidArgumentError("invalid callback"); + } + const { signal, opaque, responseHeaders } = opts; + if (signal && typeof signal.on !== "function" && typeof signal.addEventListener !== "function") { + throw new InvalidArgumentError("signal must be an EventEmitter or EventTarget"); + } + super("UNDICI_UPGRADE"); + this.responseHeaders = responseHeaders || null; + this.opaque = opaque || null; + this.callback = callback; + this.abort = null; + this.context = null; + addSignal(this, signal); + } + onConnect(abort, context) { + if (this.reason) { + abort(this.reason); + return; + } + assert(this.callback); + this.abort = abort; + this.context = null; + } + onHeaders() { + throw new SocketError("bad upgrade", null); + } + onUpgrade(statusCode, rawHeaders, socket) { + assert(statusCode === 101); + const { callback, opaque, context } = this; + removeSignal(this); + this.callback = null; + const headers = this.responseHeaders === "raw" ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders); + this.runInAsyncScope(callback, null, null, { + headers, + socket, + opaque, + context + }); + } + onError(err) { + const { callback, opaque } = this; + removeSignal(this); + if (callback) { + this.callback = null; + queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }); + }); + } + } + }; + function upgrade(opts, callback) { + if (callback === void 0) { + return new Promise((resolve, reject) => { + upgrade.call(this, opts, (err, data) => { + return err ? reject(err) : resolve(data); + }); + }); + } + try { + const upgradeHandler = new UpgradeHandler(opts, callback); + const upgradeOpts = { + ...opts, + method: opts.method || "GET", + upgrade: opts.protocol || "Websocket" + }; + this.dispatch(upgradeOpts, upgradeHandler); + } catch (err) { + if (typeof callback !== "function") { + throw err; + } + const opaque = opts?.opaque; + queueMicrotask(() => callback(err, { opaque })); + } + } + __name(upgrade, "upgrade"); + module2.exports = upgrade; + } +}); + +// lib/api/api-connect.js +var require_api_connect = __commonJS({ + "lib/api/api-connect.js"(exports2, module2) { + "use strict"; + var assert = require("node:assert"); + var { AsyncResource } = require("node:async_hooks"); + var { InvalidArgumentError, SocketError } = require_errors(); + var util = require_util(); + var { addSignal, removeSignal } = require_abort_signal(); + var ConnectHandler = class extends AsyncResource { + static { + __name(this, "ConnectHandler"); + } + constructor(opts, callback) { + if (!opts || typeof opts !== "object") { + throw new InvalidArgumentError("invalid opts"); + } + if (typeof callback !== "function") { + throw new InvalidArgumentError("invalid callback"); + } + const { signal, opaque, responseHeaders } = opts; + if (signal && typeof signal.on !== "function" && typeof signal.addEventListener !== "function") { + throw new InvalidArgumentError("signal must be an EventEmitter or EventTarget"); + } + super("UNDICI_CONNECT"); + this.opaque = opaque || null; + this.responseHeaders = responseHeaders || null; + this.callback = callback; + this.abort = null; + addSignal(this, signal); + } + onConnect(abort, context) { + if (this.reason) { + abort(this.reason); + return; + } + assert(this.callback); + this.abort = abort; + this.context = context; + } + onHeaders() { + throw new SocketError("bad connect", null); + } + onUpgrade(statusCode, rawHeaders, socket) { + const { callback, opaque, context } = this; + removeSignal(this); + this.callback = null; + let headers = rawHeaders; + if (headers != null) { + headers = this.responseHeaders === "raw" ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders); + } + this.runInAsyncScope(callback, null, null, { + statusCode, + headers, + socket, + opaque, + context + }); + } + onError(err) { + const { callback, opaque } = this; + removeSignal(this); + if (callback) { + this.callback = null; + queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }); + }); + } + } + }; + function connect(opts, callback) { + if (callback === void 0) { + return new Promise((resolve, reject) => { + connect.call(this, opts, (err, data) => { + return err ? reject(err) : resolve(data); + }); + }); + } + try { + const connectHandler = new ConnectHandler(opts, callback); + const connectOptions = { ...opts, method: "CONNECT" }; + this.dispatch(connectOptions, connectHandler); + } catch (err) { + if (typeof callback !== "function") { + throw err; + } + const opaque = opts?.opaque; + queueMicrotask(() => callback(err, { opaque })); + } + } + __name(connect, "connect"); + module2.exports = connect; + } +}); + +// lib/api/index.js +var require_api = __commonJS({ + "lib/api/index.js"(exports2, module2) { + "use strict"; + module2.exports.request = require_api_request(); + module2.exports.stream = require_api_stream(); + module2.exports.pipeline = require_api_pipeline(); + module2.exports.upgrade = require_api_upgrade(); + module2.exports.connect = require_api_connect(); + } +}); + // index-fetch.js var { getGlobalDispatcher, setGlobalDispatcher } = require_global2(); var EnvHttpProxyAgent = require_env_http_proxy_agent(); @@ -14143,6 +15368,9 @@ module.exports.ErrorEvent = ErrorEvent; module.exports.MessageEvent = MessageEvent; module.exports.createFastMessageEvent = createFastMessageEvent; module.exports.EventSource = require_eventsource().EventSource; +var Dispatcher = require_dispatcher(); +var api = require_api(); +Object.assign(Dispatcher.prototype, api); module.exports.EnvHttpProxyAgent = EnvHttpProxyAgent; module.exports.getGlobalDispatcher = getGlobalDispatcher; module.exports.setGlobalDispatcher = setGlobalDispatcher; diff --git a/lib/internal/process/pre_execution.js b/lib/internal/process/pre_execution.js index 109890e5986ee4..c0c8397b46888e 100644 --- a/lib/internal/process/pre_execution.js +++ b/lib/internal/process/pre_execution.js @@ -117,6 +117,7 @@ function prepareExecution(options) { initializeDeprecations(); require('internal/dns/utils').initializeDns(); + setupHttpProxy(); if (isMainThread) { assert(internalBinding('worker').isMainThread); @@ -152,6 +153,17 @@ function prepareExecution(options) { return mainEntry; } +function setupHttpProxy() { + if (process.env.NODE_USE_ENV_PROXY && + (process.env.HTTP_PROXY || process.env.HTTPS_PROXY)) { + const { setGlobalDispatcher, EnvHttpProxyAgent } = require('internal/deps/undici/undici'); + const envHttpProxyAgent = new EnvHttpProxyAgent(); + setGlobalDispatcher(envHttpProxyAgent); + // TODO(joyeecheung): handle http/https global agents and perhaps Agent constructor + // behaviors. + } +} + function setupUserModules(forceDefaultLoader = false) { initializeCJSLoader(); initializeESMLoader(forceDefaultLoader); diff --git a/test/fixtures/fetch-and-log.mjs b/test/fixtures/fetch-and-log.mjs new file mode 100644 index 00000000000000..6907ea74832437 --- /dev/null +++ b/test/fixtures/fetch-and-log.mjs @@ -0,0 +1,4 @@ +const address = process.env.SERVER_ADDRESS; +const response = await fetch(address); +const body = await response.text(); +console.log(body); diff --git a/test/fixtures/proxy-handler.js b/test/fixtures/proxy-handler.js new file mode 100644 index 00000000000000..44c41f54f4129f --- /dev/null +++ b/test/fixtures/proxy-handler.js @@ -0,0 +1,22 @@ +const net = require('net'); + +exports.onConnect = function (req, clientSocket, head) { + const [hostname, port] = req.url.split(':'); + + const serverSocket = net.connect(port, hostname, () => { + clientSocket.write( + 'HTTP/1.1 200 Connection Established\r\n' + + 'Proxy-agent: Node.js-Proxy\r\n' + + '\r\n' + ); + serverSocket.write(head); + clientSocket.pipe(serverSocket); + serverSocket.pipe(clientSocket); + }); + + serverSocket.on('error', (err) => { + console.error('Error on CONNECT tunnel:', err.message); + clientSocket.write('HTTP/1.1 500 Connection Error\r\n\r\n'); + clientSocket.end(); + }); +}; diff --git a/test/parallel/test-http-proxy-fetch.js b/test/parallel/test-http-proxy-fetch.js new file mode 100644 index 00000000000000..e60c75c55a8b1a --- /dev/null +++ b/test/parallel/test-http-proxy-fetch.js @@ -0,0 +1,64 @@ +'use strict'; + +const common = require('../common'); +const fixtures = require('../common/fixtures'); +const assert = require('assert'); +const { spawn } = require('child_process'); +const http = require('http'); +const { onConnect } = require('../fixtures/proxy-handler'); + +// Start a server to process the final request. +const server = http.createServer((req, res) => { + res.end('Hello world'); +}); +server.on('error', (err) => { console.log('Server error', err); }); + +server.listen(0, common.mustCall(() => { + // Start a proxy server to tunnel the request. + const proxy = http.createServer(); + // If the request does not go through the proxy server, common.mustCall fails. + proxy.on('connect', common.mustCall((req, clientSocket, head) => { + console.log('Proxying CONNECT', req.url, req.headers); + assert.strictEqual(req.url, `localhost:${server.address().port}`); + onConnect(req, clientSocket, head); + })); + proxy.on('error', (err) => { console.log('Proxy error', err); }); + + proxy.listen(0, common.mustCall(() => { + const proxyAddress = `http://localhost:${proxy.address().port}`; + const serverAddress = `http://localhost:${server.address().port}`; + const child = spawn(process.execPath, + [fixtures.path('fetch-and-log.mjs')], + { + env: { + ...process.env, + HTTP_PROXY: proxyAddress, + NODE_USE_ENV_PROXY: true, + SERVER_ADDRESS: serverAddress, + }, + }); + + const stderr = []; + const stdout = []; + child.stderr.on('data', (chunk) => { + stderr.push(chunk); + }); + child.stdout.on('data', (chunk) => { + stdout.push(chunk); + }); + + child.on('exit', common.mustCall(function(code, signal) { + proxy.close(); + server.close(); + + console.log('--- stderr ---'); + console.log(Buffer.concat(stderr).toString()); + console.log('--- stdout ---'); + const stdoutStr = Buffer.concat(stdout).toString(); + console.log(stdoutStr); + assert.strictEqual(stdoutStr.trim(), 'Hello world'); + assert.strictEqual(code, 0); + assert.strictEqual(signal, null); + })); + })); +}));