Make the active-list sizes configurable and add optional CPU pinning.

iomplx_init() takes two extra arguments, the THREAD_0 and THREAD_N active-list
sizes; 0 selects the IOMPLX_THREAD_*_ACTIVE_ITEMS defaults. The THREAD_N size
is reduced when the combined total would exceed the RLIMIT_NOFILE soft limit.
Building with CPU_AFFINITY=1 on Linux caps the worker count at the number of
online processors and pins worker i to CPU i.

Review notes:
- The header names on the added #include lines were missing from this copy and
  have been restored from what the new code uses (assert, getrlimit, cpu_set_t);
  confirm them against the tree. The names on the unchanged #include context
  lines are still missing.
- Line breaks and indentation were reconstructed, and the hunk headers were
  recomputed for the revised line counts.

diff --git a/Makefile b/Makefile
index 6fa55b8..88054f6 100644
--- a/Makefile
+++ b/Makefile
@@ -18,11 +18,18 @@ ifneq ($(findstring /usr/include/sys/event.h, $(wildcard /usr/include/sys/*.h)),
 	UQUEUE=KQUEUE
 endif
 
+# Build with CPU_AFFINITY=1 to compile in the thread pinning code.
+ifeq ($(CPU_AFFINITY), 1)
+    CFLAGS += -D_GNU_SOURCE -DCPU_AFFINITY=1
+endif
+
+
 UQUEUE_SRC=backend/$(shell echo $(UQUEUE) | tr A-Z a-z).c
 SRC+=$(UQUEUE_SRC)
 OBJ=$(SRC:.c=.o)
 
-all: prepare lib
+
+all: prepare lib example
 	@echo "Done"
 
 prepare:
diff --git a/example.c b/example.c
index 3e9dac4..977bbce 100644
--- a/example.c
+++ b/example.c
@@ -70,8 +70,8 @@ int demo_accept(iomplx_item *item)
 int main()
 {
 	iomplx_instance m;
-
-	iomplx_init(&m, malloc, free, NULL, 4, 4);
+	/* The trailing 0, 0 select the default active-list sizes. */
+	iomplx_init(&m, malloc, free, NULL, 4, 4, 0, 0);
 	iomplx_inet_listen(&m, "0.0.0.0", 2222, demo_accept, NULL);
 
 	iomplx_launch(&m);
diff --git a/iomplx.c b/iomplx.c
index beff0a3..4976fb6 100644
--- a/iomplx.c
+++ b/iomplx.c
@@ -19,12 +19,14 @@
 
 #include
 #include
+#include <assert.h>
 #include
 #include
 #include
 #include
 #include
 #include
+#include <sys/resource.h>
 
 static inline void iomplx_monitor_add(iomplx_monitor *mon, iomplx_item *item)
 {
@@ -53,6 +55,47 @@ static int iomplx_dummy_call1(iomplx_item *item)
 	return 0;
 }
 
+/*
+ * Apply the default active-list sizes and, if needed, shrink the THREAD_N size
+ * so the combined total fits within the RLIMIT_NOFILE soft limit.
+ *
+ * active_list_size: array indexed by THREAD_0 and THREAD_N; a zero entry is
+ *                   replaced by the matching IOMPLX_THREAD_*_ACTIVE_ITEMS default.
+ * threads:          multiplier for the THREAD_N size (must be non-zero).
+ */
+static inline void iomplx_active_list_size_fix(unsigned int *active_list_size, unsigned int threads)
+{
+	struct rlimit rlmt;
+	rlim_t wanted;
+	int rc;
+
+	assert(threads);
+
+	if ( !active_list_size[THREAD_N] )
+		active_list_size[THREAD_N] = IOMPLX_THREAD_N_ACTIVE_ITEMS;
+
+	if ( !active_list_size[THREAD_0] )
+		active_list_size[THREAD_0] = IOMPLX_THREAD_0_ACTIVE_ITEMS;
+
+	/* getrlimit() must run outside assert(), which vanishes under NDEBUG. */
+	rc = getrlimit(RLIMIT_NOFILE, &rlmt);
+	assert(rc != -1 && rlmt.rlim_cur != 0);
+	if ( rc == -1 || rlmt.rlim_cur == 0 )
+		return;
+
+	/* Compute the demand in rlim_t (the limit's own type), not unsigned int. */
+	wanted = (rlim_t)active_list_size[THREAD_N] * threads + active_list_size[THREAD_0];
+
+	if ( wanted > rlmt.rlim_cur ) {
+		assert(active_list_size[THREAD_0] < rlmt.rlim_cur);
+		/* Without assert(): bail out rather than let the subtraction wrap. */
+		if ( active_list_size[THREAD_0] >= rlmt.rlim_cur )
+			return;
+
+		active_list_size[THREAD_N] = (unsigned int)((rlmt.rlim_cur - active_list_size[THREAD_0]) / threads);
+	}
+}
+
 static const iomplx_callbacks iomplx_dummy_calls = {
 	.calls_arr = {iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1}
 };
@@ -250,15 +293,19 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste
 	return item_copy;
 }
 
-void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity)
+void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items)
 {
 	mplx->item_alloc = alloc;
 	mplx->item_free = free;
 	mplx->thread_init = init;
 	mplx->threads = threads;
-	mplx->active_list_size[THREAD_0] = 10;
-	mplx->active_list_size[THREAD_N] = IOMPLX_MAX_ACTIVE_ITEMS/threads + (IOMPLX_MAX_ACTIVE_ITEMS%threads != 0? 1 : 0);
+
+	mplx->active_list_size[THREAD_0] = thread_zero_active_items;
+	mplx->active_list_size[THREAD_N] = thread_n_active_items;
+
+	iomplx_active_list_size_fix(mplx->active_list_size, threads);
 	iomplx_monitor_init(&mplx->monitor, timeout_granularity);
+
 	uqueue_init(&mplx->accept_uqueue);
 	uqueue_init(&mplx->n_uqueue);
 }
