Kouhei Sutou
null+****@clear*****
Sun Apr 3 01:15:29 JST 2016
Kouhei Sutou 2016-04-03 01:15:29 +0900 (Sun, 03 Apr 2016) New Revision: f3d825b42af41172715a145ffa2557c12b7cf21b https://github.com/groonga/groonga/commit/f3d825b42af41172715a145ffa2557c12b7cf21b Message: groonga-http: support request_timeout Modified files: lib/grn_ctx.h src/groonga.c Modified: lib/grn_ctx.h (+5 -0) =================================================================== --- lib/grn_ctx.h 2016-04-03 01:07:40 +0900 (93bb595) +++ lib/grn_ctx.h 2016-04-03 01:15:29 +0900 (e33a9c0) @@ -520,9 +520,14 @@ extern grn_timeval grn_starttime; #ifndef GRN_TIMEVAL_STR_FORMAT #define GRN_TIMEVAL_STR_FORMAT "%04d-%02d-%02d %02d:%02d:%02d.%06d" #endif /* GRN_TIMEVAL_STR_FORMAT */ +#define GRN_TIMEVAL_TO_MSEC(timeval) \ + (((timeval)->tv_sec * GRN_TIME_MSEC_PER_SEC) + \ + ((timeval)->tv_nsec / GRN_TIME_NSEC_PER_MSEC)) #define GRN_TIME_NSEC_PER_SEC 1000000000 #define GRN_TIME_NSEC_PER_SEC_F 1000000000.0 +#define GRN_TIME_NSEC_PER_MSEC 1000000 #define GRN_TIME_NSEC_PER_USEC (GRN_TIME_NSEC_PER_SEC / GRN_TIME_USEC_PER_SEC) +#define GRN_TIME_MSEC_PER_SEC 1000 #define GRN_TIME_NSEC_TO_USEC(nsec) ((nsec) / GRN_TIME_NSEC_PER_USEC) #define GRN_TIME_USEC_TO_NSEC(usec) ((usec) * GRN_TIME_NSEC_PER_USEC) Modified: src/groonga.c (+246 -20) =================================================================== --- src/groonga.c 2016-04-03 01:07:40 +0900 (a209136) +++ src/groonga.c 2016-04-03 01:15:29 +0900 (c95c2df) @@ -122,6 +122,25 @@ grn_rc_to_exit_code(grn_rc rc) } } +static void +break_accept_event_loop(grn_ctx *ctx) +{ + grn_com *client; + const char *address; + + if (strcmp(bind_address, "0.0.0.0") == 0) { + address = "127.0.0.1"; + } else if (strcmp(bind_address, "::") == 0) { + address = "::1"; + } else { + address = bind_address; + } + client = grn_com_copen(ctx, NULL, address, port); + if (client) { + grn_com_close(ctx, client); + } +} + #ifdef GRN_WITH_LIBEDIT #include <locale.h> #include <histedit.h> @@ -525,6 +544,229 @@ groonga_set_thread_limit(uint32_t new_limit, void *data) } } +typedef struct { + grn_mutex mutex; + grn_ctx ctx; + grn_pat *entries; + uint64_t earliest_unix_time_msec; +} request_timer_data; +static request_timer_data the_request_timer_data; + +static void * +request_timer_register(const char *request_id, + unsigned int request_id_size, + double timeout, + void *user_data) +{ + request_timer_data *data = user_data; + grn_id id = GRN_ID_NIL; + + { + grn_ctx *ctx = &(data->ctx); + grn_bool is_first_timer; + grn_timeval tv; + uint64_t timeout_unix_time_msec; + void *value; + + MUTEX_LOCK(data->mutex); + is_first_timer = (grn_pat_size(ctx, data->entries) == 0); + grn_timeval_now(ctx, &tv); + timeout_unix_time_msec = GRN_TIMEVAL_TO_MSEC(&tv) + (timeout * 1000); + while (GRN_TRUE) { + int added; + id = grn_pat_add(ctx, data->entries, + &timeout_unix_time_msec, sizeof(uint64_t), + &value, &added); + if (added != 0) { + break; + } + timeout_unix_time_msec++; + } + grn_memcpy(value, &request_id_size, sizeof(unsigned int)); + grn_memcpy(((uint8_t *)value) + sizeof(unsigned int), + request_id, request_id_size); + if (data->earliest_unix_time_msec == 0 || + data->earliest_unix_time_msec > timeout_unix_time_msec) { + data->earliest_unix_time_msec = timeout_unix_time_msec; + } + if (is_first_timer) { + break_accept_event_loop(ctx); + } + MUTEX_UNLOCK(data->mutex); + } + + return (void *)(uint64_t)id; +} + +static void +request_timer_unregister(void *timer_id, + void *user_data) +{ + request_timer_data *data = user_data; + grn_id id = (grn_id)(uint64_t)timer_id; + + { + grn_ctx *ctx = &(data->ctx); + uint64_t timeout_unix_time_msec; + int key_size; + + MUTEX_LOCK(data->mutex); + key_size = grn_pat_get_key(ctx, + data->entries, + id, + &timeout_unix_time_msec, + sizeof(uint64_t)); + if (key_size > 0) { + grn_pat_delete_by_id(ctx, data->entries, id, NULL); + if (data->earliest_unix_time_msec >= timeout_unix_time_msec) { + data->earliest_unix_time_msec = 0; + } + } + MUTEX_UNLOCK(data->mutex); + } +} + +static void +request_timer_fin(void *user_data) +{ + request_timer_data *data = user_data; + + { + grn_ctx *ctx = &(data->ctx); + grn_pat_close(ctx, data->entries); + grn_ctx_fin(ctx); + MUTEX_FIN(data->mutex); + } +} + +static void +request_timer_init(void) +{ + static grn_request_timer timer; + request_timer_data *data = &the_request_timer_data; + grn_ctx *ctx; + + MUTEX_INIT(data->mutex); + ctx = &(data->ctx); + grn_ctx_init(ctx, 0); + data->entries = grn_pat_create(ctx, + NULL, + sizeof(uint64_t), + GRN_TABLE_MAX_KEY_SIZE, + GRN_OBJ_KEY_UINT); + data->earliest_unix_time_msec = 0; + + timer.user_data = data; + timer.register_func = request_timer_register; + timer.unregister_func = request_timer_unregister; + timer.fin_func = request_timer_fin; + + grn_request_timer_set(&timer); +} + +static grn_bool +request_timer_ensure_earliest_unix_time_msec(void) +{ + request_timer_data *data = &the_request_timer_data; + grn_ctx *ctx; + grn_pat_cursor *cursor; + + if (data->earliest_unix_time_msec > 0) { + return GRN_TRUE; + } + + ctx = &(data->ctx); + cursor = grn_pat_cursor_open(ctx, data->entries, + NULL, 0, + NULL, 0, + 0, 1, GRN_CURSOR_ASCENDING); + if (!cursor) { + return GRN_FALSE; + } + while (grn_pat_cursor_next(ctx, cursor) != GRN_ID_NIL) { + void *key; + uint64_t timeout_unix_time_msec; + + grn_pat_cursor_get_key(ctx, cursor, &key); + timeout_unix_time_msec = *(uint64_t *)key; + data->earliest_unix_time_msec = timeout_unix_time_msec; + break; + } + grn_pat_cursor_close(ctx, cursor); + + return data->earliest_unix_time_msec > 0; +} + +static int +request_timer_get_poll_timeout(void) +{ + request_timer_data *data = &the_request_timer_data; + int timeout = 1000; + grn_ctx *ctx; + grn_timeval tv; + + MUTEX_LOCK(data->mutex); + ctx = &(data->ctx); + if (grn_pat_size(ctx, data->entries) == 0) { + goto exit; + } + + if (!request_timer_ensure_earliest_unix_time_msec()) { + goto exit; + } + + grn_timeval_now(ctx, &tv); + timeout = data->earliest_unix_time_msec - GRN_TIMEVAL_TO_MSEC(&tv); + if (timeout < 0) { + timeout = 0; + } else if (timeout > 1000) { + timeout = 1000; + } + +exit : + MUTEX_UNLOCK(data->mutex); + + return timeout; +} + +static void +request_timer_process_timeout(void) +{ + request_timer_data *data = &the_request_timer_data; + grn_ctx *ctx; + grn_timeval tv; + uint64_t max; + grn_pat_cursor *cursor; + + ctx = &(data->ctx); + if (grn_pat_size(ctx, data->entries) == 0) { + return; + } + + grn_timeval_now(ctx, &tv); + max = GRN_TIMEVAL_TO_MSEC(&tv); + cursor = grn_pat_cursor_open(ctx, data->entries, + NULL, 0, + &max, sizeof(uint64_t), + 0, -1, GRN_CURSOR_ASCENDING); + if (!cursor) { + return; + } + + grn_id id; + while ((id = grn_pat_cursor_next(ctx, cursor)) != GRN_ID_NIL) { + void *value; + const char *request_id; + unsigned int request_id_size; + + grn_pat_cursor_get_value(ctx, cursor, &value); + request_id_size = *((unsigned int *)value); + request_id = (const char *)(((uint8_t *)value) + sizeof(unsigned int)); + grn_request_canceler_cancel(request_id, request_id_size); + } + grn_pat_cursor_close(ctx, cursor); +} + static void reset_ready_notify_pipe(void) { @@ -658,7 +900,9 @@ daemonize(void) static void run_server_loop(grn_ctx *ctx, grn_com_event *ev) { - while (!grn_com_event_poll(ctx, ev, 1000) && grn_gctx.stat != GRN_CTX_QUIT) { + request_timer_init(); + while (!grn_com_event_poll(ctx, ev, request_timer_get_poll_timeout()) && + grn_gctx.stat != GRN_CTX_QUIT) { grn_edge *edge; while ((edge = (grn_edge *)grn_com_queue_deque(ctx, &ctx_old))) { grn_obj *msg; @@ -674,6 +918,7 @@ run_server_loop(grn_ctx *ctx, grn_com_event *ev) } grn_edges_delete(ctx, edge); } + request_timer_process_timeout(); /* todo : log stat */ } for (;;) { @@ -2126,25 +2371,6 @@ check_rlimit_nofile(grn_ctx *ctx) #endif /* WIN32 */ } -static void -break_accept_event_loop(grn_ctx *ctx) -{ - grn_com *client; - const char *address; - - if (strcmp(bind_address, "0.0.0.0") == 0) { - address = "127.0.0.1"; - } else if (strcmp(bind_address, "::") == 0) { - address = "::1"; - } else { - address = bind_address; - } - client = grn_com_copen(ctx, NULL, address, port); - if (client) { - grn_com_close(ctx, client); - } -} - static grn_thread_func_result CALLBACK h_worker(void *arg) { -------------- next part -------------- HTML����������������������������...Télécharger