Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/noun/trace.c
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,6 @@ u3t_sstack_init()

stk_u->off_w = 0;
stk_u->fow_w = 0;
u3t_sstack_push(c3__root);
#endif
}

Expand Down
226 changes: 207 additions & 19 deletions pkg/vere/io/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ typedef struct _u3_h2o_serv {
u3_noun ses; // valid session tokens
struct _u3_hreq* seq_u; // open slog requests
struct _u3_hreq* siq_u; // open spin requests
struct _u3_hreq* soq_u; // open spog requests
uv_timer_t* sit_u; // stream heartbeat
uv_timer_t* sin_u; // spinner stream timer
uv_timer_t* son_u; // spog stream timer
} u3_hfig;

/* u3_httd: general http device
Expand All @@ -128,6 +130,7 @@ static void _http_start_respond(u3_hreq* req_u,
u3_noun data,
u3_noun complete);
static void _http_spin_timer_cb(uv_timer_t* tim_u);
static void _http_spog_timer_cb(uv_timer_t* tim_u);

static const c3_i TCP_BACKLOG = 16;
static const c3_w HEARTBEAT_TIMEOUT = 20 * 1000;
Expand Down Expand Up @@ -484,6 +487,24 @@ _http_seq_link(u3_hcon* hon_u, u3_hreq* req_u)
fig_u->seq_u = req_u;
}

/* _http_spog_link(): store spog stream request in state
*/
static void
_http_spog_link(u3_hcon* hon_u, u3_hreq* req_u)
{
u3_hfig* fig_u = &hon_u->htp_u->htd_u->fig_u;
req_u->hon_u = hon_u;
req_u->seq_l = hon_u->seq_l++;
req_u->nex_u = fig_u->soq_u;

if ( 0 != req_u->nex_u ) {
req_u->nex_u->pre_u = req_u;
} else {
uv_timer_start(fig_u->son_u, _http_spog_timer_cb, SPIN_TIMER, 0);
}
fig_u->soq_u = req_u;
}

/* _http_spin_unlink(): remove spin stack request from state
*/
static void
Expand Down Expand Up @@ -531,6 +552,31 @@ _http_seq_unlink(u3_hreq* req_u)
}
}

/* _http_spog_unlink(): remove spog stack request from state
*/
static void
_http_spog_unlink(u3_hreq* req_u)
{
u3_hfig* fig_u = &req_u->hon_u->htp_u->htd_u->fig_u;
if ( 0 != req_u->pre_u ) {
req_u->pre_u->nex_u = req_u->nex_u;

if ( 0 != req_u->nex_u ) {
req_u->nex_u->pre_u = req_u->pre_u;
}
}
else {
fig_u->soq_u = req_u->nex_u;

if ( 0 != req_u->nex_u ) {
req_u->nex_u->pre_u = 0;
}
else if (req_u->tim_u != NULL) {
uv_timer_stop(req_u->tim_u);
}
}
}

/* _http_req_to_duct(): translate srv/con/req to duct
*/
static u3_noun
Expand Down Expand Up @@ -616,6 +662,16 @@ _http_seq_done(void* ptr_v)
_http_seq_unlink(seq_u);
}

/* _http_spog_done(): spog stream request finished, deallocation callback
*/
static void
_http_spog_done(void* ptr_v)
{
u3_hreq* spog_u = (u3_hreq*)ptr_v;
_http_req_close(spog_u);
_http_spog_unlink(spog_u);
}

static void
_http_hgen_send(u3_hgen* gen_u);

Expand Down Expand Up @@ -709,6 +765,21 @@ _http_seq_new(u3_hcon* hon_u, h2o_req_t* rec_u)

return req_u;
}
/* _http_spog_new(): receive spog stream http request.
*/
static u3_hreq*
_http_spog_new(u3_hcon* hon_u, h2o_req_t* rec_u)
{
u3_hreq* req_u = h2o_mem_alloc_shared(&rec_u->pool, sizeof(*req_u),
_http_spog_done);
memset(req_u, 0, sizeof(*req_u));
req_u->rec_u = rec_u;
req_u->sat_e = u3_rsat_plan;

_http_spog_link(hon_u, req_u);

return req_u;
}

