Skip to content

Conversation

Raiden1411
Copy link
Contributor

Closes #24937

This PR updates the websocket server implementation to support larger messages and fragmented ones too.

I have also tested this implementation against the autobahn test suite and it passes on all cases expect the compression tests which this implementation doesn't support. Once std.compress.flate.Compress is re-implemented then it can be added too.
I can also share the code used for the test if needed.

If you think this PR goes against the original design/goal feel free to close it.

@scottredig
Copy link
Contributor

I can also share the code used for the test if needed.

I'm rewriting the implementation for my project to use the new Io Reader and Writer. I've explained more in the issue. So having that for my own testing would be useful, if you would please share it.

@Raiden1411
Copy link
Contributor Author

Sure no problem! This was the code used for the test.

const std = @import("std");

const Allocator = std.mem.Allocator;
const Server = std.http.Server;

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.detectLeaks();

    const allocator = gpa.allocator();
    
    var auto = try AutoBanh.init(allocator, 9224);
    defer auto.deinit();

    auto.serve();
}

const AutoBanh = struct {
    gpa: Allocator,
    client: std.net.Server,

    pub fn init(gpa: Allocator, port: u16) !AutoBanh {
        const address = try std.net.Address.parseIp("127.0.0.1", port);
        const server = try address.listen(.{ .reuse_address = true });

        return .{
            .gpa = gpa,
            .client = server,
        };
    }

    pub fn serve(self: *AutoBanh) void {
        while (true) {
            const connection = self.client.accept() catch |err| {
                std.log.err("failed to accept connection: {s}", .{@errorName(err)});
                return;
            };
            _ = std.Thread.spawn(.{}, accept, .{ self, connection }) catch |err| {
                std.log.err("unable to spawn connection thread: {s}", .{@errorName(err)});
                connection.stream.close();
                continue;
            };
        }
    }

    fn accept(ws: *AutoBanh, connection: std.net.Server.Connection) void {
        defer connection.stream.close();

        var send_buffer: [4096]u8 = undefined;
        var recv_buffer: [8192]u8 = undefined;
        var connection_reader = connection.stream.reader(&recv_buffer);
        var connection_writer = connection.stream.writer(&send_buffer);
        var server: std.http.Server = .init(connection_reader.interface(), &connection_writer.interface);

        var request = server.receiveHead() catch |err| switch (err) {
            error.HttpConnectionClosing => return,
            else => return std.log.err("failed to receive http request: {t}", .{err}),
        };
        switch (request.upgradeRequested()) {
            .websocket => |opt_key| {
                const key = opt_key orelse return std.log.err("missing websocket key", .{});
                var web_socket = request.respondWebSocket(.{ .key = key, .allocator = ws.gpa }) catch {
                    return std.log.err("failed to respond web socket: {t}", .{connection_writer.err.?});
                };
                defer web_socket.deinit();

                web_socket.flush() catch {};

                return serveWebSocket(&web_socket) catch |err| {
                    std.log.err("failed to serve websocket: {t}", .{err});
                    return;
                };
            },
            .other => |name| return std.log.err("unknown upgrade request: {s}", .{name}),
            .none => unreachable,
        }
    }

    pub fn deinit(self: *AutoBanh) void {
        self.client.deinit();
    }

    pub fn serveWebSocket(websocket: *std.http.Server.WebSocket) !void {
        while (true) {
            const message = websocket.readMessage() catch |err| switch (err) {
                error.EndOfStream => {
                    try websocket.writeCloseFrame(.{ .exit_code = 1002 });
                    return;
                },
                error.InvalidUtf8Payload => {
                    try websocket.writeCloseFrame(.{ .exit_code = 1007 });
                    return err;
                },
                else => {
                    try websocket.writeCloseFrame(.{ .exit_code = 1002 });
                    return err;
                },
            };

            switch (message.opcode) {
                .binary,
                => try websocket.writeFrame(message.data, .binary),
                .text,
                => try websocket.writeFrame(message.data, .text),
                .ping,
                => try websocket.writeFrame(message.data, .pong),
                .connection_close,
                => return websocket.writeCloseFrame(.{ .exit_code = 0 }),
                // Ignore unsolicited pong messages.
                .pong,
                => continue,
                else => return error.UnexpectedOpcode,
            }
        }
    }
};

@Raiden1411
Copy link
Contributor Author

@scottredig

@scottredig
Copy link
Contributor

scottredig commented Aug 26, 2025 via email

This commit addresses the ziglang#24937 issue and updates the websocket
implementation to support both fragments and longer messages.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Websockets rejecting any fragmented frames with MessageTooBig is problematic
2 participants