diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index a594026a885..2a4a9fb54a5 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -792,11 +792,11 @@ interface EventDispatcher @0xf20697475ec1752d { interface WorkerdBootstrap { # Bootstrap interface exposed by workerd when serving Cap'n Proto RPC. - startEvent @0 () -> (dispatcher :EventDispatcher); + startEvent @0 (cfBlobJson :Text) -> (dispatcher :EventDispatcher); # Start a new event. Exactly one event should be delivered to the returned EventDispatcher. # - # TODO(someday): Pass cfBlobJson? Currently doesn't matter since the cf blob is only present for - # HTTP requests which can be delivered over regular HTTP instead of capnp. + # If the event is an HTTP request, `cfBlobJson` optionally carries the JSON-encoded `request.cf` + # object. The dispatcher will pass it through to the worker via SubrequestMetadata. } interface WorkerdDebugPort { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 43ac5bd61e4..c3c6e5393a9 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -4929,14 +4929,17 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server { httpOverCapnpFactory(httpOverCapnpFactory) {} kj::Promise startEvent(StartEventContext context) override { - // TODO(someday): Use cfBlobJson from the connection if there is one, or from RPC params - // if we add that? (Note that if a connection-level cf blob exists, it should take - // priority; we should only accept a cf blob from the client if we have a cfBlobHeader - // configured, which hints that this service trusts the client to provide the cf blob.) - + // Extract the optional cf blob from the RPC params and pass it along with the + // service channel to EventDispatcherImpl. The cf blob will be included in + // SubrequestMetadata when creating the WorkerInterface for HTTP events. + kj::Maybe cfBlobJson; + auto params = context.getParams(); + if (params.hasCfBlobJson()) { + cfBlobJson = kj::str(params.getCfBlobJson()); + } context.initResults(capnp::MessageSize{4, 1}) - .setDispatcher( - kj::heap(httpOverCapnpFactory, service->startRequest({}))); + .setDispatcher(kj::heap( + httpOverCapnpFactory, kj::addRef(*service), kj::mv(cfBlobJson))); return kj::READY_NOW; } @@ -4946,14 +4949,22 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server { class EventDispatcherImpl final: public rpc::EventDispatcher::Server { public: - EventDispatcherImpl( - capnp::HttpOverCapnpFactory& httpOverCapnpFactory, kj::Own worker) + EventDispatcherImpl(capnp::HttpOverCapnpFactory& httpOverCapnpFactory, + kj::Own service, + kj::Maybe cfBlobJson) : httpOverCapnpFactory(httpOverCapnpFactory), - worker(kj::mv(worker)) {} + service(kj::mv(service)), + cfBlobJson(kj::mv(cfBlobJson)) {} kj::Promise getHttpService(GetHttpServiceContext context) override { + // Create WorkerInterface with cf blob metadata (if provided via startEvent). + IoChannelFactory::SubrequestMetadata metadata; + KJ_IF_SOME(cf, cfBlobJson) { + metadata.cfBlobJson = kj::str(cf); + } + auto worker = getService()->startRequest(kj::mv(metadata)); context.initResults(capnp::MessageSize{4, 1}) - .setHttp(httpOverCapnpFactory.kjToCapnp(getWorker())); + .setHttp(httpOverCapnpFactory.kjToCapnp(kj::mv(worker))); return kj::READY_NOW; } @@ -5013,15 +5024,22 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server { private: capnp::HttpOverCapnpFactory& httpOverCapnpFactory; - kj::Maybe> worker; + kj::Maybe> service; + kj::Maybe cfBlobJson; - kj::Own getWorker() { + kj::Own getService() { auto result = - kj::mv(KJ_ASSERT_NONNULL(worker, "EventDispatcher can only be used for one request")); - worker = kj::none; + kj::mv(KJ_ASSERT_NONNULL(service, "EventDispatcher can only be used for one request")); + service = kj::none; return result; } + kj::Own getWorker() { + // For non-HTTP events (RPC, traces, etc.), create WorkerInterface with + // empty metadata since there's no HTTP request to extract cf from. + return getService()->startRequest({}); + } + [[noreturn]] void throwUnsupported() { JSG_FAIL_REQUIRE(Error, "RPC connections don't yet support this event type."); } diff --git a/src/workerd/server/workerd-debug-port-client.c++ b/src/workerd/server/workerd-debug-port-client.c++ index 66c60989093..618de0c8702 100644 --- a/src/workerd/server/workerd-debug-port-client.c++ +++ b/src/workerd/server/workerd-debug-port-client.c++ @@ -31,7 +31,13 @@ class WorkerdBootstrapSubrequestChannel final: public IoChannelFactory::Subreque connectionState(kj::mv(connectionState)) {} kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata) override { - auto dispatcher = bootstrap.startEventRequest().send().getDispatcher(); + // Pass cfBlobJson as an RPC parameter on startEvent so the server can include it + // in SubrequestMetadata when creating the WorkerInterface. + auto req = bootstrap.startEventRequest(); + KJ_IF_SOME(cf, metadata.cfBlobJson) { + req.setCfBlobJson(cf); + } + auto dispatcher = req.send().getDispatcher(); // Attach connection ref for deferred proxying - the HTTP response body/WebSocket // will get this WorkerInterface attached, keeping the connection alive. return kj::heap(httpOverCapnpFactory, byteStreamFactory, kj::mv(dispatcher))