Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ ifneq ($(findstring /usr/include/sys/event.h, $(wildcard /usr/include/sys/*.h)),
UQUEUE=KQUEUE
endif

ifeq ($(CPU_AFFINITY), 1)
CFLAGS += -D_GNU_SOURCE -DCPU_AFFINITY=1
endif


UQUEUE_SRC=backend/$(shell echo $(UQUEUE) | tr A-Z a-z).c
SRC+=$(UQUEUE_SRC)

OBJ=$(SRC:.c=.o)
all: prepare lib

all: prepare lib example
@echo "Done"

prepare:
Expand Down
3 changes: 1 addition & 2 deletions example.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ int demo_accept(iomplx_item *item)
int main()
{
iomplx_instance m;

iomplx_init(&m, malloc, free, NULL, 4, 4);
iomplx_init(&m, malloc, free, NULL, 4, 4, 0, 0);
iomplx_inet_listen(&m, "0.0.0.0", 2222, demo_accept, NULL);
iomplx_launch(&m);

Expand Down
56 changes: 52 additions & 4 deletions iomplx.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

#include <pthread.h>
#include <sys/types.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <iomplx.h>
#include <time.h>
#include <unistd.h>
#include <assert.h>

static inline void iomplx_monitor_add(iomplx_monitor *mon, iomplx_item *item)
{
Expand Down Expand Up @@ -53,6 +55,28 @@ static int iomplx_dummy_call1(iomplx_item *item)
return 0;
}

static inline void iomplx_active_list_size_fix(unsigned int *active_list_size, unsigned int threads)
{
struct rlimit rlmt;
unsigned int active_items = 0;

assert(threads);
assert(getrlimit(RLIMIT_NOFILE, &rlmt) != -1 || rlmt.rlim_cur != 0);

if ( !active_list_size[THREAD_N] )
active_list_size[THREAD_N] = IOMPLX_THREAD_N_ACTIVE_ITEMS;

if ( !active_list_size[THREAD_0] )
active_list_size[THREAD_0] = IOMPLX_THREAD_0_ACTIVE_ITEMS;

active_items = active_list_size[THREAD_N] * threads + active_list_size[THREAD_0];

if ( active_items > rlmt.rlim_cur ) {
assert(active_list_size[THREAD_0] < rlmt.rlim_cur);
active_list_size[THREAD_N] = (rlmt.rlim_cur - active_list_size[THREAD_0]) / threads;
}
}

static const iomplx_callbacks iomplx_dummy_calls = {
.calls_arr = {iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1, iomplx_dummy_call1}
};
Expand Down Expand Up @@ -250,15 +274,19 @@ iomplx_item *iomplx_item_add(iomplx_instance *mplx, iomplx_item *item, int liste
return item_copy;
}

void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity)
void iomplx_init(iomplx_instance *mplx, alloc_func alloc, free_func free, init_func init, unsigned int threads, unsigned int timeout_granularity, unsigned int thread_zero_active_items, unsigned int thread_n_active_items)
{
mplx->item_alloc = alloc;
mplx->item_free = free;
mplx->thread_init = init;
mplx->threads = threads;
mplx->active_list_size[THREAD_0] = 10;
mplx->active_list_size[THREAD_N] = IOMPLX_MAX_ACTIVE_ITEMS/threads + (IOMPLX_MAX_ACTIVE_ITEMS%threads != 0? 1 : 0);

mplx->active_list_size[THREAD_0] = thread_zero_active_items;
mplx->active_list_size[THREAD_N] = thread_n_active_items;

iomplx_active_list_size_fix(mplx->active_list_size, threads);
iomplx_monitor_init(&mplx->monitor, timeout_granularity);

uqueue_init(&mplx->accept_uqueue);
uqueue_init(&mplx->n_uqueue);
}
Expand All @@ -269,11 +297,31 @@ static void iomplx_start_threads(iomplx_instance *mplx)
pthread_attr_t attr;
pthread_t unused;

#if USE_CPU_AFFINITY
int procs = 0;
cpu_set_t cpu_set;

procs = (unsigned int)sysconf( _SC_NPROCESSORS_ONLN );
assert(procs != -1);

//force to processor number
if(mplx->threads > procs)
mplx->threads = procs;

CPU_ZERO(&cpu_set);
#endif
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

for(i = 0; i < mplx->threads; i++)
for(i = 0; i < mplx->threads; i++) {
pthread_create(&unused, &attr, (void *(*)(void *))iomplx_thread_n, (void *)mplx);

#if USE_CPU_AFFINITY
CPU_SET(i, &cpu_set);
pthread_setaffinity_np(unused, sizeof(cpu_set_t), &cpu_set);
#endif

}
}

void iomplx_launch(iomplx_instance *mplx)
Expand Down
17 changes: 15 additions & 2 deletions iomplx.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@
#include <mempool.h>
#include <sys/socket.h>

#define IOMPLX_MAX_ACTIVE_ITEMS 200
#if !defined(USE_CPU_AFFINITY)
#define USE_CPU_AFFINITY ( CPU_AFFINITY && __linux__ )
#endif

#if USE_CPU_AFFINITY
#if !defined(_GNU_SOURCE)
#define _GNU_SOURCE 1
#endif
#include <sched.h>
#endif


#define IOMPLX_THREAD_0_ACTIVE_ITEMS 10
#define IOMPLX_THREAD_N_ACTIVE_ITEMS 200

#define IOMPLX_READ_EVENT 1
#define IOMPLX_WRITE_EVENT 2
Expand Down Expand Up @@ -130,7 +143,7 @@ void iomplx_callbacks_init(iomplx_item *);

iomplx_item *iomplx_item_add(iomplx_instance *, iomplx_item *, int);

void iomplx_init(iomplx_instance *, alloc_func, free_func, init_func, unsigned int, unsigned int);
void iomplx_init(iomplx_instance *, alloc_func, free_func, init_func, unsigned int, unsigned int, unsigned int, unsigned int);
void iomplx_launch(iomplx_instance *);

static inline void iomplx_item_filter_set(iomplx_item *item, int filter)
Expand Down