static void
_http_cache_respond(u3_hreq* req_u, u3_noun nun);
Expand Down Expand Up @@ -1622,6 +1693,43 @@ _http_seq_accept(h2o_handler_t* han_u, h2o_req_t* rec_u)
return 0;
}

/* _http_spog_accept(): handle incoming http request on spogstream endpoint
*/
static c3_i
_http_spog_accept(h2o_handler_t* han_u, h2o_req_t* rec_u)
{
u3_hcon* hon_u = _http_rec_sock(rec_u);
c3_o aut_o = _http_req_is_auth(&hon_u->htp_u->htd_u->fig_u, rec_u);

// if the request is not authenticated, reject it
//
if ( c3n == aut_o ) {
u3_hreq* req_u = _http_req_prepare(rec_u, _http_req_new);
req_u->sat_e = u3_rsat_plan;
_http_start_respond(req_u, 403, u3_nul, u3_nul, c3y);
}
// if it is authenticated, send slogstream/sse headers
//
else {
u3_hreq* req_u = _http_req_prepare(rec_u, _http_spog_new);
u3_noun hed = u3nl(u3nc(u3i_string("Content-Type"),
u3i_string("text/event-stream")),
u3nc(u3i_string("Cache-Control"),
u3i_string("no-cache")),
u3nc(u3i_string("Connection"),
u3i_string("keep-alive")),
u3_none);

_http_start_respond(req_u, 200, hed, u3_nul, c3n);

//TODO auth token may expire at some point. if we want to close the
// slogstream when that happens, we need to store the token that
// was used alongside it...
}

return 0;
}

/* _http_sat_accept(): handle incoming http request on status endpoint
*/
static c3_i
Expand Down Expand Up @@ -2174,6 +2282,11 @@ _http_serv_init_h2o(SSL_CTX* tls_u, c3_o log, c3_o red)
pac_u = h2o_config_register_path(h2o_u->hos_u, "/~_~/slog", 0);
han_u = h2o_create_handler(pac_u, sizeof(*han_u));
han_u->on_req = _http_seq_accept;
// spog stream
//
pac_u = h2o_config_register_path(h2o_u->hos_u, "/~_~/spog", 0);
han_u = h2o_create_handler(pac_u, sizeof(*han_u));
han_u->on_req = _http_spog_accept;

