Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 146 additions & 20 deletions pkg/vere/io/cttp.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
h2o_http1client_t* cli_u; // h2o client
u3_csat sat_e; // connection state
c3_o sec; // yes == https
c3_o stm_o; // yes == streaming response
c3_w ipf_w; // IP
c3_c* ipf_c; // IP (string)
c3_c* hot_c; // host
Expand Down Expand Up @@ -535,6 +536,36 @@ _cttp_creq_free(u3_creq* ceq_u)
c3_free(ceq_u);
}

/* _cttp_creq_is_streaming(): check for x-urbit-stream header, strip if found
*/
static c3_o
_cttp_creq_is_streaming(u3_hhed** hed_u)
{
u3_hhed** cur_u = hed_u;

while ( *cur_u ) {
u3_hhed* hes_u = *cur_u;

// check for x-urbit-stream (case-insensitive)
//
if ( (14 == hes_u->nam_w) &&
(0 == strncasecmp(hes_u->nam_c, "x-urbit-stream", 14)) )
{
// strip the header
//
*cur_u = hes_u->nex_u;
c3_free(hes_u->nam_c);
c3_free(hes_u->val_c);
c3_free(hes_u);
return c3y;
}

cur_u = &((*cur_u)->nex_u);
}

return c3n;
}

/* _cttp_creq_new(): create a u3_creq from an +http-request
*
* If we were rewriting all of this from scratch, this isn't how we'd do it.
Expand Down Expand Up @@ -594,6 +625,7 @@ _cttp_creq_new(u3_cttp* ctp_u, c3_l num_l, u3_noun hes)
ceq_u->url_c = _cttp_creq_url(u3k(pul));

ceq_u->hed_u = _cttp_heds_from_noun(u3k(headers));
ceq_u->stm_o = _cttp_creq_is_streaming(&ceq_u->hed_u);

if ( u3_nul != body ) {
ceq_u->bod_u = _cttp_bod_from_octs(u3k(u3t(body)));
Expand Down Expand Up @@ -727,18 +759,75 @@ _cttp_http_client_receive(u3_creq* ceq_u, c3_w sas_w, u3_noun mes, u3_noun uct)
u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad));
}

/* _cttp_creq_fail(): dispatch error response
/* _cttp_creq_send_start(): send %start event (headers arrived)
*/
static void
_cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c)
_cttp_creq_send_start(u3_creq* ceq_u, c3_w sas_w, u3_noun hed, u3_noun uct, c3_o fin_o)
{
// XX anything other than a 504?
c3_w cod_w = 504;
u3_cttp* ctp_u = ceq_u->ctp_u;

u3l_log("http: fail (%d, %d): %s", ceq_u->num_l, cod_w, err_c);
u3_noun wir = u3nt(u3i_string("http-client"),
u3dc("scot", c3__uv, ctp_u->sev_l),
u3_nul);
u3_noun cad = u3nt(u3i_string("receive"),
ceq_u->num_l,
u3nq(u3i_string("start"), u3nc(sas_w, hed), uct, fin_o));

u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad));
}

/* _cttp_creq_send_continue(): send %continue event (body chunk)
*/
static void
_cttp_creq_send_continue(u3_creq* ceq_u, u3_noun uct, c3_o fin_o)
{
u3_cttp* ctp_u = ceq_u->ctp_u;

// XX include err_c as response body?
_cttp_http_client_receive(ceq_u, cod_w, u3_nul, u3_nul);
u3_noun wir = u3nt(u3i_string("http-client"),
u3dc("scot", c3__uv, ctp_u->sev_l),
u3_nul);
u3_noun cad = u3nt(u3i_string("receive"),
ceq_u->num_l,
u3nt(u3i_string("continue"), uct, fin_o));

u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad));
}

