diff --git a/tools/scripts/asynccache/AsyncCache.proto b/tools/scripts/asynccache/AsyncCache.proto
new file mode 100755
index 0000000..914392e
--- /dev/null
+++ b/tools/scripts/asynccache/AsyncCache.proto
@@ -0,0 +1,33 @@
+syntax = "proto2";
+
+// Options for caching a block asynchronously
+// next available id: 6
+message AsyncCacheRequest {
+  optional int64 block_id = 1;
+  // TODO(calvin): source host and port should be replaced with WorkerNetAddress
+  optional string source_host = 2;
+  optional int32 source_port = 3;
+  optional OpenUfsBlockOptions open_ufs_block_options = 4;
+  optional int64 length = 5;
+}
+
+// Options to open a UFS block.
+// next available id: 8
+message OpenUfsBlockOptions {
+  optional string ufs_path = 1;
+  // The offset of the block within the file.
+  optional int64 offset_in_file = 2;
+  // The block size.
+  optional int64 block_size = 3;
+  optional int32 maxUfsReadConcurrency = 4;
+  optional int64 mountId = 5;
+  // If set, do not try to cache the block locally when reading the data from the UFS.
+  optional bool no_cache = 6;
+  // The client does not need to set this. This is set by the worker.
+  optional string user = 7;
+}
+
+message LocalBlockOpenResponse {
+  optional string path = 1;
+}
+
diff --git a/tools/scripts/asynccache/alluxiosc.c b/tools/scripts/asynccache/alluxiosc.c
new file mode 100755
index 0000000..fb49cd8
--- /dev/null
+++ b/tools/scripts/asynccache/alluxiosc.c
@@ -0,0 +1,244 @@
+/*
+ * alluxiosc: LD_PRELOAD shim for files that live on alluxio-fuse mounts.
+ *
+ * For a path at or below an alluxio-fuse mount point the shim builds a "seed"
+ * path by appending a suffix to the resolved path:
+ *   - __xstat()/__xstat64() stat the seed path instead of the given path;
+ *   - fopen()/open()/open64() read the first line of the seed path and, when
+ *     it is an absolute path, try to open that path before the given one.
+ *
+ * Environment switches (only their presence is tested):
+ *   alluxiosc_debug  trace intercepted calls on stderr
+ *   alluxiosc_query  stat wrappers print "query=" plus the seed line, no redirect
+ *   alluxiosc_cache  use cache_seed instead of seed as the suffix
+ */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE /* RTLD_NEXT, struct stat64, open64, O_TMPFILE */
+#endif
+
+#include <dlfcn.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <mntent.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#define MNT_MAX 100
+char *seed = ".tmp.ava.alluxiosc.tmp"; // keep in sync with the server
+char *cache_seed = ".tmp.cache.tmp.ava.alluxiosc.tmp"; // keep in sync with the server
+
+typedef struct ava_struct {
+  char *mnt_slots[MNT_MAX]; // mount points of the alluxio-fuse file systems
+  int mnt_idx;              // number of used entries in mnt_slots
+  int debug;                // alluxiosc_debug is set
+  int query;                // alluxiosc_query is set
+  int cache;                // alluxiosc_cache is set
+} ava_struct;
+
+ava_struct ava;
+
+// Next definitions of the intercepted symbols, resolved in init().
+FILE* (*my_fopen)(const char *filename, const char* mode);
+// Variadic like the libc prototypes so that the creation mode can be forwarded.
+int (*my_open)(const char *filename, int flags, ...);
+int (*my_open64)(const char *filename, int flags, ...);
+
+int (*my__xstat)(int ver, const char *path, struct stat *buf);
+int (*my__xstat64)(int ver, const char *path, struct stat64 *buf);
+
+// Library constructor: reads the environment switches, looks up the next
+// definitions of the intercepted symbols and records every mount whose
+// fsname is "alluxio-fuse".
+void __attribute__ ((constructor)) init(void){
+  FILE *file = setmntent("/proc/mounts", "r");
+  struct mntent *ent;
+
+  ava.debug = (getenv("alluxiosc_debug") != NULL);
+  ava.query = (getenv("alluxiosc_query") != NULL);
+  ava.cache = (getenv("alluxiosc_cache") != NULL);
+  if (ava.cache) seed = cache_seed;
+  ava.mnt_idx = 0;
+
+  my_fopen = dlsym(RTLD_NEXT, "fopen");
+  my_open = dlsym(RTLD_NEXT, "open");
+  my_open64 = dlsym(RTLD_NEXT, "open64");
+  my__xstat = dlsym(RTLD_NEXT, "__xstat");
+  my__xstat64 = dlsym(RTLD_NEXT, "__xstat64");
+
+  if (file == NULL) {
+    perror("setmntent");
+    return;
+  }
+
+  // Stop at MNT_MAX entries instead of writing past the end of mnt_slots.
+  while (ava.mnt_idx < MNT_MAX && NULL != (ent = getmntent(file))) {
+    if (0 != strcmp("alluxio-fuse", ent->mnt_fsname)) continue;
+    char *dir = strdup(ent->mnt_dir);
+    if (dir != NULL) ava.mnt_slots[ava.mnt_idx++] = dir;
+  }
+  endmntent(file);
+}
+
+// Returns 1 when fullpath is an alluxio-fuse mount point or lies below one.
+int is_alluxio_file(const char *fullpath)
+{
+  for (int i = 0; i < ava.mnt_idx; i++) {
+    const char *mnt = ava.mnt_slots[i];
+    size_t len = strlen(mnt);
+    if (0 != strncmp(fullpath, mnt, len)) continue;
+    // Require a path-component boundary so that a mount at /mnt/a does not
+    // claim /mnt/ab.
+    if (len > 0 && mnt[len - 1] == '/') return 1;
+    if (fullpath[len] == '\0' || fullpath[len] == '/') return 1;
+  }
+  return 0;
+}
+
+// Writes resolved + seed into dst. Returns 1 on success and 0 when the
+// result does not fit into size bytes.
+static int make_seed_path(char *dst, size_t size, const char *resolved)
+{
+  int n = snprintf(dst, size, "%s%s", resolved, seed);
+  return n >= 0 && (size_t)n < size;
+}
+
+// Returns 1 when the open() flags require the optional mode argument.
+static int needs_mode(int flags)
+{
+  if (flags & O_CREAT) return 1;
+#ifdef O_TMPFILE
+  if ((flags & O_TMPFILE) == O_TMPFILE) return 1;
+#endif
+  return 0;
+}
+
+// Looks up the short-circuit path for filename.
+//   sc, size: buffer that receives the first line of the seed file without
+//             its trailing newline; it is "" when nothing could be read.
+//   af:       set to 1 when filename resolves to an alluxio-fuse path.
+// Returns 1 when sc holds an absolute path and 0 otherwise.
+int get_sc(const char *filename, char *sc, int size, int *af)
+{
+  char resolved[PATH_MAX + 1];
+  char seed_file[PATH_MAX + 1];
+  int rc = 0;
+  FILE *f = NULL;
+
+  *af = 0;
+  if (size <= 0) return 0;
+  sc[0] = '\0'; // callers print sc even when the lookup fails
+  if (NULL == realpath(filename, resolved)) return 0;
+  if (!is_alluxio_file(resolved)) return 0;
+  *af = 1;
+
+  if (!make_seed_path(seed_file, sizeof(seed_file), resolved)) return 0;
+
+  f = my_fopen(seed_file, "r");
+  if (f == NULL) return 0;
+  if (fgets(sc, size, f)) {
+    size_t len = strlen(sc);
+    if (len > 0 && sc[len - 1] == '\n') sc[len - 1] = '\0';
+    rc = ('/' == sc[0]);
+  } else {
+    sc[0] = '\0';
+  }
+  fclose(f);
+  return rc;
+}
+
+// Interposes __xstat(). In query mode it prints the seed line and stats the
+// given path; otherwise an alluxio-fuse path is stat'ed through its seed path.
+int __xstat(int ver, const char *path, struct stat *buf)
+{
+  char full[PATH_MAX + 1];
+  char target[PATH_MAX + 1];
+
+  if (ava.query) {
+    char sc[PATH_MAX + 1];
+    int af;
+    get_sc(path, sc, sizeof(sc), &af);
+    printf("query=%s\n", sc);
+    return my__xstat(ver, path, buf);
+  }
+
+  if (NULL == realpath(path, full)) return my__xstat(ver, path, buf); // error out
+  if (!is_alluxio_file(full)) return my__xstat(ver, path, buf);
+
+  if (ava.debug) fprintf(stderr, "--- async cache %s\n", full);
+  // Stat the given path when path + seed does not fit into the buffer.
+  if (!make_seed_path(target, sizeof(target), full)) return my__xstat(ver, path, buf);
+  return my__xstat(ver, target, buf);
+}
+
+// 64-bit variant of the __xstat() wrapper with the same behavior.
+int __xstat64(int ver, const char *path, struct stat64 *buf)
+{
+  char full[PATH_MAX + 1];
+  char target[PATH_MAX + 1];
+
+  if (ava.query) {
+    char sc[PATH_MAX + 1];
+    int af;
+    get_sc(path, sc, sizeof(sc), &af);
+    printf("query=%s\n", sc);
+    return my__xstat64(ver, path, buf);
+  }
+
+  if (NULL == realpath(path, full)) return my__xstat64(ver, path, buf); // error out
+  if (!is_alluxio_file(full)) return my__xstat64(ver, path, buf);
+
+  if (ava.debug) fprintf(stderr, "--- async cache64 %s\n", full);
+  // Stat the given path when path + seed does not fit into the buffer.
+  if (!make_seed_path(target, sizeof(target), full)) return my__xstat64(ver, path, buf);
+  return my__xstat64(ver, target, buf);
+}
+
+// Interposes fopen(): tries the short-circuit path first when there is one
+// and falls back to filename.
+FILE* fopen(const char* filename, const char* mode){
+  char sc[PATH_MAX + 1];
+  int af, rc = get_sc(filename, sc, sizeof(sc), &af);
+  if (ava.debug && af) fprintf(stderr, "--- fopen filename=%s, rc=%d, sc=%s\n", filename, rc, sc);
+  FILE *f = rc ? my_fopen(sc, mode) : NULL;
+  return f ? f : my_fopen(filename, mode);
+}
+
+// Interposes open(): tries the short-circuit path first when there is one
+// and falls back to filename. -1 (not 0, which is a valid descriptor) marks
+// "no descriptor yet", so a failed short-circuit open still falls back.
+int open(const char *filename, int flags, ...)
+{
+  char sc[PATH_MAX + 1];
+  int mode = 0;
+  if (needs_mode(flags)) {
+    va_list ap;
+    va_start(ap, flags);
+    mode = va_arg(ap, int);
+    va_end(ap);
+  }
+  int af, rc = get_sc(filename, sc, sizeof(sc), &af);
+  if (ava.debug && af) fprintf(stderr, "--- open filename=%s, rc=%d, sc=%s\n", filename, rc, sc);
+  int fd = rc ? my_open(sc, flags, mode) : -1;
+  return fd >= 0 ? fd : my_open(filename, flags, mode);
+}
+
+// Interposes open64(); same behavior as the open() wrapper.
+int open64(const char *filename, int flags, ...)
+{
+  char sc[PATH_MAX + 1];
+  int mode = 0;
+  if (needs_mode(flags)) {
+    va_list ap;
+    va_start(ap, flags);
+    mode = va_arg(ap, int);
+    va_end(ap);
+  }
+  int af, rc = get_sc(filename, sc, sizeof(sc), &af);
+  if (ava.debug && af) fprintf(stderr, "--- open64 filename=%s, rc=%d, sc=%s\n", filename, rc, sc);
+  int fd = rc ? my_open64(sc, flags, mode) : -1;
+  return fd >= 0 ? fd : my_open64(filename, flags, mode);
+}
diff --git a/tools/scripts/asynccache/ava_cache.py b/tools/scripts/asynccache/ava_cache.py
new file mode 100755
index 0000000..ecc374d
--- /dev/null
+++ b/tools/scripts/asynccache/ava_cache.py
@@ -0,0 +1,114 @@
+#! /usr/bin/python3
+"""
+Send a purge request for every block id in a list to a server and poll until
+each block is reported as done.
+
+Usage: ava_cache.py ip port file_list
+The first whitespace-separated field of each line in file_list is a block id.
+
+Setup:
+apt-get update
+apt-get install -y python3-pip
+pip3 install google-apputils
+pip3 install protobuf
+protoc --proto_path=. --python_out=. AsyncCache.proto
+"""
+
+from AsyncCache_pb2 import AsyncCacheRequest, LocalBlockOpenResponse
+import sys
+import socket
+import struct
+import time
+
+if len(sys.argv) != 4:
+    print("Usage:", sys.argv[0], "ip port file_list")
+    sys.exit(-1)
+
+ip = sys.argv[1]
+port = int(sys.argv[2])
+flist = sys.argv[3]
+
+address = (ip, port)
+clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+clientsocket.connect(address)
+
+TH_HI = 100
+TH_LOW = 50
+ID_PURGE_BLOCK = 0
+ID_QUERY_BLOCK = -1
+
+# Frame header: (long)length - (int)id - (int)packet_length, big-endian.
+HEADER = struct.Struct('>QII')
+REQUEST_MSG_ID = 112
+RESPONSE_MSG_ID = 106
+
+
+def recv_exact(count):
+    """Read exactly count bytes; a single recv() may return fewer."""
+    chunks = []
+    while count > 0:
+        chunk = clientsocket.recv(count)
+        if chunk == b'':
+            raise RuntimeError("socket connection broken")
+        chunks.append(chunk)
+        count -= len(chunk)
+    return b''.join(chunks)
+
+
+def async_cache(id, length):
+    """Send one AsyncCacheRequest for block id with the given length field."""
+    data = AsyncCacheRequest()
+    data.length = length
+    data.block_id = id
+    s = data.SerializeToString()
+    header = HEADER.pack(HEADER.size + len(s), REQUEST_MSG_ID, len(s))
+    clientsocket.sendall(header + s)
+
+
+def evict(id):
+    """Send a purge request (length ID_PURGE_BLOCK) for block id."""
+    async_cache(id, ID_PURGE_BLOCK)
+
+
+def trim(idlist, low):
+    """Poll the blocks in idlist until at most low of them are still pending.
+
+    A block whose reported path does not start with '/' counts as done; every
+    other block gets another purge request. Returns the pending ids.
+    """
+    rsp = LocalBlockOpenResponse()
+    while len(idlist) > low:
+        not_done = []
+        for block_id in idlist:
+            async_cache(block_id, ID_QUERY_BLOCK)
+            _, retid, plen = HEADER.unpack(recv_exact(HEADER.size))
+            if retid != RESPONSE_MSG_ID:
+                raise RuntimeError("not expect id")
+            rsp.ParseFromString(recv_exact(plen))
+            # startswith() also handles an empty path, where path[0] raises.
+            if not rsp.path.startswith('/'):
+                print("done ", block_id)
+            else:
+                not_done.append(block_id)
+                evict(block_id)
+        idlist = not_done
+        if len(idlist) > low:
+            time.sleep(2)
+    return idlist
+
+
+idlist = []
+with open(flist, "r") as lines:
+    for line in lines:
+        fields = line.split()
+        if not fields:
+            continue  # skip blank lines instead of failing on fields[0]
+        block_id = int(fields[0])
+        if len(idlist) >= TH_HI:
+            idlist = trim(idlist, TH_LOW)
+        print("evicting ", block_id)
+        evict(block_id)
+        idlist.append(block_id)
+
+trim(idlist, 0)
+clientsocket.close()
diff --git a/tools/scripts/asynccache/ava_cache.sh b/tools/scripts/asynccache/ava_cache.sh
new file mode 100755
index 0000000..72349e5
--- /dev/null
+++ b/tools/scripts/asynccache/ava_cache.sh
@@ -0,0 +1,76 @@
+#!/bin/bash
+# Run ls under the alluxiosc LD_PRELOAD library for every file named in a list
+# and poll until each file's query reports an absolute path, keeping a bounded
+# number of files pending.
+
+if [ $# -lt 2 ]
+then
+  echo "usage: $0 SO_FILE LIST_FILE"
+  exit 1
+fi
+
+so=$1
+list=$2
+if [ ! -e "$so" ]
+then
+  echo "so file $so does not exist"
+  exit 1
+fi
+if [ ! -e "$list" ]
+then
+  echo "list file $list does not exist"
+  exit 1
+fi
+
+MAX=50
+
+cache=()
+
+# Re-check every pending file: drop the ones that no longer exist or whose
+# query output contains "query=/", and re-trigger caching for the rest.
+trim() {
+  local left=()
+  local f ok
+  for f in "${cache[@]}"
+  do
+    if [ ! -e "$f" ]
+    then
+      echo "remove non existing $f ..."
+      continue
+    fi
+    ok=$(alluxiosc_query=1 LD_PRELOAD="$so" ls "$f" | grep 'query=/')
+    if [ -z "$ok" ]
+    then
+      echo "trying to cache again $f ..."
+      left+=("$f")
+      alluxiosc_cache=1 LD_PRELOAD="$so" ls "$f" > /dev/null &
+    else
+      echo "done cache $f ..."
+    fi
+  done
+  cache=("${left[@]}")
+}
+
+while read -r line
+do
+  while [ ${#cache[@]} -gt $MAX ]
+  do
+    sleep 2
+    trim
+  done
+
+  f=$(printf '%s\n' "$line" | awk '{print $1}')
+  if [ -z "$f" ]
+  then
+    continue
+  fi
+  cache+=("$f")
+  echo "trying to cache $f ..."
+  alluxiosc_cache=1 LD_PRELOAD="$so" ls "$f" > /dev/null &
+done < "$list"
+
+while [ ${#cache[@]} -gt 0 ]
+do
+  sleep 1
+  trim
+done
diff --git a/tools/scripts/asynccache/mk_alluxiosc.sh b/tools/scripts/asynccache/mk_alluxiosc.sh
new file mode 100755
index 0000000..0547f58
--- /dev/null
+++ b/tools/scripts/asynccache/mk_alluxiosc.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+# Build the alluxiosc LD_PRELOAD shared object in the current directory.
+set -e
+rm -f alluxiosc.so
+gcc -D_GNU_SOURCE -fPIC -shared -O2 alluxiosc.c -o alluxiosc.so -ldl