Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,11 @@ interface EventDispatcher @0xf20697475ec1752d {
interface WorkerdBootstrap {
# Bootstrap interface exposed by workerd when serving Cap'n Proto RPC.

startEvent @0 () -> (dispatcher :EventDispatcher);
startEvent @0 (cfBlobJson :Text) -> (dispatcher :EventDispatcher);
# Start a new event. Exactly one event should be delivered to the returned EventDispatcher.
#
# TODO(someday): Pass cfBlobJson? Currently doesn't matter since the cf blob is only present for
# HTTP requests which can be delivered over regular HTTP instead of capnp.
# If the event is an HTTP request, `cfBlobJson` optionally carries the JSON-encoded `request.cf`
# object. The dispatcher will pass it through to the worker via SubrequestMetadata.
}

interface WorkerdDebugPort {
Expand Down
48 changes: 33 additions & 15 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4929,14 +4929,17 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server {
httpOverCapnpFactory(httpOverCapnpFactory) {}

kj::Promise<void> startEvent(StartEventContext context) override {
// TODO(someday): Use cfBlobJson from the connection if there is one, or from RPC params
// if we add that? (Note that if a connection-level cf blob exists, it should take
// priority; we should only accept a cf blob from the client if we have a cfBlobHeader
// configured, which hints that this service trusts the client to provide the cf blob.)

// Extract the optional cf blob from the RPC params and pass it along with the
// service channel to EventDispatcherImpl. The cf blob will be included in
// SubrequestMetadata when creating the WorkerInterface for HTTP events.
kj::Maybe<kj::String> cfBlobJson;
auto params = context.getParams();
if (params.hasCfBlobJson()) {
cfBlobJson = kj::str(params.getCfBlobJson());
}
context.initResults(capnp::MessageSize{4, 1})
.setDispatcher(
kj::heap<EventDispatcherImpl>(httpOverCapnpFactory, service->startRequest({})));
.setDispatcher(kj::heap<EventDispatcherImpl>(
httpOverCapnpFactory, kj::addRef(*service), kj::mv(cfBlobJson)));
return kj::READY_NOW;
}

Expand All @@ -4946,14 +4949,22 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server {

class EventDispatcherImpl final: public rpc::EventDispatcher::Server {
public:
EventDispatcherImpl(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory, kj::Own<WorkerInterface> worker)
EventDispatcherImpl(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
kj::Own<IoChannelFactory::SubrequestChannel> service,
kj::Maybe<kj::String> cfBlobJson)
: httpOverCapnpFactory(httpOverCapnpFactory),
worker(kj::mv(worker)) {}
service(kj::mv(service)),
cfBlobJson(kj::mv(cfBlobJson)) {}

kj::Promise<void> getHttpService(GetHttpServiceContext context) override {
// Create WorkerInterface with cf blob metadata (if provided via startEvent).
IoChannelFactory::SubrequestMetadata metadata;
KJ_IF_SOME(cf, cfBlobJson) {
metadata.cfBlobJson = kj::str(cf);
}
auto worker = getService()->startRequest(kj::mv(metadata));
context.initResults(capnp::MessageSize{4, 1})
.setHttp(httpOverCapnpFactory.kjToCapnp(getWorker()));
.setHttp(httpOverCapnpFactory.kjToCapnp(kj::mv(worker)));
return kj::READY_NOW;
}

Expand Down Expand Up @@ -5013,15 +5024,22 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server {

private:
capnp::HttpOverCapnpFactory& httpOverCapnpFactory;
kj::Maybe<kj::Own<WorkerInterface>> worker;
kj::Maybe<kj::Own<IoChannelFactory::SubrequestChannel>> service;
kj::Maybe<kj::String> cfBlobJson;

kj::Own<WorkerInterface> getWorker() {
kj::Own<IoChannelFactory::SubrequestChannel> getService() {
auto result =
kj::mv(KJ_ASSERT_NONNULL(worker, "EventDispatcher can only be used for one request"));
worker = kj::none;
kj::mv(KJ_ASSERT_NONNULL(service, "EventDispatcher can only be used for one request"));
service = kj::none;
return result;
}

kj::Own<WorkerInterface> getWorker() {
// For non-HTTP events (RPC, traces, etc.), create WorkerInterface with
// empty metadata since there's no HTTP request to extract cf from.
return getService()->startRequest({});
}

[[noreturn]] void throwUnsupported() {
JSG_FAIL_REQUIRE(Error, "RPC connections don't yet support this event type.");
}
Expand Down
8 changes: 7 additions & 1 deletion src/workerd/server/workerd-debug-port-client.c++
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ class WorkerdBootstrapSubrequestChannel final: public IoChannelFactory::Subreque
connectionState(kj::mv(connectionState)) {}

kj::Own<WorkerInterface> startRequest(IoChannelFactory::SubrequestMetadata metadata) override {
auto dispatcher = bootstrap.startEventRequest().send().getDispatcher();
// Pass cfBlobJson as an RPC parameter on startEvent so the server can include it
// in SubrequestMetadata when creating the WorkerInterface.
auto req = bootstrap.startEventRequest();
KJ_IF_SOME(cf, metadata.cfBlobJson) {
req.setCfBlobJson(cf);
}
auto dispatcher = req.send().getDispatcher();
// Attach connection ref for deferred proxying - the HTTP response body/WebSocket
// will get this WorkerInterface attached, keeping the connection alive.
return kj::heap<RpcWorkerInterface>(httpOverCapnpFactory, byteStreamFactory, kj::mv(dispatcher))
Expand Down
Loading