diff --git a/pkg/vere/io/cttp.c b/pkg/vere/io/cttp.c index af0e88e9ee..353dd100c0 100644 --- a/pkg/vere/io/cttp.c +++ b/pkg/vere/io/cttp.c @@ -32,6 +32,7 @@ h2o_http1client_t* cli_u; // h2o client u3_csat sat_e; // connection state c3_o sec; // yes == https + c3_o stm_o; // yes == streaming response c3_w ipf_w; // IP c3_c* ipf_c; // IP (string) c3_c* hot_c; // host @@ -535,6 +536,36 @@ _cttp_creq_free(u3_creq* ceq_u) c3_free(ceq_u); } +/* _cttp_creq_is_streaming(): check for x-urbit-stream header, strip if found +*/ +static c3_o +_cttp_creq_is_streaming(u3_hhed** hed_u) +{ + u3_hhed** cur_u = hed_u; + + while ( *cur_u ) { + u3_hhed* hes_u = *cur_u; + + // check for x-urbit-stream (case-insensitive) + // + if ( (14 == hes_u->nam_w) && + (0 == strncasecmp(hes_u->nam_c, "x-urbit-stream", 14)) ) + { + // strip the header + // + *cur_u = hes_u->nex_u; + c3_free(hes_u->nam_c); + c3_free(hes_u->val_c); + c3_free(hes_u); + return c3y; + } + + cur_u = &((*cur_u)->nex_u); + } + + return c3n; +} + /* _cttp_creq_new(): create a u3_creq from an +http-request * * If we were rewriting all of this from scratch, this isn't how we'd do it. @@ -594,6 +625,7 @@ _cttp_creq_new(u3_cttp* ctp_u, c3_l num_l, u3_noun hes) ceq_u->url_c = _cttp_creq_url(u3k(pul)); ceq_u->hed_u = _cttp_heds_from_noun(u3k(headers)); + ceq_u->stm_o = _cttp_creq_is_streaming(&ceq_u->hed_u); if ( u3_nul != body ) { ceq_u->bod_u = _cttp_bod_from_octs(u3k(u3t(body))); @@ -727,18 +759,75 @@ _cttp_http_client_receive(u3_creq* ceq_u, c3_w sas_w, u3_noun mes, u3_noun uct) u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad)); } -/* _cttp_creq_fail(): dispatch error response +/* _cttp_creq_send_start(): send %start event (headers arrived) */ static void -_cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c) +_cttp_creq_send_start(u3_creq* ceq_u, c3_w sas_w, u3_noun hed, u3_noun uct, c3_o fin_o) { - // XX anything other than a 504? - c3_w cod_w = 504; + u3_cttp* ctp_u = ceq_u->ctp_u; - u3l_log("http: fail (%d, %d): %s", ceq_u->num_l, cod_w, err_c); + u3_noun wir = u3nt(u3i_string("http-client"), + u3dc("scot", c3__uv, ctp_u->sev_l), + u3_nul); + u3_noun cad = u3nt(u3i_string("receive"), + ceq_u->num_l, + u3nq(u3i_string("start"), u3nc(sas_w, hed), uct, fin_o)); + + u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad)); +} + +/* _cttp_creq_send_continue(): send %continue event (body chunk) +*/ +static void +_cttp_creq_send_continue(u3_creq* ceq_u, u3_noun uct, c3_o fin_o) +{ + u3_cttp* ctp_u = ceq_u->ctp_u; - // XX include err_c as response body? - _cttp_http_client_receive(ceq_u, cod_w, u3_nul, u3_nul); + u3_noun wir = u3nt(u3i_string("http-client"), + u3dc("scot", c3__uv, ctp_u->sev_l), + u3_nul); + u3_noun cad = u3nt(u3i_string("receive"), + ceq_u->num_l, + u3nt(u3i_string("continue"), uct, fin_o)); + + u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad)); +} + +/* _cttp_creq_send_cancel(): send %cancel event +*/ +static void +_cttp_creq_send_cancel(u3_creq* ceq_u) +{ + u3_cttp* ctp_u = ceq_u->ctp_u; + + u3_noun wir = u3nt(u3i_string("http-client"), + u3dc("scot", c3__uv, ctp_u->sev_l), + u3_nul); + u3_noun cad = u3nt(u3i_string("receive"), + ceq_u->num_l, + u3nc(u3i_string("cancel"), u3_nul)); + + u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad)); +} + +/* _cttp_creq_fail(): dispatch error response +*/ +static void +_cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c) +{ + if ( c3y == ceq_u->stm_o ) { + // streaming: send %cancel + // + u3l_log("http: fail (%d): %s", ceq_u->num_l, err_c); + _cttp_creq_send_cancel(ceq_u); + } + else { + // buffered: send fake 504 + // + c3_w cod_w = 504; + u3l_log("http: fail (%d, %d): %s", ceq_u->num_l, cod_w, err_c); + _cttp_http_client_receive(ceq_u, cod_w, u3_nul, u3_nul); + } _cttp_creq_free(ceq_u); } @@ -770,17 +859,37 @@ _cttp_creq_on_body(h2o_http1client_t* cli_u, const c3_c* err_c) } h2o_buffer_t* buf_u = cli_u->sock->input; + c3_o fin_o = ( h2o_http1client_error_is_eos == err_c ) ? c3y : c3n; + + if ( c3y == ceq_u->stm_o ) { + // streaming: send %continue immediately + // + if ( buf_u->size || (c3y == fin_o) ) { + u3_noun uct = u3_nul; + if ( buf_u->size ) { + u3_noun oct = u3i_bytes(buf_u->size, (c3_y*)buf_u->bytes); + uct = u3nc(u3_nul, u3nc(buf_u->size, oct)); + h2o_buffer_consume(&cli_u->sock->input, buf_u->size); + } + _cttp_creq_send_continue(ceq_u, uct, fin_o); + } - if ( buf_u->size ) { - _cttp_cres_fire_body(ceq_u->res_u, - _cttp_bod_new(buf_u->size, buf_u->bytes)); - h2o_buffer_consume(&cli_u->sock->input, buf_u->size); + if ( c3y == fin_o ) { + _cttp_creq_free(ceq_u); + } } + else { + // buffered: append to body queue + // + if ( buf_u->size ) { + _cttp_cres_fire_body(ceq_u->res_u, + _cttp_bod_new(buf_u->size, buf_u->bytes)); + h2o_buffer_consume(&cli_u->sock->input, buf_u->size); + } - // We're using the end of stream thing here to queue event to urbit. we'll - // need to separate this into our own timer for partial progress sends. - if ( h2o_http1client_error_is_eos == err_c ) { - _cttp_creq_respond(ceq_u); + if ( c3y == fin_o ) { + _cttp_creq_respond(ceq_u); + } } return 0; @@ -800,12 +909,29 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i, return 0; } - _cttp_cres_new(ceq_u, (c3_w)sas_i); - ceq_u->res_u->hed = _cttp_heds_to_noun(hed_u, hed_t); + if ( c3y == ceq_u->stm_o ) { + // streaming: send %start immediately + // + u3_noun hed = _cttp_heds_to_noun(hed_u, hed_t); + c3_o fin_o = ( h2o_http1client_error_is_eos == err_c ) ? c3y : c3n; - if ( h2o_http1client_error_is_eos == err_c ) { - _cttp_creq_respond(ceq_u); - return 0; + _cttp_creq_send_start(ceq_u, (c3_w)sas_i, hed, u3_nul, fin_o); + + if ( c3y == fin_o ) { + _cttp_creq_free(ceq_u); + return 0; + } + } + else { + // buffered: store headers, wait for body + // + _cttp_cres_new(ceq_u, (c3_w)sas_i); + ceq_u->res_u->hed = _cttp_heds_to_noun(hed_u, hed_t); + + if ( h2o_http1client_error_is_eos == err_c ) { + _cttp_creq_respond(ceq_u); + return 0; + } } return _cttp_creq_on_body;