From 31f58a17cefae8b7fcf19ccee340790793401a5f Mon Sep 17 00:00:00 2001 From: antmat Date: Fri, 14 Aug 2015 12:47:05 +0300 Subject: [PATCH 1/2] [Feature] Add ability to run fully custom subrequests. --- CMakeLists.txt | 6 +- ugh/subreq_external.c | 106 ++++++++++++++++++++++++++++++++++ ugh/ugh.h | 19 ++++++ ugh_example/external_module.c | 75 ++++++++++++++++++++++++ 4 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 ugh/subreq_external.c create mode 100644 ugh_example/external_module.c diff --git a/CMakeLists.txt b/CMakeLists.txt index eef4cd3..0b68a88 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,8 +47,10 @@ INSTALL_TEMPLATE (ugh/autoconf.h.in DESTINATION include/ugh) # modules -AUX_SOURCE_DIRECTORY (ugh_example SRC_UGH_EXAMPLE) -ADD_LIBRARY (ugh_example MODULE ${SRC_UGH_EXAMPLE}) +ADD_LIBRARY (ugh_example MODULE ugh_example/module.c) INSTALL (TARGETS ugh_example DESTINATION ${LIBDIR}/ugh) INSTALL_TEMPLATE (ugh_example/config.cfg.in DESTINATION etc/ugh_example) +ADD_LIBRARY (ugh_external_example MODULE ugh_example/external_module.c) +INSTALL (TARGETS ugh_external_example DESTINATION ${LIBDIR}/ugh) + diff --git a/ugh/subreq_external.c b/ugh/subreq_external.c new file mode 100644 index 0000000..b3aabbb --- /dev/null +++ b/ugh/subreq_external.c @@ -0,0 +1,106 @@ +#include "ugh.h" + +static +void ugh_subreq_external_timeout(EV_P_ ev_timer *w, int tev) +{ + ugh_subreq_external_t *r = aux_memberof(ugh_subreq_external_t, wev_timeout, w); + ugh_subreq_external_del(r); +} + + +static +void ugh_subreq_wcb_ready(EV_P_ ev_async *w, int tev) +{ + ugh_subreq_external_t *r = aux_memberof(ugh_subreq_external_t, wev_ready, w); + + ugh_subreq_external_del(r); +} + +//static +//int ugh_subreq_external_copy_chunk(ugh_subreq_external_t *r, char *data, size_t size) +//{ +// if (r->chunk_body_size < r->body.size + size) +// { +// char *old_body = r->body.data; + +// r->chunk_body_size *= 2; +// r->body.data = aux_pool_nalloc(r->c->pool, r->chunk_body_size); + +// memcpy(r->body.data, old_body, r->body.size); +// } + +// memcpy(r->body.data + r->body.size, data, size); +// r->body.size += size; + +// return 0; +//} + + +ugh_subreq_external_t *ugh_subreq_external_add(ugh_client_t *c) +{ + ugh_subreq_external_t *r; + + aux_pool_link(c->pool); /* use c->pool for subreq allocations */ + + r = (ugh_subreq_external_t *) aux_pool_calloc(c->pool, sizeof(*r)); + + if (NULL == r) + { + aux_pool_free(c->pool); + return NULL; + } + + r->c = c; + + r->timeout = UGH_CONFIG_SUBREQ_TIMEOUT; /* XXX this should be "no timeout" by default */ + + log_debug("ugh_external_subreq_add"); + + return r; +} + +int ugh_subreq_external_set_timeout(ugh_subreq_external_t *r, ev_tstamp timeout, int timeout_type) +{ + r->timeout = timeout; + return 0; +} + +void ugh_subreq_external_signal_ready(ugh_subreq_external_t* ex_subreq) { + ev_async_send(loop, &ex_subreq->wev_ready); +} + +int ugh_subreq_external_run(ugh_subreq_external_t *r) +{ + r->c->wait++; + r->on_request_done = ugh_subreq_external_signal_ready; + ev_timer_init(&r->wev_timeout, &ugh_subreq_external_timeout, 0, r->timeout); + ev_async_init(&r->wev_ready, &ugh_subreq_wcb_ready); + + ev_timer_again(loop, &r->wev_timeout); + ev_async_start(loop, &r->wev_ready); + r->run_external(NULL, r); + return 0; +} + +int ugh_subreq_external_del(ugh_subreq_external_t *r) +{ + ev_timer_stop(loop, &r->wev_timeout); + ev_async_stop(loop, &r->wev_ready); + + r->c->wait--; + + /* We check is_main_coro here, because we could possibly call + * ugh_subreq_del from module coroutine (e.g. when IP-address of + * subrequest was definitely mallformed) and in this case we don't need + * to call coro_transfer + */ + if (0 == r->c->wait && is_main_coro) + { + is_main_coro = 0; + coro_transfer(&ctx_main, &r->c->ctx); + is_main_coro = 1; + } + aux_pool_free(r->c->pool); + + return 0; +} diff --git a/ugh/ugh.h b/ugh/ugh.h index 02c46cd..cb3a478 100644 --- a/ugh/ugh.h +++ b/ugh/ugh.h @@ -256,6 +256,9 @@ ugh_upstream_t *ugh_upstream_get(ugh_config_t *cfg, const char *name, size_t siz typedef struct ugh_subreq ugh_subreq_t; +typedef struct ugh_subreq_external + ugh_subreq_external_t; + typedef struct ugh_channel ugh_channel_t; @@ -267,6 +270,18 @@ typedef int (*ugh_subreq_handle_fp)(ugh_subreq_t *r, char *data, size_t size); #define UGH_RESPONSE_CHUNKED -1 #define UGH_RESPONSE_CLOSE_AFTER_BODY -2 +struct ugh_subreq_external +{ + void* external_data; + void(*on_request_done)(ugh_subreq_external_t*); + void(*run_external)(void* /*external data*/, ugh_subreq_external_t* /*subrequest to run*/); + ugh_client_t *c; + ev_async wev_ready; + ev_timer wev_timeout; + ev_tstamp timeout; + strt body; +}; + struct ugh_subreq { ugh_client_t *c; @@ -352,6 +367,8 @@ struct ugh_subreq #define UGH_SUBREQ_PUSH 2 ugh_subreq_t *ugh_subreq_add(ugh_client_t *c, char *url, size_t size, int flags); +ugh_subreq_external_t *ugh_subreq_external_add(ugh_client_t *c); + int ugh_subreq_set_method(ugh_subreq_t *r, unsigned char method); int ugh_subreq_set_body(ugh_subreq_t *r, char *body, size_t body_size); int ugh_subreq_set_timeout(ugh_subreq_t *r, ev_tstamp timeout, int timeout_type); @@ -363,8 +380,10 @@ int ugh_subreq_set_timeout_connect(ugh_subreq_t *r, ev_tstamp timeout); int ugh_subreq_set_channel(ugh_subreq_t *r, ugh_channel_t *ch, unsigned tag); int ugh_subreq_run(ugh_subreq_t *r); +int ugh_subreq_external_run(ugh_subreq_external_t *r); int ugh_subreq_gen(ugh_subreq_t *r, strp u_host); int ugh_subreq_del(ugh_subreq_t *r, uint32_t ft_type, int ft_errno); +int ugh_subreq_external_del(ugh_subreq_external_t *r); strp ugh_subreq_get_host(ugh_subreq_t *r); in_port_t ugh_subreq_get_port(ugh_subreq_t *r); diff --git a/ugh_example/external_module.c b/ugh_example/external_module.c new file mode 100644 index 0000000..a1f0c41 --- /dev/null +++ b/ugh_example/external_module.c @@ -0,0 +1,75 @@ +#include + +typedef struct +{ + ugh_template_t unused_configurable_parameter; +} ugh_module_ext_example_conf_t; + +extern ugh_module_t ugh_module_ext_example; + +void run_ex_request(void* data, ugh_subreq_external_t* ex_subreq) { + ex_subreq->body.size = 9; + ex_subreq->body.data = "It works"; + ex_subreq->on_request_done(ex_subreq); +} + +int ugh_module_ext_example_handle(ugh_client_t *c, void *data, strp body) +{ +// ugh_module_ext_example_conf_t *conf = data; /* get module conf */ + + ugh_subreq_external_t *e_subreq = ugh_subreq_external_add(c); + e_subreq->run_external = run_ex_request; + ugh_subreq_external_run(e_subreq); + + /* wait for it (do smth in different coroutine while it is being downloaded) */ + + ugh_subreq_wait(c); + + body->data = e_subreq->body.data; + body->size = e_subreq->body.size; + + return UGH_HTTP_OK; +} + +int ugh_module_ext_example_init(ugh_config_t *cfg, void *data) +{ + ugh_module_ext_example_conf_t *conf = data; + + log_info("ugh_module_ext_example_init (called for each added handle, each time server is starting), conf=%p", &conf); + + return 0; +} + +int ugh_module_ext_example_free(ugh_config_t *cfg, void *data) +{ + log_info("ugh_module_ext_example_free (called for each added handle, each time server is stopped)"); + + return 0; +} + +int ugh_command_ext_example(ugh_config_t *cfg, int argc, char **argv, ugh_command_t *cmd) +{ + ugh_module_ext_example_conf_t *conf; + + conf = aux_pool_malloc(cfg->pool, sizeof(*conf)); + if (NULL == conf) return -1; + + ugh_module_handle_add(ugh_module_ext_example, conf, ugh_module_ext_example_handle); + + return 0; +} + +static ugh_command_t ugh_module_ext_example_cmds [] = +{ + ugh_make_command(ext_example), + ugh_make_command_template(unused_configurable_parameter, ugh_module_ext_example_conf_t, unused_configurable_parameter), + ugh_null_command +}; + +ugh_module_t ugh_module_ext_example = +{ + ugh_module_ext_example_cmds, + ugh_module_ext_example_init, + ugh_module_ext_example_free +}; + From 37afa99e8bd4809d282980bd7797240211b8f132 Mon Sep 17 00:00:00 2001 From: antmat Date: Fri, 14 Aug 2015 13:56:55 +0300 Subject: [PATCH 2/2] [Fix] Remove unnescessary callback for simplicity. --- ugh/subreq_external.c | 28 ++++++++-------------------- ugh/ugh.h | 5 ++++- ugh_example/external_module.c | 12 +++++------- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/ugh/subreq_external.c b/ugh/subreq_external.c index b3aabbb..17578a1 100644 --- a/ugh/subreq_external.c +++ b/ugh/subreq_external.c @@ -16,24 +16,13 @@ void ugh_subreq_wcb_ready(EV_P_ ev_async *w, int tev) ugh_subreq_external_del(r); } -//static -//int ugh_subreq_external_copy_chunk(ugh_subreq_external_t *r, char *data, size_t size) -//{ -// if (r->chunk_body_size < r->body.size + size) -// { -// char *old_body = r->body.data; - -// r->chunk_body_size *= 2; -// r->body.data = aux_pool_nalloc(r->c->pool, r->chunk_body_size); - -// memcpy(r->body.data, old_body, r->body.size); -// } - -// memcpy(r->body.data + r->body.size, data, size); -// r->body.size += size; - -// return 0; -//} +int ugh_subreq_external_assign_body(ugh_subreq_external_t *r, char *data, size_t size) +{ + r->body.data = aux_pool_nalloc(r->c->pool, size); + r->body.size = size; + memcpy(r->body.data, data, size); + return 0; +} ugh_subreq_external_t *ugh_subreq_external_add(ugh_client_t *c) @@ -72,13 +61,12 @@ void ugh_subreq_external_signal_ready(ugh_subreq_external_t* ex_subreq) { int ugh_subreq_external_run(ugh_subreq_external_t *r) { r->c->wait++; - r->on_request_done = ugh_subreq_external_signal_ready; + r->request_done = ugh_subreq_external_signal_ready; ev_timer_init(&r->wev_timeout, &ugh_subreq_external_timeout, 0, r->timeout); ev_async_init(&r->wev_ready, &ugh_subreq_wcb_ready); ev_timer_again(loop, &r->wev_timeout); ev_async_start(loop, &r->wev_ready); - r->run_external(NULL, r); return 0; } diff --git a/ugh/ugh.h b/ugh/ugh.h index cb3a478..f77ad2d 100644 --- a/ugh/ugh.h +++ b/ugh/ugh.h @@ -273,13 +273,14 @@ typedef int (*ugh_subreq_handle_fp)(ugh_subreq_t *r, char *data, size_t size); struct ugh_subreq_external { void* external_data; - void(*on_request_done)(ugh_subreq_external_t*); + void(*request_done)(ugh_subreq_external_t*); void(*run_external)(void* /*external data*/, ugh_subreq_external_t* /*subrequest to run*/); ugh_client_t *c; ev_async wev_ready; ev_timer wev_timeout; ev_tstamp timeout; strt body; + size_t body_capacity; }; struct ugh_subreq @@ -369,6 +370,8 @@ struct ugh_subreq ugh_subreq_t *ugh_subreq_add(ugh_client_t *c, char *url, size_t size, int flags); ugh_subreq_external_t *ugh_subreq_external_add(ugh_client_t *c); +int ugh_subreq_external_assign_body(ugh_subreq_external_t *r, char *data, size_t size); + int ugh_subreq_set_method(ugh_subreq_t *r, unsigned char method); int ugh_subreq_set_body(ugh_subreq_t *r, char *body, size_t body_size); int ugh_subreq_set_timeout(ugh_subreq_t *r, ev_tstamp timeout, int timeout_type); diff --git a/ugh_example/external_module.c b/ugh_example/external_module.c index a1f0c41..74d9c4a 100644 --- a/ugh_example/external_module.c +++ b/ugh_example/external_module.c @@ -7,20 +7,18 @@ typedef struct extern ugh_module_t ugh_module_ext_example; -void run_ex_request(void* data, ugh_subreq_external_t* ex_subreq) { - ex_subreq->body.size = 9; - ex_subreq->body.data = "It works"; - ex_subreq->on_request_done(ex_subreq); -} - int ugh_module_ext_example_handle(ugh_client_t *c, void *data, strp body) { // ugh_module_ext_example_conf_t *conf = data; /* get module conf */ ugh_subreq_external_t *e_subreq = ugh_subreq_external_add(c); - e_subreq->run_external = run_ex_request; ugh_subreq_external_run(e_subreq); + // Here lies any magic by which request_done can be called. + // request done is safe to call from other thread + ugh_subreq_external_assign_body(e_subreq, "It works", sizeof("It works")); + e_subreq->request_done(e_subreq); + /* wait for it (do smth in different coroutine while it is being downloaded) */ ugh_subreq_wait(c);