/* _cttp_creq_send_cancel(): send %cancel event
*/
static void
_cttp_creq_send_cancel(u3_creq* ceq_u)
{
u3_cttp* ctp_u = ceq_u->ctp_u;

u3_noun wir = u3nt(u3i_string("http-client"),
u3dc("scot", c3__uv, ctp_u->sev_l),
u3_nul);
u3_noun cad = u3nt(u3i_string("receive"),
ceq_u->num_l,
u3nc(u3i_string("cancel"), u3_nul));

u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad));
}

/* _cttp_creq_fail(): dispatch error response
*/
static void
_cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c)
{
if ( c3y == ceq_u->stm_o ) {
// streaming: send %cancel
//
u3l_log("http: fail (%d): %s", ceq_u->num_l, err_c);
_cttp_creq_send_cancel(ceq_u);
}
else {
// buffered: send fake 504
//
c3_w cod_w = 504;
u3l_log("http: fail (%d, %d): %s", ceq_u->num_l, cod_w, err_c);
_cttp_http_client_receive(ceq_u, cod_w, u3_nul, u3_nul);
}
_cttp_creq_free(ceq_u);
}

Expand Down Expand Up @@ -770,17 +859,37 @@ _cttp_creq_on_body(h2o_http1client_t* cli_u, const c3_c* err_c)
}

h2o_buffer_t* buf_u = cli_u->sock->input;
c3_o fin_o = ( h2o_http1client_error_is_eos == err_c ) ? c3y : c3n;

if ( c3y == ceq_u->stm_o ) {
// streaming: send %continue immediately
//
if ( buf_u->size || (c3y == fin_o) ) {
u3_noun uct = u3_nul;
if ( buf_u->size ) {
u3_noun oct = u3i_bytes(buf_u->size, (c3_y*)buf_u->bytes);
uct = u3nc(u3_nul, u3nc(buf_u->size, oct));
h2o_buffer_consume(&cli_u->sock->input, buf_u->size);
}
_cttp_creq_send_continue(ceq_u, uct, fin_o);
}

if ( buf_u->size ) {
_cttp_cres_fire_body(ceq_u->res_u,
_cttp_bod_new(buf_u->size, buf_u->bytes));
h2o_buffer_consume(&cli_u->sock->input, buf_u->size);
if ( c3y == fin_o ) {
_cttp_creq_free(ceq_u);
}
}
else {
// buffered: append to body queue
//
if ( buf_u->size ) {
_cttp_cres_fire_body(ceq_u->res_u,
_cttp_bod_new(buf_u->size, buf_u->bytes));
h2o_buffer_consume(&cli_u->sock->input, buf_u->size);
}

// We're using the end of stream thing here to queue event to urbit. we'll
// need to separate this into our own timer for partial progress sends.
if ( h2o_http1client_error_is_eos == err_c ) {
_cttp_creq_respond(ceq_u);
if ( c3y == fin_o ) {
_cttp_creq_respond(ceq_u);
}
}

return 0;
Expand All @@ -800,12 +909,29 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i,
return 0;
}

_cttp_cres_new(ceq_u, (c3_w)sas_i);
ceq_u->res_u->hed = _cttp_heds_to_noun(hed_u, hed_t);
if ( c3y == ceq_u->stm_o ) {
// streaming: send %start immediately
//
u3_noun hed = _cttp_heds_to_noun(hed_u, hed_t);
c3_o fin_o = ( h2o_http1client_error_is_eos == err_c ) ? c3y : c3n;

if ( h2o_http1client_error_is_eos == err_c ) {
_cttp_creq_respond(ceq_u);
return 0;
_cttp_creq_send_start(ceq_u, (c3_w)sas_i, hed, u3_nul, fin_o);

if ( c3y == fin_o ) {
_cttp_creq_free(ceq_u);
return 0;
}
}
else {
// buffered: store headers, wait for body
//
_cttp_cres_new(ceq_u, (c3_w)sas_i);
ceq_u->res_u->hed = _cttp_heds_to_noun(hed_u, hed_t);

if ( h2o_http1client_error_is_eos == err_c ) {
_cttp_creq_respond(ceq_u);
return 0;
}
}

return _cttp_creq_on_body;
Expand Down