From 20c70ff6e48b00b9a7d40c8be4e6f91634a3b0a4 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Fri, 20 Apr 2012 17:17:47 -0300 Subject: [PATCH 1/8] thread_n max_items available validated --- example.c | 2 +- iomplx.c | 43 ++++++++++++++++++++++++++++++++++++------- iomplx.h | 5 +++-- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/example.c b/example.c index 3e9dac4..6d7aa0b 100644 --- a/example.c +++ b/example.c @@ -71,7 +71,7 @@ int main() { iomplx_instance m; - iomplx_init(&m, malloc, free, NULL, 4, 4); + iomplx_init(&m, malloc, free, NULL, 4, 4, 0, 0); iomplx_inet_listen(&m, "0.0.0.0", 2222, demo_accept, NULL); iomplx_launch(&m); diff --git a/iomplx.c b/iomplx.c index beff0a3..05d93ee 100644 --- a/iomplx.c +++ b/iomplx.c @@ -19,12 +19,14 @@ #include #include +#include #include #include #include #include #include #include +#include static inline void iomplx_monitor_add(iomplx_monitor *mon, iomplx_item *item) { @@ -53,6 +55,24 @@ static int iomplx_dummy_call1(iomplx_item *item) return 0; } +static unsigned int iomplx_calc_thread_n_active_items(unsigned int threads, unsigned int thread_n_active_items) +{ + struct rlimit rlmt; + + if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { + return -1; + } else { + + if (!thread_n_active_items) + thread_n_active_items = IOMPLX_THREAD_N_ACTIVE_ITEMS; + + if ( (thread_n_active_items * threads) > (unsigned int)rlmt.rlim_cur) + return -1; + } + + return thread_n_active_items; +} + static const iomplx_callbacks iomplx_dummy_calls = { .calls_arr = {iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1} }; @@ -250,14 +270,23 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste return item_copy; } -void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity) +void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items) { - mplx->item_alloc = alloc; - mplx->item_free = free; - mplx->thread_init = init; - mplx->threads = threads; - mplx->active_list_size[THREAD_0] = 10; - mplx->active_list_size[THREAD_N] = IOMPLX_MAX_ACTIVE_ITEMS/threads + (IOMPLX_MAX_ACTIVE_ITEMS%threads != 0? 1 : 0); + mplx->item_alloc = alloc; + mplx->item_free = free; + mplx->thread_init = init; + mplx->threads = threads; + + if (!thread_zero_active_items) + thread_zero_active_items = IOMPLX_THREAD_ZERO_ACTIVE_ITEMS; + + thread_n_active_items = iomplx_calc_thread_n_active_items(threads, thread_n_active_items); + + assert(thread_n_active_items != -1); + + mplx->active_list_size[THREAD_0] = thread_zero_active_items; + mplx->active_list_size[THREAD_N] = thread_n_active_items; + iomplx_monitor_init(&mplx->monitor, timeout_granularity); uqueue_init(&mplx->accept_uqueue); uqueue_init(&mplx->n_uqueue); diff --git a/iomplx.h b/iomplx.h index 7c51288..90b28ed 100644 --- a/iomplx.h +++ b/iomplx.h @@ -22,7 +22,8 @@ #include #include -#define IOMPLX_MAX_ACTIVE_ITEMS 200 +#define IOMPLX_THREAD_ZERO_ACTIVE_ITEMS 10 +#define IOMPLX_THREAD_N_ACTIVE_ITEMS 200 #define IOMPLX_READ_EVENT 1 #define IOMPLX_WRITE_EVENT 2 @@ -130,7 +131,7 @@ void iomplx_callbacks_init(iomplx_item *); iomplx_item *iomplx_item_add(iomplx_instance *, iomplx_item *, int); -void iomplx_init(iomplx_instance *, alloc_func, free_func, init_func, unsigned int, unsigned int); +void iomplx_init(iomplx_instance *, alloc_func, free_func, init_func, unsigned int, unsigned int, unsigned int, unsigned int); void iomplx_launch(iomplx_instance *); static inline void iomplx_item_filter_set(iomplx_item *item, int filter) From 0d92810b2806269557bd9e3a5c120389e2e79399 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Fri, 20 Apr 2012 17:26:26 -0300 Subject: [PATCH 2/8] fixes indent errors --- iomplx.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/iomplx.c b/iomplx.c index 05d93ee..21434ab 100644 --- a/iomplx.c +++ b/iomplx.c @@ -55,7 +55,7 @@ static int iomplx_dummy_call1(iomplx_item *item) return 0; } -static unsigned int iomplx_calc_thread_n_active_items(unsigned int threads, unsigned int thread_n_active_items) +static unsigned int iomplx_calc_thread_n_active_items(unsigned int threads, unsigned int thread_n_active_items) { struct rlimit rlmt; @@ -63,7 +63,7 @@ static unsigned int iomplx_calc_thread_n_active_items(unsigned int threads, unsi return -1; } else { - if (!thread_n_active_items) + if (!thread_n_active_items) thread_n_active_items = IOMPLX_THREAD_N_ACTIVE_ITEMS; if ( (thread_n_active_items * threads) > (unsigned int)rlmt.rlim_cur) @@ -272,19 +272,19 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items) { - mplx->item_alloc = alloc; - mplx->item_free = free; - mplx->thread_init = init; - mplx->threads = threads; + mplx->item_alloc = alloc; + mplx->item_free = free; + mplx->thread_init = init; + mplx->threads = threads; - if (!thread_zero_active_items) - thread_zero_active_items = IOMPLX_THREAD_ZERO_ACTIVE_ITEMS; + if(!thread_zero_active_items) + thread_zero_active_items = IOMPLX_THREAD_ZERO_ACTIVE_ITEMS; - thread_n_active_items = iomplx_calc_thread_n_active_items(threads, thread_n_active_items); + thread_n_active_items = iomplx_calc_thread_n_active_items(threads, thread_n_active_items); - assert(thread_n_active_items != -1); + assert(thread_n_active_items != -1); - mplx->active_list_size[THREAD_0] = thread_zero_active_items; + mplx->active_list_size[THREAD_0] = thread_zero_active_items; mplx->active_list_size[THREAD_N] = thread_n_active_items; iomplx_monitor_init(&mplx->monitor, timeout_granularity); From c0d9ba86c19bbf9ae2d412b2efe9a3c1fd5fb1e8 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Fri, 20 Apr 2012 17:28:01 -0300 Subject: [PATCH 3/8] fixes indent errors --- iomplx.c | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/iomplx.c b/iomplx.c index 21434ab..31c54a3 100644 --- a/iomplx.c +++ b/iomplx.c @@ -272,24 +272,24 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items) { - mplx->item_alloc = alloc; - mplx->item_free = free; - mplx->thread_init = init; - mplx->threads = threads; + mplx->item_alloc = alloc; + mplx->item_free = free; + mplx->thread_init = init; + mplx->threads = threads; - if(!thread_zero_active_items) - thread_zero_active_items = IOMPLX_THREAD_ZERO_ACTIVE_ITEMS; + if(!thread_zero_active_items) + thread_zero_active_items = IOMPLX_THREAD_ZERO_ACTIVE_ITEMS; - thread_n_active_items = iomplx_calc_thread_n_active_items(threads, thread_n_active_items); + thread_n_active_items = iomplx_calc_thread_n_active_items(threads, thread_n_active_items); - assert(thread_n_active_items != -1); + assert(thread_n_active_items != -1); - mplx->active_list_size[THREAD_0] = thread_zero_active_items; - mplx->active_list_size[THREAD_N] = thread_n_active_items; + mplx->active_list_size[THREAD_0] = thread_zero_active_items; + mplx->active_list_size[THREAD_N] = thread_n_active_items; - iomplx_monitor_init(&mplx->monitor, timeout_granularity); - uqueue_init(&mplx->accept_uqueue); - uqueue_init(&mplx->n_uqueue); + iomplx_monitor_init(&mplx->monitor, timeout_granularity); + uqueue_init(&mplx->accept_uqueue); + uqueue_init(&mplx->n_uqueue); } static void iomplx_start_threads(iomplx_instance *mplx) From c4c4ade9f0ea74656d7b1056a69ea06246feaaae Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Tue, 24 Apr 2012 15:07:07 -0300 Subject: [PATCH 4/8] fixed iomplx_active_list_size_fix, according to PR #1 --- example.c | 1 - iomplx.c | 49 ++++++++++++++++++++++++------------------------- iomplx.h | 2 +- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/example.c b/example.c index 6d7aa0b..977bbce 100644 --- a/example.c +++ b/example.c @@ -70,7 +70,6 @@ int demo_accept(iomplx_item *item) int main() { iomplx_instance m; - iomplx_init(&m, malloc, free, NULL, 4, 4, 0, 0); iomplx_inet_listen(&m, "0.0.0.0", 2222, demo_accept, NULL); iomplx_launch(&m); diff --git a/iomplx.c b/iomplx.c index 31c54a3..5ad83b1 100644 --- a/iomplx.c +++ b/iomplx.c @@ -55,22 +55,26 @@ static int iomplx_dummy_call1(iomplx_item *item) return 0; } -static unsigned int iomplx_calc_thread_n_active_items(unsigned int threads, unsigned int thread_n_active_items) +static inline void iomplx_active_list_size_fix(unsigned int *active_list_size, unsigned int threads) { struct rlimit rlmt; + unsigned int active_items = 0; - if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { - return -1; - } else { + assert(threads); + assert(getrlimit(RLIMIT_NOFILE, &rlmt) != -1 || rlmt.rlim_cur != 0); - if (!thread_n_active_items) - thread_n_active_items = IOMPLX_THREAD_N_ACTIVE_ITEMS; + if ( !active_list_size[THREAD_N] ) + active_list_size[THREAD_N] = IOMPLX_THREAD_N_ACTIVE_ITEMS; - if ( (thread_n_active_items * threads) > (unsigned int)rlmt.rlim_cur) - return -1; - } + if ( !active_list_size[THREAD_0] ) + active_list_size[THREAD_0] = IOMPLX_THREAD_0_ACTIVE_ITEMS; - return thread_n_active_items; + active_items = active_list_size[THREAD_N] * threads + active_list_size[THREAD_0]; + + if ( active_items > rlmt.rlim_cur ) { + assert(active_list_size[THREAD_0] < rlmt.rlim_cur); + active_list_size[THREAD_N] = (rlmt.rlim_cur - active_list_size[THREAD_0]) / threads; + } } static const iomplx_callbacks iomplx_dummy_calls = { @@ -272,24 +276,19 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items) { - mplx->item_alloc = alloc; - mplx->item_free = free; - mplx->thread_init = init; - mplx->threads = threads; - - if(!thread_zero_active_items) - thread_zero_active_items = IOMPLX_THREAD_ZERO_ACTIVE_ITEMS; - - thread_n_active_items = iomplx_calc_thread_n_active_items(threads, thread_n_active_items); + mplx->item_alloc = alloc; + mplx->item_free = free; + mplx->thread_init = init; + mplx->threads = threads; - assert(thread_n_active_items != -1); + mplx->active_list_size[THREAD_0] = thread_zero_active_items; + mplx->active_list_size[THREAD_N] = thread_n_active_items; - mplx->active_list_size[THREAD_0] = thread_zero_active_items; - mplx->active_list_size[THREAD_N] = thread_n_active_items; + iomplx_active_list_size_fix(mplx->active_list_size, threads); + iomplx_monitor_init(&mplx->monitor, timeout_granularity); - iomplx_monitor_init(&mplx->monitor, timeout_granularity); - uqueue_init(&mplx->accept_uqueue); - uqueue_init(&mplx->n_uqueue); + uqueue_init(&mplx->accept_uqueue); + uqueue_init(&mplx->n_uqueue); } static void iomplx_start_threads(iomplx_instance *mplx) diff --git a/iomplx.h b/iomplx.h index 90b28ed..b5ac20a 100644 --- a/iomplx.h +++ b/iomplx.h @@ -22,7 +22,7 @@ #include #include -#define IOMPLX_THREAD_ZERO_ACTIVE_ITEMS 10 +#define IOMPLX_THREAD_0_ACTIVE_ITEMS 10 #define IOMPLX_THREAD_N_ACTIVE_ITEMS 200 #define IOMPLX_READ_EVENT 1 From bc01344afefb4812d080580d03bcdcdb49280482 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Tue, 24 Apr 2012 15:11:09 -0300 Subject: [PATCH 5/8] fixed iomplx_active_list_size_fix, according to PR #1 --- iomplx.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/iomplx.c b/iomplx.c index 5ad83b1..4e7c84a 100644 --- a/iomplx.c +++ b/iomplx.c @@ -276,10 +276,10 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items) { - mplx->item_alloc = alloc; - mplx->item_free = free; - mplx->thread_init = init; - mplx->threads = threads; + mplx->item_alloc = alloc; + mplx->item_free = free; + mplx->thread_init = init; + mplx->threads = threads; mplx->active_list_size[THREAD_0] = thread_zero_active_items; mplx->active_list_size[THREAD_N] = thread_n_active_items; @@ -288,7 +288,7 @@ void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_f iomplx_monitor_init(&mplx->monitor, timeout_granularity); uqueue_init(&mplx->accept_uqueue); - uqueue_init(&mplx->n_uqueue); + uqueue_init(&mplx->n_uqueue); } static void iomplx_start_threads(iomplx_instance *mplx) From 9c54c5be088511fddb55e6cb9f739dac060e1c42 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Tue, 24 Apr 2012 15:14:24 -0300 Subject: [PATCH 6/8] fixed iomplx_active_list_size_fix, according to PR #1 --- iomplx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iomplx.c b/iomplx.c index 4e7c84a..fd57b46 100644 --- a/iomplx.c +++ b/iomplx.c @@ -57,7 +57,7 @@ static int iomplx_dummy_call1(iomplx_item *item) static inline void iomplx_active_list_size_fix(unsigned int *active_list_size, unsigned int threads) { - struct rlimit rlmt; + struct rlimit rlmt; unsigned int active_items = 0; assert(threads); From 85342fe240cabd3cf36d60caeab23c3087f1e5ef Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Wed, 25 Apr 2012 15:00:55 -0300 Subject: [PATCH 7/8] Added CPU_AFFINITY for thread creation --- Makefile | 8 +++++++- iomplx.c | 22 +++++++++++++++++++++- iomplx.h | 12 ++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 6fa55b8..88054f6 100644 --- a/Makefile +++ b/Makefile @@ -18,11 +18,17 @@ ifneq ($(findstring /usr/include/sys/event.h, $(wildcard /usr/include/sys/*.h)), UQUEUE=KQUEUE endif +ifeq ($(CPU_AFFINITY), 1) + CFLAGS += -D_GNU_SOURCE -DCPU_AFFINITY=1 +endif + + UQUEUE_SRC=backend/$(shell echo $(UQUEUE) | tr A-Z a-z).c SRC+=$(UQUEUE_SRC) OBJ=$(SRC:.c=.o) -all: prepare lib + +all: prepare lib example @echo "Done" prepare: diff --git a/iomplx.c b/iomplx.c index fd57b46..5dc4b3d 100644 --- a/iomplx.c +++ b/iomplx.c @@ -297,11 +297,31 @@ static void iomplx_start_threads(iomplx_instance *mplx) pthread_attr_t attr; pthread_t unused; +#if USE_CPU_AFFINITY + int procs = 0; + cpu_set_t cpu_set; + + procs = (unsigned int)sysconf( _SC_NPROCESSORS_ONLN ); + assert(procs != -1); + + //force to processor number + if(mplx->threads > procs) + mplx->threads = procs; + + CPU_ZERO(&cpu_set); +#endif pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - for(i = 0; i < mplx->threads; i++) + for(i = 0; i < mplx->threads; i++) { pthread_create(&unused, &attr, (void *(*)(void *))iomplx_thread_n, (void *)mplx); + +#if USE_CPU_AFFINITY + CPU_SET(i, &cpu_set); + pthread_setaffinity_np(unused, sizeof(cpu_set_t), &cpu_set); +#endif + + } } void iomplx_launch(iomplx_instance *mplx) diff --git a/iomplx.h b/iomplx.h index b5ac20a..628c815 100644 --- a/iomplx.h +++ b/iomplx.h @@ -22,6 +22,18 @@ #include #include +#if !defined(USE_CPU_AFFINITY) +#define USE_CPU_AFFINITY ( CPU_AFFINITY && __linux__ ) +#endif + +#if USE_CPU_AFFINITY +#if !defined(_GNU_SOURCE) +#define _GNU_SOURCE 1 +#endif +#include +#endif + + #define IOMPLX_THREAD_0_ACTIVE_ITEMS 10 #define IOMPLX_THREAD_N_ACTIVE_ITEMS 200 From af37a85682965946d41c267cbaa43b46811ad252 Mon Sep 17 00:00:00 2001 From: Jorge Niedbalski R Date: Wed, 25 Apr 2012 15:02:25 -0300 Subject: [PATCH 8/8] Added CPU_AFFINITY for thread creation --- iomplx.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iomplx.c b/iomplx.c index 5dc4b3d..4976fb6 100644 --- a/iomplx.c +++ b/iomplx.c @@ -299,7 +299,7 @@ static void iomplx_start_threads(iomplx_instance *mplx) #if USE_CPU_AFFINITY int procs = 0; - cpu_set_t cpu_set; + cpu_set_t cpu_set; procs = (unsigned int)sysconf( _SC_NPROCESSORS_ONLN ); assert(procs != -1); @@ -317,7 +317,7 @@ static void iomplx_start_threads(iomplx_instance *mplx) pthread_create(&unused, &attr, (void *(*)(void *))iomplx_thread_n, (void *)mplx); #if USE_CPU_AFFINITY - CPU_SET(i, &cpu_set); + CPU_SET(i, &cpu_set); pthread_setaffinity_np(unused, sizeof(cpu_set_t), &cpu_set); #endif