// status (per spinner)
//
Expand Down Expand Up @@ -2778,10 +2891,12 @@ _http_stream_slog(void* vop_p, c3_w pri_w, u3_noun tan)
{
u3_httd* htd_u = (u3_httd*)vop_p;
u3_hreq* seq_u = htd_u->fig_u.seq_u;
u3_hreq* soq_u = htd_u->fig_u.soq_u;


// only do the work if there are open slog streams
//
if ( 0 != seq_u ) {
if ( 0 != seq_u || 0 != soq_u ) {
u3_weak data = u3_none;

if ( c3y == u3a_is_atom(tan) ) {
Expand Down Expand Up @@ -2835,6 +2950,10 @@ _http_stream_slog(void* vop_p, c3_w pri_w, u3_noun tan)
_http_continue_respond(seq_u, u3k(data), c3n);
seq_u = seq_u->nex_u;
}
while ( 0 != soq_u ) {
_http_continue_respond(soq_u, u3k(data), c3n);
soq_u = soq_u->nex_u;
}
}

u3z(data);
Expand All @@ -2858,49 +2977,113 @@ _http_spin_timer_cb(uv_timer_t* tim_u)
if ( NULL == stk_u ) return;
c3_w pos_w = stk_u->off_w;
c3_w out_w = 0;
c3_w zis_w = sizeof(c3_w);

while (pos_w > 4) {
while (pos_w > 0) {
c3_w len_w;
pos_w -=4;
pos_w -= zis_w;

if ( siz_w < out_w + 4 ) {
if ( siz_w < out_w + zis_w ) {
buf_c = c3_realloc(buf_c, siz_w*2);
siz_w *= 2;
}

memcpy(&len_w, &stk_u->dat_y[pos_w], 4);
memcpy(&len_w, &stk_u->dat_y[pos_w], zis_w);
pos_w -= len_w;

if ( siz_w < out_w + 4 ) {
if ( siz_w < out_w + zis_w ) {
buf_c = c3_realloc(buf_c, siz_w*2);
siz_w *= 2;
}
buf_c[out_w++] = '/';

if ( siz_w < out_w + len_w ) {
buf_c = c3_realloc(buf_c, siz_w*2);
siz_w *= 2;
}

memcpy(buf_c + out_w, &stk_u->dat_y[pos_w], len_w);
out_w += len_w;
}
buf_c[out_w++] = '/';
buf_c[out_w] = '\0';

if ( 0 != stk_u->off_w ) {
u3_noun tan = u3i_string(buf_c);
u3_noun lin = u3i_list(u3i_string("data:"),
tan,
c3_s2('\n', '\n'),
u3_none);
u3_atom txt = u3qc_rap(3, lin);
u3_noun dat = u3nt(u3_nul, u3r_met(3, txt), txt);
u3_noun tan = u3i_string(buf_c);
u3_noun lin = u3i_list(u3i_string("data:"),
tan,
c3_s2('\n', '\n'),
u3_none);
u3_atom txt = u3qc_rap(3, lin);
u3_noun dat = u3nt(u3_nul, u3r_met(3, txt), txt);
while ( 0 != siq_u ) {
_http_continue_respond(siq_u, u3k(dat), c3n);
siq_u = siq_u->nex_u;
}
u3z(dat); u3z(lin);
uv_timer_start(htd_u->fig_u.sin_u, _http_spin_timer_cb,
SPIN_TIMER, 0);
}
}

while ( 0 != siq_u ) {
_http_continue_respond(siq_u, u3k(dat), c3n);
siq_u = siq_u->nex_u;
/* _http_spog_timer_cb(): send heartbeat to slog streams and restart timer
*/
static void
_http_spog_timer_cb(uv_timer_t* tim_u)
{
u3_httd* htd_u = tim_u->data;
u3_hreq* soq_u = htd_u->fig_u.soq_u;

if ( 0 != soq_u ) {
c3_w siz_w = 1024;
c3_c* buf_c = c3_malloc(siz_w);
u3t_spin* stk_u = htd_u->stk_u;
if ( NULL == stk_u ) return;
c3_w pos_w = stk_u->off_w;
c3_w out_w = 0;
c3_w zis_w = sizeof(c3_w);

while (pos_w > 0) {
c3_w len_w;
pos_w -= zis_w;

if ( siz_w < out_w + zis_w ) {
buf_c = c3_realloc(buf_c, siz_w*2);
siz_w *= 2;
}

memcpy(&len_w, &stk_u->dat_y[pos_w], zis_w);
pos_w -= len_w;

if ( siz_w < out_w + zis_w ) {
buf_c = c3_realloc(buf_c, siz_w*2);
siz_w *= 2;
}
u3z(dat); u3z(lin);
buf_c[out_w++] = '/';

if ( siz_w < out_w + len_w ) {
buf_c = c3_realloc(buf_c, siz_w*2);
siz_w *= 2;
}

memcpy(buf_c + out_w, &stk_u->dat_y[pos_w], len_w);
out_w += len_w;
}
uv_timer_start(htd_u->fig_u.sin_u, _http_spin_timer_cb,
buf_c[out_w++] = '/';
buf_c[out_w] = '\0';

u3_noun tan = u3i_string(buf_c);
u3_noun lin = u3i_list(u3i_string("spin:"),
tan,
c3_s2('\n', '\n'),
u3_none);
u3_atom txt = u3qc_rap(3, lin);
u3_noun dat = u3nt(u3_nul, u3r_met(3, txt), txt);
while ( 0 != soq_u ) {
_http_continue_respond(soq_u, u3k(dat), c3n);
soq_u = soq_u->nex_u;
}
u3z(dat); u3z(lin);
uv_timer_start(htd_u->fig_u.son_u, _http_spog_timer_cb,
SPIN_TIMER, 0);
}
}
Expand Down Expand Up @@ -3168,6 +3351,11 @@ u3_http_io_init(u3_pier* pir_u)
uv_timer_init(u3L, sin_u);
htd_u->fig_u.sin_u = sin_u;

uv_timer_t* son_u = c3_malloc(sizeof(*son_u));
son_u->data = htd_u;
uv_timer_init(u3L, son_u);
htd_u->fig_u.son_u = son_u;

{
u3_noun now;
struct timeval tim_u;
Expand Down