@@ -269,11 +316,32 @@ static void iomplx_start_threads(iomplx_instance *mplx)
 	pthread_attr_t attr;
 	pthread_t unused;
 
+#if USE_CPU_AFFINITY
+	long procs;
+	cpu_set_t cpu_set;
+
+	procs = sysconf( _SC_NPROCESSORS_ONLN );
+	assert(procs > 0);
+
+	/* Cap the worker count at the number of online processors. */
+	if(procs > 0 && mplx->threads > (unsigned long)procs)
+		mplx->threads = (unsigned int)procs;
+#endif
 	pthread_attr_init(&attr);
 	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
-	for(i = 0; i < mplx->threads; i++)
-		pthread_create(&unused, &attr, (void *(*)(void *))iomplx_thread_n, (void *)mplx);
+	for(i = 0; i < mplx->threads; i++) {
+		/* 'unused' is only valid if the thread was actually created. */
+		if(pthread_create(&unused, &attr, (void *(*)(void *))iomplx_thread_n, (void *)mplx) != 0)
+			continue;
+
+#if USE_CPU_AFFINITY
+		/* Start from an empty mask so worker i is bound to CPU i alone. */
+		CPU_ZERO(&cpu_set);
+		CPU_SET(i, &cpu_set);
+		pthread_setaffinity_np(unused, sizeof(cpu_set_t), &cpu_set);
+#endif
+	}
 }
 
 void iomplx_launch(iomplx_instance *mplx)
diff --git a/iomplx.h b/iomplx.h
index 7c51288..628c815 100644
--- a/iomplx.h
+++ b/iomplx.h
@@ -22,7 +22,26 @@
 #include
 #include
 
-#define IOMPLX_MAX_ACTIVE_ITEMS 200
+/* CPU pinning is compiled in only when CPU_AFFINITY is set and the target is Linux. */
+#if !defined(USE_CPU_AFFINITY)
+#if defined(CPU_AFFINITY) && CPU_AFFINITY && defined(__linux__)
+#define USE_CPU_AFFINITY 1
+#else
+#define USE_CPU_AFFINITY 0
+#endif
+#endif
+
+#if USE_CPU_AFFINITY
+/* Has no effect once a system header has been included; the Makefile also passes -D_GNU_SOURCE. */
+#if !defined(_GNU_SOURCE)
+#define _GNU_SOURCE 1
+#endif
+#include <sched.h>
+#endif
+
+/* Defaults applied when iomplx_init() is given 0 for an active-list size. */
+#define IOMPLX_THREAD_0_ACTIVE_ITEMS 10
+#define IOMPLX_THREAD_N_ACTIVE_ITEMS 200
 
 #define IOMPLX_READ_EVENT 1
 #define IOMPLX_WRITE_EVENT 2
@@ -130,7 +149,7 @@ void iomplx_callbacks_init(iomplx_item *);
 
 iomplx_item *iomplx_item_add(iomplx_instance *, iomplx_item *, int);
 
-void iomplx_init(iomplx_instance *, alloc_func, free_func, init_func, unsigned int, unsigned int);
+void iomplx_init(iomplx_instance *, alloc_func, free_func, init_func, unsigned int, unsigned int, unsigned int, unsigned int);
 void iomplx_launch(iomplx_instance *);
 
 static inline void iomplx_item_filter_set(iomplx_item *item, int filter)