diff --git a/README.md b/README.md
index 804f031e..9632a949 100755
--- a/README.md
+++ b/README.md
@@ -103,8 +103,9 @@ Browser IE10+
## 安装说明
- jobx分为两个jobx-server端和jobx-agent端,jobx-server端即为一个web可视化的中央管理调度平台,jobx-agent为要管理的任务的机器
- agent和server都依赖zookeeper,安装部署jobx之前必须先安装和启动zookeeper,server和agent必须连接同一个zookeeper,server端依赖redis
+ jobx分为两个jobx-server端和jobx-agent端,jobx-server端即为一个web可视化的中央管理调度平台,
+ jobx-agent为要管理的任务的机器,agent和server都依赖zookeeper,安装部署jobx之前必须先安装和启动zookeeper
+ server和agent必须连接同一个zookeeper,server端依赖redis
## 编译步骤:
@@ -112,179 +113,43 @@ Browser IE10+
1)下载源码:
> git clone https://github.com/jobxhub/jobx.git
-2):修改server端的配置信息
- 1:创建数据,数据库名字可以是jobx或者其他
- 2:进入jobx-server/src/main/resources 修改config.properties里的jdbc连接信息
-
- #jdbc
- jdbc.driver=com.mysql.jdbc.Driver
- jdbc.url=jdbc:mysql://${mysql_host}:3306/jobx?useUnicode=true&characterEncoding=UTF-8
- jdbc.username=${user}
- jdbc.password=${password}
-
- #redis
- redis.host=${redis.host}
- redis.password=${redis.password}
- redis.port=${redis.port}
-
- #zookeeper
- jobx.registry=zookeeper://${zookeeper_host}:2181?bakup=${zookeeper_host1}:2181,${zookeeper_host2}:2181
-
-3):修改agent端配置信息
- cd JobX/jobx-agent/src/conf/conf
-
- #zookepper注册中心
- jobx.registry=zookeeper://127.0.0.1:2181
+2):编译:
+ 进入deployment
+ > cd deployment
+ #修改配置信息
+ jobx.password=jobx
+ jobx.port=1577
+ jobx.registry=zookeeper://${zookeeper_host}:2181?bakup=${zookeeper_host1}:2181,${zookeeper_host2}:2181
-4):进入源码目录执行编译:
- *nix平台执行 sh build.sh
- window平台双击 build.bat
- 编译完成的文件在build/dist下
+ #jobx.host=
+ jdbc.driver=com.mysql.jdbc.Driver
+ jdbc.url=jdbc:mysql://localhost:3306/jobx
+ jdbc.username=root
+ jdbc.password=${password}
-5):启动agent
- 1)自动化部署
- *nix平台: 执行 sh agent.sh
- window平台: 需要进入jobx-agent/target下,解包jobx-agent-${version}.tar.gz到指定的位置,进入bin,执行startup.bat
+ jobx.cluster=false
+ jobx.cached=memcached
- 2) 手动部署agent步骤
+ redis.host=${redis.host}
+ redis.password=${redis.password}
+ redis.port=6379
+
+ memcached.servers=${memcached.servers}
+ memcached.protocol=BINARY
+
+ *nix平台执行 sh build.sh
+ window平台双击 build.bat
- 将jobx-agent-${version}.tar.gz包拷贝到要管理任务的目标服务器,解包,会看到以下目录
- ---bin/
- | startup.sh #agent的启动脚本,调用的是jobx.sh来完成
- | shutdown.sh #agent停止脚本,调用的是jobx.sh来完成
- | jobx.sh #agent控制启动|停止的脚本
- | monitor.sh #实时监控获取数据需要的脚本,由系统调度
- | kill.sh #kill任务时需要的脚本,由系统调度
- ---conf/
- conf.properties #agent配置文件
- | log4j.properties #log4j配置文件
- ---lib/
- | *.jar #agent运行需要的jar文件
- ---temp/
- | *.sh #用于存放项目生成的零时文件的目录
- ---logs
- | jobx.out #项目启动会产生的Log文件
-
- > tar -xzvf jobx-agent-${version}.tar.gz
- 1)修改conf/conf.properties里的配置信息
- #zookepper注册中心
- jobx.registry=zookeeper://${zookeeper_host}:2181
- #agent Ip,确保server可以通过此ip访问到该agent(主要实现agent自动注册)
- jobx.host=127.0.0.1
- 2)启动jobx-agent 进入jobx-agent/bin
- > cd jobx-agent/bin
- > sh startup.sh
- 这里可以接受两个参数,分别是服务启动的端口和密码(默认端口是:1577,默认密码:jobx)以及agent自动注册的url和密码
- 如要指定参数启动命令如下:
- > sh startup.sh -P10001 -p123456
- 参数说明:
- -P (大写的p)为agent启动的端口,选填,如果不输入默认启动端口是1577
- -p (小写的p)为当前agent的连接密码,选填,如果不输入默认连接该机器的密码是jobx
- 以下两个参数为agent自动注册需要的两个参数(选填)
- 该脚本启动之后agent就自动注册到server端了
- 更多详细的启动信息请查看logs/jobx.out
-
- 3)停止jobx-agent 进入jobx-agent/bin 执行:
- > cd jobx-agent/bin
- > sh shutdown.sh
-
-
-6):启动server
+3):启动
+ 1) agent
+ *nix平台: 执行 sh agent.sh
+ window平台: 需要进入jobx-agent/target下,解包jobx-agent-${version}.tar.gz 双击bin/startup.bat
- 1) 自动化部署
- *nix平台执行 server.sh
- window平台执行server.bat即可完成启动
-
- 2) 手动发布 tomcat或者其他web服务器
- tomcat发布项目步骤:
- 找到build/dist/jobx-server.war
- tomcat部署有两种部署方式
- 1):直接部署到webapps下:
- 1:下载tomcat8或者以上版本(http://tomcat.apache.org)
- 2:解压tomcat,删除webapps目录下的全部文件
- > rm -rf ${tomcat_home}/webapps/*
- 3:在webapps下新建ROOT文件夹
- > mkdir ${tomcat_home}/webapps/ROOT
- 4:将war解包到ROOT下并删除war文件(注意解包完毕一定要删除war包)
- > mv server.war ${tomcat_home}/webapps/ROOT
- > cd ${tomcat_home}/webapps/ROOT
- > jar -xvf server.war
- > rm -rf server.war
- 5:更改jdbc和zookeeper配置信息
- > vi ${tomcat_home}/webapps/ROOT/WEB-INF/classes/config.properties
- 6:完成启动
- 2):通过配置server.xml外部指向
- 1:将war包解压到指定的路径,如 /data/www/jobx,并删除war包
- 2:更改jdbc配置文件
- vi /data/www/jobx/WEB-INF/classes/config.properties
- 3:进入tomcat的conf中修改server.xml配置文件
- 下面附上我的完整的server.xml配置:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 配置里Host里的appBase和Context的docBase即为外部解压的项目的路径
- 推荐第二种外部部署的方式
-
- 启动tomcat,打开浏览器以$ip:$port的方式访问,如: http://192.168.0.188:8080
-
- 不论哪种方式部署,第一次会自动创建表,默认初始用户名jobx,密码jobx,第一次登陆会提示修改密码.
-
- 3):进入到jobx的管理端,如果agent也启动了,应该可以直接在server的执行器页面看到agent,则添加任务即可...
+ 2) server
+ *nix平台: 执行 sh server.sh 8080
+ window平台 双击 server.bat
+
+ 3) 进入到jobx的管理端,如果agent也启动了,应该可以直接在server的执行器页面看到agent,则添加任务即可...
```
@@ -293,12 +158,10 @@ Browser IE10+
1):如果自行编译项目的,有可能agent端的脚本执行失败,这时请更改agent/bin下所有的脚本的字符集
a) vim *.sh
b) :set ff=unix 保存退出即可
-
-2):如果脚本字符编码已经是unix,还是启动失败,请尝试给启动脚本添加权限 chmod 777 bin/*
-3):如果agent已经成功启动server还是连接不上,请检查agent端口是否开放(如很多云服务器得开放端口才能访问)
+2):如果agent已经成功启动server还是连接不上,请检查agent端口是否开放(如很多云服务器得开放端口才能访问)
-4):如果server端用nginx做反向代理,配置如下:
+3):如果server端用nginx做反向代理,配置如下:
upstream jobx {
@@ -346,14 +209,9 @@ server {
## 常见问题:
```
1)创建作业运行身份无法选择?
- 由于考虑到权限的问题,当前登录的用户不能随便指定任务的执行身份,需要超级管理员权限的用户(jobx)登录,在设置页面统一设置运行身份,多个运行身份用","分割,然后超级管理员在编辑用户
- 为该用户指定可以执行的身份(可选择多个),这样用户在创建任务的时候就可以选择指定身份去执行了
-
- 2) executor.so:cannot execute binary file
- 需要授权agent/bin下面的所有文件777权限
- > chmod 777 jobx-agent/bin/*
- 如果授权完还提示这个错,则需要进入源码目录(JobX/jobx-executor/src/main/c/executor.c),手动gcc编译下executor.c文件,然后将编译完的文件命名成executor.so,放到jobx-agent/bin下
- 并且授777权限
+ 由于考虑到权限的问题,当前登录的用户不能随便指定任务的执行身份,需要超级管理员权限的用户(jobx)登录,在设置页面统一设置运行身份,
+ 多个运行身份用","分割,然后超级管理员在编辑用户,为该用户指定可以执行的身份(可选择多个)
+ 这样用户在创建任务的时候就可以选择指定身份去执行了
```
diff --git a/agent.sh b/deployment/agent.sh
similarity index 86%
rename from agent.sh
rename to deployment/agent.sh
index 6b0839e5..e7c004e9 100644
--- a/agent.sh
+++ b/deployment/agent.sh
@@ -58,37 +58,34 @@ done
PRGDIR=`dirname "$PRG"`
WORKDIR=`cd "$PRGDIR" >/dev/null; pwd`;
+WORKBASE=`cd "$PRGDIR"/../ >/dev/null; pwd`;
# Get standard environment variables
###############################################################################################
APP_ARTIFACT=jobx-agent
APP_VERSION="1.2.0-RELEASE";
APP_TAR_NAME=${APP_ARTIFACT}-${APP_VERSION}.tar.gz
-MAVEN_TARGET_TAR="${WORKDIR}"/${APP_ARTIFACT}/target/${APP_TAR_NAME}
-DIST_PATH=${WORKDIR}/dist/
+MAVEN_TARGET_TAR="${WORKBASE}"/${APP_ARTIFACT}/target/${APP_TAR_NAME}
+DEPLOY_PATH=${WORKDIR}/jobx-agent
+CONFIG_TEMPLATE=${WORKDIR}/conf.properties
+CONFIG_PATH=${DEPLOY_PATH}/conf/conf.properties
###############################################################################################
-[ ! -d "${DIST_PATH}" ] && mkdir -p "${DIST_PATH}"
-
-DEPLOY_PATH=${WORKDIR}/dist/jobx-agent
-
#先检查dist下是否有war包
-if [ ! -f "${DIST_PATH}/${APP_TAR_NAME}" ] ; then
+if [ ! -f "${WORKDIR}/${APP_TAR_NAME}" ] ; then
#dist下没有tar包则检查agent的target下是否有tar包.
if [ ! -f "${MAVEN_TARGET_TAR}" ] ; then
echo_r "[JobX] please build project first!"
exit 0;
else
- cp ${MAVEN_TARGET_TAR} ${DIST_PATH};
+ cp ${MAVEN_TARGET_TAR} ${WORKDIR};
fi
fi
[ -d "${DEPLOY_PATH}" ] && rm -rf ${DEPLOY_PATH}/* || mkdir -p ${DEPLOY_PATH}
-
-tar -xzvf ${DIST_PATH}/${APP_TAR_NAME} -C ${DEPLOY_PATH}/../ >/dev/null 2>&1 && chmod +x ${DEPLOY_PATH}/bin/* >/dev/null 2>&1
-
+#untar..
+tar -xzvf ${WORKDIR}/${APP_TAR_NAME} && chmod +x ${DEPLOY_PATH}/bin/* >/dev/null 2>&1
EXECUTABLE=${DEPLOY_PATH}/bin/startup.sh
-
# Check that target executable exists
if $os400; then
# -x will Only work on the os400 if the files are:
@@ -104,5 +101,4 @@ else
exit 1
fi
fi
-
exec "$EXECUTABLE" "$@"
diff --git a/build.bat b/deployment/build.bat
similarity index 84%
rename from build.bat
rename to deployment/build.bat
index 396282f9..7142f691 100644
--- a/build.bat
+++ b/deployment/build.bat
@@ -29,13 +29,13 @@
@REM In debug mode we need a real JDK (JAVA_HOME)
echo\
-echo _______
-echo /\ _________ ______ _____ / /
-echo (()) ______ / ________ / ___ \/ /
-echo \/ ___ _ / _ __ \_ __ \ ___ /
-echo / /_/ / / /_/ / /_/ / __ . \
+echo _______
+echo /\ _________ ______ _____ / /
+echo (()) ______ / ________ / ___ \/ /
+echo \/ ___ _ / _ __ \_ __ \ ___ /
+echo / /_/ / / /_/ / /_/ / __ . \
echo \____/ \____/ /_.___/ __ / \_\__
-echo _____ /
+echo _____ /
echo\
if ""%1"" == ""debug"" goto needJavaHome
@@ -106,14 +106,14 @@ setlocal
@REM Guess JOBX_HOME if not defined
set "WORK_DIR=%~dp0"
-set "JOBX_HOME=%WORK_DIR%"
-set "JOBX_BASE=%JOBX_HOME%"
+set "JOBX_HOME=%WORK_DIR%\..\"
@REM #################################################################################################
set JOBX_VERSION=1.2.0-RELEASE
-set DIST_HOME=%JOBX_HOME%\dist
set JOBX_AGENT=%JOBX_HOME%\jobx-agent\target\jobx-agent-%JOBX_VERSION%.tar.gz
set JOBX_SERVER=%JOBX_HOME%\jobx-server\target\jobx-server-%JOBX_VERSION%.war
+set JOBX_AGENT_TAR=%WORK_DIR%\jobx-agent-%JOBX_VERSION%.tar.gz
+set JOBX_SERVER_WAR=%WORK_DIR%\jobx-server-%JOBX_VERSION%.war
set "EXECUTABLE=%JOBX_HOME%\.mvn\mvnw.cmd"
@REM #################################################################################################
@@ -123,16 +123,16 @@ echo This file is needed to run this program
goto exit
:okExec
-call "%EXECUTABLE%" "clean" "install" "-Dmaven.test.skip=true"
-if %errorlevel%==0 goto toDist
+call "%EXECUTABLE%" "-f" "%JOBX_HOME%\pom.xml" "clean" "install" "-Dmaven.test.skip=true"
+if %errorlevel%==0 goto toCopy
goto exit
-:toDist
-if exist "%DIST_HOME%" rd /s /q %DIST_HOME%
-if not exist "%DIST_HOME%" mkdir %DIST_HOME%
-copy %JOBX_AGENT% %DIST_HOME%
-copy %JOBX_SERVER% %DIST_HOME%
-echo [JobX] build jobx @Version %JOBX_VERSION% successfully! please goto %DIST_HOME%
+:toCopy
+if exist "%JOBX_AGENT_TAR%" rd /s /q %JOBX_AGENT_TAR%
+if exist "%JOBX_SERVER_WAR%" rd /s /q %JOBX_SERVER_WAR%
+copy %JOBX_AGENT% %WORK_DIR%
+copy %JOBX_SERVER% %WORK_DIR%
+echo [JobX] build jobx @Version %JOBX_VERSION% successfully! please goto %WORK_DIR%
pause
:exit
@@ -140,4 +140,3 @@ exit /b 1
:end
exit /b 0
-
diff --git a/build.sh b/deployment/build.sh
similarity index 79%
rename from build.sh
rename to deployment/build.sh
index 37e284da..424cc513 100644
--- a/build.sh
+++ b/deployment/build.sh
@@ -52,14 +52,16 @@ done
PRGDIR=`dirname "$PRG"`
-WORKDIR=`cd "$PRGDIR" >/dev/null; pwd`;
+WORK_DIR=`cd "$PRGDIR" >/dev/null; pwd`;
+WORK_BASE=`cd "$PRGDIR"/../ >/dev/null; pwd`;
# Get standard environment variables
##############################################################################################
JOBX_VERSION="1.2.0-RELEASE"; ##
-JOBX_AGENT=${WORKDIR}/jobx-agent/target/jobx-agent-${JOBX_VERSION}.tar.gz ##
-JOBX_SERVER=${WORKDIR}/jobx-server/target/jobx-server-${JOBX_VERSION}.war ##
-DIST_HOME="${WORKDIR}/dist" ##
+JOBX_AGENT=${WORK_BASE}/jobx-agent/target/jobx-agent-${JOBX_VERSION}.tar.gz ##
+JOBX_SERVER=${WORK_BASE}/jobx-server/target/jobx-server-${JOBX_VERSION}.war ##
+EXEC_LIB=${WORK_DIR}/executor.c ##
+JOBX_AGENT_BIN_DIR=${WORK_BASE}/jobx-agent/src/assembly/bin ##
##############################################################################################
echo_r () {
@@ -151,7 +153,6 @@ if [ "$1" = "debug" ] ; then
fi
fi
fi
-
# Don't override the endorsed dir if the user has set it previously
if [ -z "$JAVA_ENDORSED_DIRS" ]; then
# Set the default -Djava.endorsed.dirs argument
@@ -170,37 +171,54 @@ if [ -z "$JAVACMD" ] ; then
JAVACMD="`which java`"
fi
fi
-
#check java exists.
$JAVACMD >/dev/null 2>&1
-
if [ $? -ne 1 ];then
echo_r "ERROR: java is not install,please install java first!"
exit 1;
fi
-
#check openjdk
if [ "`$JAVACMD -version 2>&1 | head -1|grep "openjdk"|wc -l`"x == "1"x ]; then
echo_r "ERROR: please uninstall OpenJDK and install JDK 1.7+ first"
exit 1;
fi
-
echo_w "build jobx Starting...";
-
-if [ ! -f "${WORKDIR}/.mvn/mvnw" ];then
- echo_r "ERROR: ${WORKDIR}/.mvn/mvnw is not exists,This file is needed to run this program!"
+if [ ! -f "${WORK_BASE}/.mvn/mvnw" ];then
+ echo_r "ERROR: ${WORK_BASE}/.mvn/mvnw is not exists,This file is needed to run this program!"
exit 1;
fi
-${WORKDIR}/.mvn/mvnw clean install -Dmaven.test.skip=true;
+#gcc compile executor.c
+exec_retval=0
+GCCCMD="`which gcc`" >/dev/null 2>&1
+if [ $? -eq 1 ];then
+ exec_retval=-1
+else
+ echo_g "compile executor.c starting..."
+ ${GCCCMD} ${EXEC_LIB} -o executor.so
+ ret_val=$?
+ if [ ${ret_val} -eq 0 ] ; then
+ exec_retval=0
+ mv executor.so ${JOBX_AGENT_BIN_DIR}
+ else
+ exec_retval=1
+ fi
+fi
-retval=$?
-if [ ${retval} -eq 0 ] ; then
- [ ! -d "${DIST_HOME}" ] && mkdir ${DIST_HOME} || rm -rf ${DIST_HOME}/* ;
- cp ${JOBX_AGENT} ${DIST_HOME}
- cp ${JOBX_SERVER} ${DIST_HOME}
- printf "[${BLUE_COLOR}jobx${RES}] ${WHITE_COLOR}build jobx @Version ${JOBX_VERSION} successfully! please goto${RES} ${GREEN_COLOR}${DIST_HOME}${RES}\n"
+${WORK_BASE}/.mvn/mvnw -f ${WORK_BASE}/pom.xml clean install -Dmaven.test.skip=true;
+ret_val=$?
+if [ ${ret_val} -eq 0 ] ; then
+ cp ${JOBX_AGENT} ${WORK_DIR}
+ cp ${JOBX_SERVER} ${WORK_DIR}
+ printf "[${BLUE_COLOR}jobx${RES}] ${WHITE_COLOR}build jobx @Version ${JOBX_VERSION} successfully! please goto${RES} ${GREEN_COLOR}${WORK_DIR}${RES}\n"
+ if [ ${exec_retval} -eq 1 ]; then
+ echo_w "WARN: compile executor.c error,please compile executor.c by yourself."
+ elif [ ${exec_retval} -eq -1 ]; then
+ echo_w "WARN: compile executor.c error,not found gcc,please compile executor.c by yourself."
+ else
+ rm -rf ${JOBX_AGENT_BIN_DIR}/executor.so >/dev/null 2>&1
+ fi
exit 0
else
echo_r "build jobx failed! please try again "
diff --git a/jobx-executor/src/main/c/executor.c b/deployment/executor.c
similarity index 99%
rename from jobx-executor/src/main/c/executor.c
rename to deployment/executor.c
index f2c88459..42eddf58 100644
--- a/jobx-executor/src/main/c/executor.c
+++ b/deployment/executor.c
@@ -129,4 +129,4 @@ int main(int argc, char **argv){
return 0;
}
-}
+}
\ No newline at end of file
diff --git a/server.bat b/deployment/server.bat
similarity index 92%
rename from server.bat
rename to deployment/server.bat
index 36d18168..acefe31c 100644
--- a/server.bat
+++ b/deployment/server.bat
@@ -103,23 +103,21 @@ setlocal
@REM Guess JOBX_HOME if not defined
set "WORK_DIR=%~dp0"
-set "JOBX_HOME=%WORK_DIR%"
-set "JOBX_BASE=%JOBX_HOME%"
+set "WORK_BASE=%WORK_DIR%\..\"
@REM #################################################################################################
set APP_ARTIFACT=jobx-server
set APP_VERSION=1.2.0-RELEASE
set APP_WAR_NAME=%APP_ARTIFACT%-%APP_VERSION%.war
-set MAVEN_TARGET_WAR=%JOBX_HOME%%APP_ARTIFACT%\target\%APP_WAR_NAME%
-set DIST_PATH=%JOBX_HOME%dist
-set DEPLOY_PATH=%DIST_PATH%\%APP_ARTIFACT%
+set MAVEN_TARGET_WAR=%WORK_BASE%%APP_ARTIFACT%\target\%APP_WAR_NAME%
+set DEPLOY_PATH=%WORK_DIR%\%APP_ARTIFACT%
set CONTAINER_PATH=%DEPLOY_PATH%\container
@REM #################################################################################################
-if exist "%DIST_PATH%\%APP_WAR_NAME%" goto initEnv
-if not exist %DIST_PATH% mkdir %DIST_PATH%
+if exist "%WORK_BASE%\%APP_WAR_NAME%" goto initEnv
+if not exist %WORK_BASE% mkdir %WORK_BASE%
if exist "%MAVEN_TARGET_WAR%" (
- copy %MAVEN_TARGET_WAR% %DIST_PATH%
+ copy %MAVEN_TARGET_WAR% %WORK_BASE%
goto initEnv
) else (
echo [JobX] please build project first!
@@ -129,7 +127,7 @@ if exist "%MAVEN_TARGET_WAR%" (
:initEnv
if not exist "%DEPLOY_PATH%" (
mkdir %DEPLOY_PATH%
- copy %DIST_PATH%\%APP_WAR_NAME% %DEPLOY_PATH%
+ copy %WORK_BASE%\%APP_WAR_NAME% %DEPLOY_PATH%
cd %DEPLOY_PATH%
%_RUNJAR% xvf %APP_WAR_NAME% 1>nul
del %DEPLOY_PATH%\%APP_WAR_NAME%
@@ -141,7 +139,7 @@ cd %DEPLOY_PATH%
@REM copy container to deploy_path
if not exist "%CONTAINER_PATH%" (
mkdir %CONTAINER_PATH%
- xcopy %JOBX_HOME%%APP_ARTIFACT%\container %CONTAINER_PATH% /E 1>nul
+ xcopy %WORK_BASE%%APP_ARTIFACT%\container %CONTAINER_PATH% /E 1>nul
)
@REM create log
set LOG_PATH=%CONTAINER_PATH%\logs
diff --git a/server.sh b/deployment/server.sh
similarity index 85%
rename from server.sh
rename to deployment/server.sh
index 9eb2b144..8cca8635 100644
--- a/server.sh
+++ b/deployment/server.sh
@@ -107,45 +107,39 @@ done
# Get standard environment variables
PRGDIR=`dirname "$PRG"`
-WORK_DIR=`cd "$PRGDIR" >/dev/null; pwd`;
+WORKDIR=`cd "$PRGDIR" >/dev/null; pwd`;
+WORKBASE=`cd "$PRGDIR"/../ >/dev/null; pwd`;
# Get standard environment variables
###############################################################################################
APP_ARTIFACT=jobx-server
APP_VERSION="1.2.0-RELEASE";
APP_WAR_NAME=${APP_ARTIFACT}-${APP_VERSION}.war
-MAVEN_TARGET_WAR=${WORK_DIR}/${APP_ARTIFACT}/target/${APP_WAR_NAME}
-DIST_PATH=${WORK_DIR}/dist/
-DEPLOY_PATH=${DIST_PATH}${APP_ARTIFACT}
+MAVEN_TARGET_WAR=${WORKBASE}/${APP_ARTIFACT}/target/${APP_WAR_NAME}
+DEPLOY_PATH=${WORKDIR}/${APP_ARTIFACT}
LIB_PATH=${DEPLOY_PATH}/WEB-INF/lib
-CONTAINER_PATH=${DEPLOY_PATH}/container
-LOG_PATH=${CONTAINER_PATH}/logs
+CONFIG_TEMPLATE=${WORKDIR}/conf.properties
+CONFIG_PATH=${DEPLOY_PATH}/WEB-INF/classes/config.properties
###############################################################################################
#先检查dist下是否有war包
-if [ ! -f "${DIST_PATH}/${APP_WAR_NAME}" ] ; then
+if [ ! -f "${WORKDIR}/${APP_WAR_NAME}" ] ; then
#dist下没有war包则检查server的target下是否有war包.
if [ ! -f "${MAVEN_TARGET_WAR}" ] ; then
echo_w "[JobX] please build project first!"
exit 0;
else
- cp ${MAVEN_TARGET_WAR} ${DIST_PATH};
+ cp ${MAVEN_TARGET_WAR} ${WORKDIR};
fi
fi
if [ ! -f "${DEPLOY_PATH}" ] ; then
mkdir -p ${DEPLOY_PATH}
# unpackage war to dist
- cp ${DIST_PATH}/${APP_WAR_NAME} ${DEPLOY_PATH} &&
+ cp ${WORKDIR}/${APP_WAR_NAME} ${DEPLOY_PATH} &&
cd ${DEPLOY_PATH} &&
${RUNJAR} xvf ${APP_WAR_NAME} >/dev/null 2>&1 &&
- rm -rf ${DEPLOY_PATH}/${APP_WAR_NAME} &&
- #copy jars...
- cp -r ${WORK_DIR}/${APP_ARTIFACT}/container ${DEPLOY_PATH}
+ rm -rf ${DEPLOY_PATH}/${APP_WAR_NAME}
fi
-if [ ! -d "${LOG_PATH}" ] ; then
- mkdir -p ${LOG_PATH}
-fi
-LOG_PATH=${LOG_PATH}/jobx.out
# Add jars to classpath
if [ ! -z "$CLASSPATH" ] ; then
@@ -157,10 +151,8 @@ do
CLASSPATH="$CLASSPATH":"$jar"
done
CLASSPATH="$CLASSPATH":${DEPLOY_PATH}/WEB-INF/classes
-
#default launcher
[ -z "${JOBX_LAUNCHER}" ] && JOBX_LAUNCHER="tomcat";
-
#server'port
if [ $# -gt 0 ] ;then
JOBX_PORT=$1
@@ -174,19 +166,14 @@ if [ $# -gt 0 ] ;then
fi
fi
[ -z "${JOBX_PORT}" ] && JOBX_PORT="20501";
-
#start server....
printf "[${BLUE_COLOR}jobx${RES}] ${WHITE_COLOR} server Starting @ [${GREEN_COLOR}${JOBX_PORT}${RES}].... ${RES}\n"
-
-MAIN="com.jobxhub.server.bootstrap.Startup"
+MAIN="com.jobxhub.server.JobXServer"
cd ${DEPLOY_PATH}
eval "$RUNJAVA" \
-classpath "$CLASSPATH" \
-Dserver.launcher=${JOBX_LAUNCHER} \
-Dserver.port=${JOBX_PORT} \
- ${MAIN} $1 >> ${LOG_PATH} 2>&1 &
-
-printf "[${BLUE_COLOR}jobx${RES}] ${WHITE_COLOR} please see log for more detail:${RES}${GREEN_COLOR} $LOG_PATH ${RES}\n"
-
+ ${MAIN} $1 >> /dev/null 2>&1 &
+printf "[${BLUE_COLOR}jobx${RES}] ${WHITE_COLOR} please see log for more detail:${RES}${GREEN_COLOR} ${DEPLOY_PATH}/jobx.out ${RES}\n"
exit $?
-
diff --git a/jobx-agent/assembly.xml b/jobx-agent/assembly.xml
index 5d57c52b..586affe8 100644
--- a/jobx-agent/assembly.xml
+++ b/jobx-agent/assembly.xml
@@ -11,33 +11,34 @@
- src/conf/bin
+ src/assembly/binbin
+ 0755
- ${project.build.directory}/bin
- bin
-
-
- src/conf/lib
+ src/assembly/liblib
+ 0644
- src/conf/native
+ src/assembly/nativenative
+ 0644
- src/conf/conf
+ src/assembly/confconf
+ 0644
- src/conf/logs
+ src/assembly/logslogs
+ 0644
-
- src/conf/temp
+ src/assembly/temptemp
+ 0755
\ No newline at end of file
diff --git a/jobx-agent/pom.xml b/jobx-agent/pom.xml
index 4347754f..50339595 100644
--- a/jobx-agent/pom.xml
+++ b/jobx-agent/pom.xml
@@ -3,17 +3,13 @@
jobxcom.jobxhub
- 1.2.0-RELEASE
+ ${revision}4.0.0jobx-agentjarjobx-agent
-
- src/main/c
-
-
com.jobxhub
@@ -21,7 +17,7 @@
com.jobxhub
- jobx-rpc
+ jobx-rpc-apicom.corundumstudio.socketio
@@ -39,7 +35,6 @@
net.java.dev.jnajna
-
net.java.dev.jnajna-platform
diff --git a/jobx-agent/src/conf/bin/jobx.bat b/jobx-agent/src/assembly/bin/jobx.bat
similarity index 100%
rename from jobx-agent/src/conf/bin/jobx.bat
rename to jobx-agent/src/assembly/bin/jobx.bat
diff --git a/jobx-agent/src/conf/bin/jobx.sh b/jobx-agent/src/assembly/bin/jobx.sh
similarity index 100%
rename from jobx-agent/src/conf/bin/jobx.sh
rename to jobx-agent/src/assembly/bin/jobx.sh
diff --git a/jobx-agent/src/assembly/bin/kill.sh b/jobx-agent/src/assembly/bin/kill.sh
new file mode 100644
index 00000000..2526e627
--- /dev/null
+++ b/jobx-agent/src/assembly/bin/kill.sh
@@ -0,0 +1,76 @@
+#!/bin/bash
+#
+# Copyright (c) 2015 The JobX Project
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+#kill_model
+# 0) soft kill
+# 1) force kill
+#
+kill_model=$1
+
+#this process id
+pid=$2
+
+if [ !"${kill_model}" == "0"x ] ||[ !"${kill_model}" == "1"x ]; then
+ echo "[JobX] the first args must be [0|1]"
+ exit 1;
+fi
+
+if [ "${pid}"x == ""x ];then
+ echo "[JobX] pid is null"
+ exit 1;
+fi
+
+
+# if pstree exists
+if [ -n "`which pstree`" ];then
+ array=$(pstree -p ${pid}| sed 's/[^0-9]/ /g');
+ for id in ${array}
+ do
+ if [ ${pid} -gt 300 ];then
+ if [ "${kill_model}" == "0"x ];then
+ kill ${id} >/dev/null 2>&1;
+ else
+ kill -9 ${id} >/dev/null 2>&1;
+ fi
+ fi
+ done
+else
+ while true
+ do
+ #find pid by ppid
+ cmd="ps -ef|awk '{if(\$3~/${pid}/) print \$2}'"
+ pid=$(eval ${cmd})
+ if [ "${pid}"x == ""x ] || [ ${pid} -lt 300 ] ; then
+ break;
+ fi
+ if [ "${kill_model}"x == "0"x ];then
+ kill ${pid} >/dev/null 2>&1;
+ else
+ kill -9 ${pid} >/dev/null 2>&1;
+ fi
+ done
+fi
+
+echo "[JobX] kill done";
+
+exit 0;
\ No newline at end of file
diff --git a/jobx-agent/src/conf/bin/monitor.sh b/jobx-agent/src/assembly/bin/monitor.sh
similarity index 100%
rename from jobx-agent/src/conf/bin/monitor.sh
rename to jobx-agent/src/assembly/bin/monitor.sh
diff --git a/jobx-agent/src/conf/bin/setclasspath.bat b/jobx-agent/src/assembly/bin/setclasspath.bat
similarity index 100%
rename from jobx-agent/src/conf/bin/setclasspath.bat
rename to jobx-agent/src/assembly/bin/setclasspath.bat
diff --git a/jobx-agent/src/conf/bin/setclasspath.sh b/jobx-agent/src/assembly/bin/setclasspath.sh
similarity index 100%
rename from jobx-agent/src/conf/bin/setclasspath.sh
rename to jobx-agent/src/assembly/bin/setclasspath.sh
diff --git a/jobx-agent/src/conf/bin/shutdown.bat b/jobx-agent/src/assembly/bin/shutdown.bat
similarity index 100%
rename from jobx-agent/src/conf/bin/shutdown.bat
rename to jobx-agent/src/assembly/bin/shutdown.bat
diff --git a/jobx-agent/src/conf/bin/shutdown.sh b/jobx-agent/src/assembly/bin/shutdown.sh
similarity index 100%
rename from jobx-agent/src/conf/bin/shutdown.sh
rename to jobx-agent/src/assembly/bin/shutdown.sh
diff --git a/jobx-agent/src/conf/bin/startup.bat b/jobx-agent/src/assembly/bin/startup.bat
similarity index 100%
rename from jobx-agent/src/conf/bin/startup.bat
rename to jobx-agent/src/assembly/bin/startup.bat
diff --git a/jobx-agent/src/conf/bin/startup.sh b/jobx-agent/src/assembly/bin/startup.sh
similarity index 100%
rename from jobx-agent/src/conf/bin/startup.sh
rename to jobx-agent/src/assembly/bin/startup.sh
diff --git a/jobx-agent/src/conf/conf/conf.properties b/jobx-agent/src/assembly/conf/conf.properties
similarity index 100%
rename from jobx-agent/src/conf/conf/conf.properties
rename to jobx-agent/src/assembly/conf/conf.properties
diff --git a/jobx-agent/src/conf/conf/log4j.properties b/jobx-agent/src/assembly/conf/log4j.properties
similarity index 100%
rename from jobx-agent/src/conf/conf/log4j.properties
rename to jobx-agent/src/assembly/conf/log4j.properties
diff --git a/jobx-agent/src/conf/logs/.keep b/jobx-agent/src/assembly/logs/.keep
similarity index 100%
rename from jobx-agent/src/conf/logs/.keep
rename to jobx-agent/src/assembly/logs/.keep
diff --git a/jobx-agent/src/conf/native/libsigar-amd64-freebsd-6.so b/jobx-agent/src/assembly/native/libsigar-amd64-freebsd-6.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-amd64-freebsd-6.so
rename to jobx-agent/src/assembly/native/libsigar-amd64-freebsd-6.so
diff --git a/jobx-agent/src/conf/native/libsigar-amd64-linux.so b/jobx-agent/src/assembly/native/libsigar-amd64-linux.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-amd64-linux.so
rename to jobx-agent/src/assembly/native/libsigar-amd64-linux.so
diff --git a/jobx-agent/src/conf/native/libsigar-amd64-solaris.so b/jobx-agent/src/assembly/native/libsigar-amd64-solaris.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-amd64-solaris.so
rename to jobx-agent/src/assembly/native/libsigar-amd64-solaris.so
diff --git a/jobx-agent/src/conf/native/libsigar-ia64-hpux-11.sl b/jobx-agent/src/assembly/native/libsigar-ia64-hpux-11.sl
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-ia64-hpux-11.sl
rename to jobx-agent/src/assembly/native/libsigar-ia64-hpux-11.sl
diff --git a/jobx-agent/src/conf/native/libsigar-ia64-linux.so b/jobx-agent/src/assembly/native/libsigar-ia64-linux.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-ia64-linux.so
rename to jobx-agent/src/assembly/native/libsigar-ia64-linux.so
diff --git a/jobx-agent/src/conf/native/libsigar-pa-hpux-11.sl b/jobx-agent/src/assembly/native/libsigar-pa-hpux-11.sl
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-pa-hpux-11.sl
rename to jobx-agent/src/assembly/native/libsigar-pa-hpux-11.sl
diff --git a/jobx-agent/src/conf/native/libsigar-ppc-aix-5.so b/jobx-agent/src/assembly/native/libsigar-ppc-aix-5.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-ppc-aix-5.so
rename to jobx-agent/src/assembly/native/libsigar-ppc-aix-5.so
diff --git a/jobx-agent/src/conf/native/libsigar-ppc-linux.so b/jobx-agent/src/assembly/native/libsigar-ppc-linux.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-ppc-linux.so
rename to jobx-agent/src/assembly/native/libsigar-ppc-linux.so
diff --git a/jobx-agent/src/conf/native/libsigar-ppc64-aix-5.so b/jobx-agent/src/assembly/native/libsigar-ppc64-aix-5.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-ppc64-aix-5.so
rename to jobx-agent/src/assembly/native/libsigar-ppc64-aix-5.so
diff --git a/jobx-agent/src/conf/native/libsigar-ppc64-linux.so b/jobx-agent/src/assembly/native/libsigar-ppc64-linux.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-ppc64-linux.so
rename to jobx-agent/src/assembly/native/libsigar-ppc64-linux.so
diff --git a/jobx-agent/src/conf/native/libsigar-s390x-linux.so b/jobx-agent/src/assembly/native/libsigar-s390x-linux.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-s390x-linux.so
rename to jobx-agent/src/assembly/native/libsigar-s390x-linux.so
diff --git a/jobx-agent/src/conf/native/libsigar-sparc-solaris.so b/jobx-agent/src/assembly/native/libsigar-sparc-solaris.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-sparc-solaris.so
rename to jobx-agent/src/assembly/native/libsigar-sparc-solaris.so
diff --git a/jobx-agent/src/conf/native/libsigar-sparc64-solaris.so b/jobx-agent/src/assembly/native/libsigar-sparc64-solaris.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-sparc64-solaris.so
rename to jobx-agent/src/assembly/native/libsigar-sparc64-solaris.so
diff --git a/jobx-agent/src/conf/native/libsigar-universal-macosx.dylib b/jobx-agent/src/assembly/native/libsigar-universal-macosx.dylib
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-universal-macosx.dylib
rename to jobx-agent/src/assembly/native/libsigar-universal-macosx.dylib
diff --git a/jobx-agent/src/conf/native/libsigar-universal64-macosx.dylib b/jobx-agent/src/assembly/native/libsigar-universal64-macosx.dylib
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-universal64-macosx.dylib
rename to jobx-agent/src/assembly/native/libsigar-universal64-macosx.dylib
diff --git a/jobx-agent/src/conf/native/libsigar-x86-freebsd-5.so b/jobx-agent/src/assembly/native/libsigar-x86-freebsd-5.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-x86-freebsd-5.so
rename to jobx-agent/src/assembly/native/libsigar-x86-freebsd-5.so
diff --git a/jobx-agent/src/conf/native/libsigar-x86-freebsd-6.so b/jobx-agent/src/assembly/native/libsigar-x86-freebsd-6.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-x86-freebsd-6.so
rename to jobx-agent/src/assembly/native/libsigar-x86-freebsd-6.so
diff --git a/jobx-agent/src/conf/native/libsigar-x86-linux.so b/jobx-agent/src/assembly/native/libsigar-x86-linux.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-x86-linux.so
rename to jobx-agent/src/assembly/native/libsigar-x86-linux.so
diff --git a/jobx-agent/src/conf/native/libsigar-x86-solaris.so b/jobx-agent/src/assembly/native/libsigar-x86-solaris.so
similarity index 100%
rename from jobx-agent/src/conf/native/libsigar-x86-solaris.so
rename to jobx-agent/src/assembly/native/libsigar-x86-solaris.so
diff --git a/jobx-agent/src/conf/native/sigar-amd64-winnt.dll b/jobx-agent/src/assembly/native/sigar-amd64-winnt.dll
similarity index 100%
rename from jobx-agent/src/conf/native/sigar-amd64-winnt.dll
rename to jobx-agent/src/assembly/native/sigar-amd64-winnt.dll
diff --git a/jobx-agent/src/conf/native/sigar-x86-winnt.dll b/jobx-agent/src/assembly/native/sigar-x86-winnt.dll
similarity index 100%
rename from jobx-agent/src/conf/native/sigar-x86-winnt.dll
rename to jobx-agent/src/assembly/native/sigar-x86-winnt.dll
diff --git a/jobx-agent/src/conf/native/sigar-x86-winnt.lib b/jobx-agent/src/assembly/native/sigar-x86-winnt.lib
similarity index 100%
rename from jobx-agent/src/conf/native/sigar-x86-winnt.lib
rename to jobx-agent/src/assembly/native/sigar-x86-winnt.lib
diff --git a/jobx-agent/src/conf/temp/.keep b/jobx-agent/src/assembly/temp/.keep
similarity index 100%
rename from jobx-agent/src/conf/temp/.keep
rename to jobx-agent/src/assembly/temp/.keep
diff --git a/jobx-agent/src/conf/bin/executor.so b/jobx-agent/src/conf/bin/executor.so
deleted file mode 100755
index cc59c71f..00000000
Binary files a/jobx-agent/src/conf/bin/executor.so and /dev/null differ
diff --git a/jobx-agent/src/main/java/com/jobxhub/agent/bootstrap/JobXAgent.java b/jobx-agent/src/main/java/com/jobxhub/agent/bootstrap/JobXAgent.java
index 3c566f2e..7fb627f3 100755
--- a/jobx-agent/src/main/java/com/jobxhub/agent/bootstrap/JobXAgent.java
+++ b/jobx-agent/src/main/java/com/jobxhub/agent/bootstrap/JobXAgent.java
@@ -39,7 +39,6 @@
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -397,7 +396,7 @@ private void await() throws Exception {
* @throws Exception
*/
- private void shutdown() throws Exception {
+ private void shutdown() {
String address = "localhost";
@@ -412,11 +411,6 @@ private void shutdown() throws Exception {
}
stream.flush();
socket.close();
- } catch (ConnectException ce) {
- if (logger.isErrorEnabled()) {
- logger.error("[JobX] Agent.stop error:{} ", ce);
- }
- System.exit(1);
} catch (Exception e) {
if (logger.isErrorEnabled()) {
logger.error("[JobX] Agent.stop error:{} ", e);
diff --git a/jobx-agent/src/main/java/com/jobxhub/agent/process/ExecuteUser.java b/jobx-agent/src/main/java/com/jobxhub/agent/process/ExecuteUser.java
index 1b4e23c5..2176454b 100644
--- a/jobx-agent/src/main/java/com/jobxhub/agent/process/ExecuteUser.java
+++ b/jobx-agent/src/main/java/com/jobxhub/agent/process/ExecuteUser.java
@@ -22,19 +22,22 @@
package com.jobxhub.agent.process;
import com.jobxhub.common.Constants;
+import com.jobxhub.common.util.AssertUtils;
+import com.jobxhub.common.util.CommonUtils;
import com.jobxhub.common.util.IOUtils;
-import com.jobxhub.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+
+import static com.jobxhub.common.util.CommandUtils.*;
public class ExecuteUser {
private final static Logger logger = LoggerFactory.getLogger(ExecuteUser.class);
+
public ExecuteUser() {
if (!IOUtils.fileExists(Constants.JOBX_EXECUTE_AS_USER_LIB)) {
throw new RuntimeException("[JobX]not found ExecuteUser binary. Invalid Path: " + Constants.JOBX_EXECUTE_AS_USER_LIB);
@@ -44,14 +47,14 @@ public ExecuteUser() {
/**
* API to execute a command on behalf of another user.
*
- * @param user The proxy user
+ * @param user The proxy user
* @param command the list containing the program and its arguments
* @return The return value of the shell command
*/
- public int execute(final String user, final List command) throws IOException {
- logger.info("[Jobx]execute Command {} ",StringUtils.joinString(command));
+ public static int execute(final String user, final File file, final String command) throws IOException {
+ logger.info("[Jobx]execute Command {} ", command);
final Process process = new ProcessBuilder()
- .command(buildCommand(user, command))
+ .command(buildCommand(user, file, command))
.inheritIO()
.start();
int exitCode;
@@ -64,11 +67,32 @@ public int execute(final String user, final List command) throws IOExcep
return exitCode;
}
- public static List buildCommand(final String user, final List command) {
- final List commandList = new ArrayList();
- commandList.add(Constants.JOBX_EXECUTE_AS_USER_LIB);
- commandList.add(user);
- commandList.addAll(command);
- return commandList;
+ public static String buildCommand(final String proxyUser, final File execFile, final String command) {
+ AssertUtils.notNull(command);
+
+ if (CommonUtils.isWindows()) {
+ return command;
+ }
+
+ //写入命令到文件
+ write(execFile, command);
+
+ String execCmd = String.format("/bin/bash +x %s", execFile.getAbsolutePath());
+ if ( CommonUtils.notEmpty(proxyUser)) {
+ //授权文件...
+ try {
+ chown(false, proxyUser, proxyUser, execFile);
+ } catch (Exception e) {
+ throw new RuntimeException("[JobX] chown command file error,{}", e.getCause());
+ }
+ return Constants.JOBX_EXECUTE_AS_USER_LIB
+ .concat(IOUtils.BLANK_CHAR)
+ .concat(proxyUser)
+ .concat(IOUtils.BLANK_CHAR)
+ .concat(execCmd);
+
+ }
+
+ return execCmd;
}
}
diff --git a/jobx-agent/src/main/java/com/jobxhub/agent/process/JobXProcess.java b/jobx-agent/src/main/java/com/jobxhub/agent/process/JobXProcess.java
index 69e3465c..337f24c3 100644
--- a/jobx-agent/src/main/java/com/jobxhub/agent/process/JobXProcess.java
+++ b/jobx-agent/src/main/java/com/jobxhub/agent/process/JobXProcess.java
@@ -23,6 +23,7 @@
import com.jobxhub.agent.util.ProcessLogger;
import com.jobxhub.common.Constants;
+import com.jobxhub.common.Constants.ExitCode ;
import com.jobxhub.common.logging.LoggerFactory;
import com.jobxhub.common.util.CommonUtils;
import com.jobxhub.common.util.IOUtils;
@@ -49,61 +50,48 @@ public class JobXProcess {
private Logger processLogger;
- private final String workingDir;
-
- public static String KILL_COMMAND = "kill";
-
- private final List command;
+ private final String command;
private final int timeout;
private final CountDownLatch startupLatch;
private final CountDownLatch completeLatch;
+ private Integer processId;
+ private ExitCode kill;
private File logFile;
- private volatile Integer processId;
- private volatile Process process;
- private boolean killed = false;
+ private File execShell;
+ private Process process;
private String execUser;
- private final String runAsUserBinary = Constants.JOBX_EXECUTE_AS_USER_LIB;
public JobXProcess(String command, int timeout, String pid, String execUser) {
- this.workingDir = IOUtils.getTmpdir();
this.timeout = timeout;
- this.logFile = new File(Constants.JOBX_LOG_PATH + "/." + pid + ".log");
+ this.logFile = getLogFile(pid);
this.processId = -1;
this.processLogger = this.getLogger(pid);
this.startupLatch = new CountDownLatch(1);
this.completeLatch = new CountDownLatch(1);
this.execUser = execUser;
- List commandLine = getCommandLine(command);
- if (isExecAsUser()) {
- this.command = ExecuteUser.buildCommand(execUser,commandLine);
- }else {
- this.command = commandLine;
- }
+ this.execShell = getExecShell(pid);
+ this.command = ExecuteUser.buildCommand(execUser, execShell, command);
}
/**
* Execute this process, blocking until it has completed.
*/
public int start() {
-
if (this.isStarted() || this.isComplete()) {
throw new IllegalStateException("[JobX]The process can only be used once.");
}
- ProcessBuilder builder = new ProcessBuilder(this.command);
- builder.directory(new File(this.workingDir));
- builder.redirectErrorStream(true);
-
int exitCode = -1;
try {
this.watchTimeOut();
- this.process = builder.start();
+ this.process = Runtime.getRuntime().exec(this.command);
this.processId = getProcessId();
- if (processId == null) {
- this.logger.debug("[JobX]Spawned thread with unknown process id");
+ if (this.processId == 0) {
+ this.logger.info("[JobX]Spawned thread with unknown process id");
} else {
- this.logger.debug("[JobX]Spawned thread with process id " + processId);
+ this.logger.info("[JobX]Spawned thread with process id " + this.processId);
}
+
this.startupLatch.countDown();
ProcessLogger outputLogger = ProcessLogger.getLoger(this.process.getInputStream(), this.processLogger, Level.INFO);
ProcessLogger errorLogger = ProcessLogger.getLoger(this.process.getErrorStream(), this.processLogger, Level.ERROR);
@@ -136,11 +124,12 @@ public int start() {
IOUtils.closeQuietly(this.process.getInputStream());
IOUtils.closeQuietly(this.process.getOutputStream());
IOUtils.closeQuietly(this.process.getErrorStream());
+
//最后以特殊不了见的字符作为log和exitCode+结束时间的分隔符.
this.processLogger.info(IOUtils.FIELD_TERMINATED_BY + exitCode + IOUtils.TAB + new Date().getTime());
this.process.destroy();
- if (this.killed) {
- exitCode = Constants.StatusCode.KILL.getValue();
+ if (this.kill!=null) {
+ exitCode = this.kill.getValue();
}
return exitCode;
}
@@ -153,10 +142,10 @@ private void watchTimeOut() {
@Override
public void run() {
//kill job...
- kill();
+ kill(Constants.ExitCode.TIME_OUT);
timer.cancel();
}
- }, 0,timeout * 60 * 1000);
+ }, timeout * 60 * 1000);
}
}
@@ -174,11 +163,17 @@ public String getLogMessage() {
}
public void deleteLog() {
- if (this.logFile.exists()) {
+ if (CommonUtils.notEmpty(this.logFile)) {
this.logFile.delete();
}
}
+ public void deleteExecShell() {
+ if (CommonUtils.notEmpty(this.execShell)) {
+ this.execShell.delete();
+ }
+ }
+
/**
* Await the completion of this process
*
@@ -199,21 +194,20 @@ public void awaitStartup() throws InterruptedException {
this.startupLatch.await();
}
- public void kill() {
+ public void kill(ExitCode kill) {
if (isStarted()) {
+ this.kill = kill;
try {
- if (CommonUtils.isWindows()) {
- killed = true;
- hardKill();
- }else {
- boolean flag = softKill(1000*5,TimeUnit.SECONDS);
- if (!flag) {
- hardKill();
+ if (CommonUtils.isUnix()) {
+ boolean killed = this.softKill(1000*5,TimeUnit.SECONDS);
+ if (!killed) {
+ this.hardKill();
}
+ }else {
+ this.hardKill();
}
}catch (Exception e) {
- killed = false;
- logger.info("[JobX]Kill attempt failed:{}",e.getMessage());
+ e.printStackTrace();
}
}
}
@@ -228,22 +222,22 @@ public void kill() {
private boolean softKill(long time, TimeUnit unit) throws InterruptedException {
if (this.processId != 0 && isStarted()) {
try {
+ String KillCMD = String.format("/bin/bash +x %s %d %d",Constants.JOBX_KILL_FILE.getAbsolutePath(),0,this.processId);
if (isExecAsUser()) {
- String cmd = String.format(
- "%s %s %s %d",
- runAsUserBinary,
+ KillCMD = String.format(
+ "%s %s %s",
+ Constants.JOBX_EXECUTE_AS_USER_LIB,
this.execUser,
- KILL_COMMAND,
- this.processId
+ KillCMD
);
- Runtime.getRuntime().exec(cmd);
- } else {
- String cmd = String.format("%s %d", KILL_COMMAND, this.processId);
- Runtime.getRuntime().exec(cmd);
}
+ Process process = Runtime.getRuntime().exec(KillCMD);
+ process.waitFor();
+ process.destroy();
+ this.processLogger.error("[JobX]hardKill successful,pid:" + this.processId);
return this.completeLatch.await(time, unit);
} catch (IOException e) {
- this.processLogger.error("[JobX]Kill attempt failed.", e);
+ this.processLogger.error("[JobX]softKill failed.pid:" + this.processId);
}
return false;
}
@@ -256,50 +250,43 @@ private boolean softKill(long time, TimeUnit unit) throws InterruptedException {
private void hardKill() {
if ( isRunning() && this.processId != null ) {
try {
- String cmd = "";
+ String killCMD = String.format("/bin/bash +x %s %d %d",Constants.JOBX_KILL_FILE.getAbsolutePath(),1,this.processId);
if (CommonUtils.isUnix()) {
if (isExecAsUser()) {
- cmd = String.format("%s %s %s -9 %d",
- this.runAsUserBinary,
- this.execUser, KILL_COMMAND,
- this.processId);
- } else {
- cmd = String.format("%s -9 %d", KILL_COMMAND, this.processId);
+ killCMD = String.format("%s %s %s",
+ Constants.JOBX_EXECUTE_AS_USER_LIB,
+ this.execUser,
+ killCMD);
}
}else if(CommonUtils.isWindows()) {
- cmd = String.format("cmd.exe /c taskkill /PID %s /F /T ",this.processId) ;
+ killCMD = String.format("cmd.exe /c taskkill /PID %s /F /T ",this.processId) ;
}
- Runtime runtime =Runtime.getRuntime();
- Process process = runtime.exec(cmd);
+ Process process = Runtime.getRuntime().exec(killCMD);
process.waitFor();
process.destroy();
- this.processLogger.error("[JobX]Kill attempt successful.");
+ this.processLogger.error("[JobX]hardKill successful.");
}catch (Exception e) {
- this.processLogger.error("[JobX]Kill attempt failed.", e);
+ this.processLogger.error("[JobX]hardKill failed.", e);
}
this.processId = null;
}
}
- /**
- * Attempt to get the process id for this process
- *
- * @return The id of the process
- */
private Integer getProcessId() {
try {
if (this.process == null) return null;
if (CommonUtils.isUnix()) {
Field field = ReflectUtils.getField(this.process.getClass(), "pid");
- return field.getInt(this.process);
+ if (field!=null) {
+ return field.getInt(this.process);
+ }
}else if(CommonUtils.isWindows()) {
Field field = ReflectUtils.getField(this.process.getClass(), "handle");
- field.setAccessible(true);
- Kernel32 kernel = Kernel32.INSTANCE;
- WinNT.HANDLE handle = new WinNT.HANDLE();
- long handl = field.getLong(this.process);
- handle.setPointer(Pointer.createConstant(handl));
- return kernel.GetProcessId(handle);
+ if (field!=null) {
+ WinNT.HANDLE handle = new WinNT.HANDLE();
+ handle.setPointer(Pointer.createConstant(field.getLong(this.process)));
+ return Kernel32.INSTANCE.GetProcessId(handle);
+ }
}
}catch (Exception e) {
e.printStackTrace();
@@ -358,54 +345,15 @@ private Logger getLogger(String name) {
return logger;
}
- private List getCommandLine(String command) {
- ArrayList commands = new ArrayList();
- int index = 0;
-
- StringBuffer buffer = new StringBuffer(command.length());
-
- boolean isApos = false;
- boolean isQuote = false;
- while (index < command.length()) {
- char c = command.charAt(index);
- switch (c) {
- case ' ':
- if (!isQuote && !isApos) {
- String arg = buffer.toString();
- buffer = new StringBuffer(command.length() - index);
- if (arg.length() > 0) {
- commands.add(arg);
- }
- } else {
- buffer.append(c);
- }
- break;
- case '\'':
- if (!isQuote) {
- isApos = !isApos;
- } else {
- buffer.append(c);
- }
- break;
- case '"':
- if (!isApos) {
- isQuote = !isQuote;
- } else {
- buffer.append(c);
- }
- break;
- default:
- buffer.append(c);
- }
-
- index++;
- }
-
- if (buffer.length() > 0) {
- String arg = buffer.toString();
- commands.add(arg);
+ private File getExecShell(String pid) {
+ if (CommonUtils.isUnix()) {
+ return new File(String.format("%s/.%s.%s",Constants.JOBX_TMP_PATH,pid,"sh"));
}
+ return null;
+ }
- return Arrays.asList(commands.toArray(new String[commands.size()]));
+ private File getLogFile(String pid) {
+ return new File(Constants.JOBX_LOG_PATH + "/." + pid + ".log");
}
+
}
diff --git a/jobx-agent/src/main/java/com/jobxhub/agent/service/AgentService.java b/jobx-agent/src/main/java/com/jobxhub/agent/service/AgentService.java
index fff43746..82627f12 100644
--- a/jobx-agent/src/main/java/com/jobxhub/agent/service/AgentService.java
+++ b/jobx-agent/src/main/java/com/jobxhub/agent/service/AgentService.java
@@ -23,6 +23,7 @@
import com.alibaba.fastjson.JSON;
import com.jobxhub.agent.process.JobXProcess;
import com.jobxhub.common.Constants;
+import com.jobxhub.common.Constants.ExitCode ;
import com.jobxhub.common.api.AgentJob;
import com.jobxhub.common.ext.ExtensionLoader;
import com.jobxhub.common.job.Action;
@@ -91,8 +92,8 @@ public Response handle(Request request) {
return Response.response(request)
.setSuccess(false)
.setResult(result)
- .setExitCode(Constants.StatusCode.ERROR_PASSWORD.getValue())
- .setMessage(Constants.StatusCode.ERROR_PASSWORD.getDescription())
+ .setExitCode(ExitCode.ERROR_PASSWORD.getValue())
+ .setMessage(ExitCode.ERROR_PASSWORD.getDescription())
.end();
}
@@ -143,7 +144,7 @@ public Response ping(Request request) {
return Response.response(request)
.setResult(result)
.setSuccess(true)
- .setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue())
+ .setExitCode(ExitCode.SUCCESS_EXIT.getValue())
.end();
}
@@ -151,14 +152,14 @@ public Response ping(Request request) {
public Response path(Request request) {
//返回密码文件的路径...
return Response.response(request).setSuccess(true)
- .setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue())
+ .setExitCode(ExitCode.SUCCESS_EXIT.getValue())
.setMessage(Constants.JOBX_HOME)
.end();
}
@Override
public Response listPath(Request request) {
- Response response = Response.response(request).setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue());
+ Response response = Response.response(request).setExitCode(ExitCode.SUCCESS_EXIT.getValue());
String path = request.getParams().getString(Constants.PARAM_LISTPATH_PATH_KEY);
if (CommonUtils.isEmpty(path)) return response.setSuccess(false).end();
File file = new File(path);
@@ -191,7 +192,7 @@ public Response monitor(Request request) {
Map map = monitor.toMap();
response.setResult(map)
.setSuccess(true)
- .setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue())
+ .setExitCode(ExitCode.SUCCESS_EXIT.getValue())
.end();
return response;
} catch (SigarException e) {
@@ -224,14 +225,14 @@ public Response execute(final Request request) {
processMap.put(pid,jobXProcess);
try {
- int exitCode = jobXProcess.start();
- response.setExitCode(exitCode);
+ response.setExitCode(jobXProcess.start());
}catch (Exception e) {
response.setExitCode(-1);
}finally {
String message = jobXProcess.getLogMessage();
response.setMessage(message);
response.end();
+ jobXProcess.deleteExecShell();
//todo 得确保server和agent是连接的状态才可以清理log...
jobXProcess.deleteLog();
processMap.remove(pid);
@@ -244,7 +245,7 @@ public Response password(Request request) {
String newPassword = request.getParams().getString(Constants.PARAM_NEWPASSWORD_KEY);
Response response = Response.response(request);
if (isEmpty(newPassword)) {
- return response.setSuccess(false).setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue()).setMessage("密码不能为空").end();
+ return response.setSuccess(false).setExitCode(ExitCode.SUCCESS_EXIT.getValue()).setMessage("密码不能为空").end();
}
//把老的注册删除
@@ -256,7 +257,7 @@ public Response password(Request request) {
//最新密码信息注册进来
register(request.getHost(),request.getPort());
- return response.setSuccess(true).setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue()).end();
+ return response.setSuccess(true).setExitCode(ExitCode.SUCCESS_EXIT.getValue()).end();
}
@Override
@@ -268,13 +269,13 @@ public Response kill(Request request) {
Response response = Response.response(request);
JobXProcess jobXProcess = processMap.get(pid);
if (jobXProcess!=null) {
- jobXProcess.kill();
- response.setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue()).end();
+ jobXProcess.kill(ExitCode.KILL);
+ response.setExitCode(ExitCode.SUCCESS_EXIT.getValue()).end();
if (logger.isInfoEnabled()) {
logger.info("[JobX]:kill successful");
}
}else {
- response.setExitCode(Constants.StatusCode.ERROR_EXIT.getValue()).end();
+ response.setExitCode(ExitCode.ERROR_EXIT.getValue()).end();
if (logger.isInfoEnabled()) {
logger.info("[JobX]:kill error,can not found process");
}
@@ -305,7 +306,7 @@ public Response proxy(Request request) {
} catch (Exception e) {
e.printStackTrace();
response = Response.response(request);
- response.setExitCode(Constants.StatusCode.ERROR_EXIT.getValue())
+ response.setExitCode(ExitCode.ERROR_EXIT.getValue())
.setMessage("[JobX]:proxy error:" + e.getLocalizedMessage())
.setSuccess(false)
.end();
@@ -318,9 +319,9 @@ public Response macId(Request request) {
String guid = getMacId();
Response response = Response.response(request).end();
if (notEmpty(guid)) {
- return response.setMessage(guid).setSuccess(true).setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue());
+ return response.setMessage(guid).setSuccess(true).setExitCode(ExitCode.SUCCESS_EXIT.getValue());
}
- return response.setSuccess(false).setExitCode(Constants.StatusCode.ERROR_EXIT.getValue());
+ return response.setSuccess(false).setExitCode(ExitCode.ERROR_EXIT.getValue());
}
/**
diff --git a/jobx-api/pom.xml b/jobx-api/pom.xml
index 11392d70..a3131851 100644
--- a/jobx-api/pom.xml
+++ b/jobx-api/pom.xml
@@ -5,7 +5,7 @@
jobxcom.jobxhub
- 1.2.0-RELEASE
+ ${revision}4.0.0jobx-api
diff --git a/jobx-common/pom.xml b/jobx-common/pom.xml
index ac8497b9..bd83887c 100644
--- a/jobx-common/pom.xml
+++ b/jobx-common/pom.xml
@@ -5,7 +5,7 @@
jobxcom.jobxhub
- 1.2.0-RELEASE
+ ${revision}4.0.0jobx-common
diff --git a/jobx-common/src/main/java/com/jobxhub/common/Constants.java b/jobx-common/src/main/java/com/jobxhub/common/Constants.java
index a21ba5ce..abc3d3ec 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/Constants.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/Constants.java
@@ -169,6 +169,8 @@ public class Constants {
public static final String JOBX_LOG_PATH = JOBX_HOME + "/logs";
+ public static final String JOBX_TMP_PATH = JOBX_HOME + "/temp";
+
public static final String JOBX_USER_HOME = SystemPropertyUtils.get("user.home") + File.separator + ".jobx";
public static final File JOBX_UID_FILE = new File(JOBX_USER_HOME,"id");
@@ -184,6 +186,11 @@ public class Constants {
*/
public static final File JOBX_PID_FILE = new File(SystemPropertyUtils.get("jobx.pid", JOBX_HOME + "/jobx.pid"));
+ /**
+ * kill
+ */
+ public static final File JOBX_KILL_FILE = new File(JOBX_HOME + "/bin/kill.sh");
+
/**
* monitor file
*/
@@ -226,7 +233,7 @@ public void setName(String name) {
}
}
- public enum StatusCode implements Serializable {
+ public enum ExitCode implements Serializable {
SUCCESS_EXIT(0x0, "正常退出"),
ERROR_EXIT(0x1, "异常退出"),
ERROR_PING(-0x63, "连接失败,ping不通"),
@@ -240,7 +247,7 @@ public enum StatusCode implements Serializable {
private Integer value;
private String description;
- StatusCode(Integer value, String description) {
+ ExitCode(Integer value, String description) {
this.value = value;
this.description = description;
}
diff --git a/jobx-common/src/main/java/com/jobxhub/common/job/Request.java b/jobx-common/src/main/java/com/jobxhub/common/job/Request.java
index 5df58a4c..644b6c65 100755
--- a/jobx-common/src/main/java/com/jobxhub/common/job/Request.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/job/Request.java
@@ -91,8 +91,8 @@ public Request setAction(Action action) {
}
public Integer getTimeOut() {
- //如果timeOut为设置,则返回24小时(1440分钟)
- if ( timeOut == null || timeOut<=0 ) {
+ //如果timeOut未设置,则返回24小时(1440分钟)
+ if ( timeOut == null || timeOut<0 ) {
return 60 * 24;
}
return timeOut;
diff --git a/jobx-common/src/main/java/com/jobxhub/common/job/Response.java b/jobx-common/src/main/java/com/jobxhub/common/job/Response.java
index c4cf1335..ddaf5f06 100755
--- a/jobx-common/src/main/java/com/jobxhub/common/job/Response.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/job/Response.java
@@ -89,7 +89,7 @@ public int getExitCode() {
public Response setExitCode(int exitCode) {
this.exitCode = exitCode;
- setSuccess(Constants.StatusCode.SUCCESS_EXIT.getValue() == exitCode);
+ setSuccess(Constants.ExitCode.SUCCESS_EXIT.getValue() == exitCode);
return this;
}
diff --git a/jobx-common/src/main/java/com/jobxhub/common/util/CommandUtils.java b/jobx-common/src/main/java/com/jobxhub/common/util/CommandUtils.java
index c17f9e08..244bc85c 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/util/CommandUtils.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/util/CommandUtils.java
@@ -26,8 +26,14 @@
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.*;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
/**
@@ -43,38 +49,11 @@ public abstract class CommandUtils implements Serializable {
private static final long serialVersionUID = 6458428317155311192L;
- public static File createLogFile(String logFileName) {
- String dirPath = IOUtils.getTmpdir();
- File dir = new File(dirPath);
- if (!dir.exists()) dir.mkdirs();
- String tempLogFilePath = dirPath + File.separator + logFileName + ".log";
- File logFile = new File(tempLogFilePath);
- return logFile;
- }
+ private static Logger logger = LoggerFactory.getLogger(CommandUtils.class);
- public static File createAttachmentFile(String fileName, String content) {
- String dirPath = Constants.JOBX_USER_HOME;
- File dir = new File(dirPath);
- if (!dir.exists()) dir.mkdirs();
+ public static String DEFAULT_USER = "root";
- String tempShellFilePath = dirPath + File.separator + fileName + ".txt";
- File attachmentFile = new File(tempShellFilePath);
- try {
- if (attachmentFile.exists()) {
- attachmentFile.delete();
- }
- attachmentFile.createNewFile();
- FileWriter fw = new FileWriter(attachmentFile);
- BufferedWriter out = new BufferedWriter(fw);
- out.write(content, 0, content.length() - 1);
- out.flush();
- out.close();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- return attachmentFile;
- }
- }
+ public static String BASH_SCHEAM = "#!/bin/bash";
public static File createShellFile(String command, String shellFileName) {
String dirPath = IOUtils.getTmpdir();
@@ -119,7 +98,6 @@ public static String executeShell(File shellFile, String... args) {
exec.setExitValues(null);
PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream);
exec.setStreamHandler(streamHandler);
-
exec.execute(commandLine);
info = outputStream.toString().trim();
} catch (Exception e) {
@@ -136,31 +114,192 @@ public static String executeShell(File shellFile, String... args) {
}
- public static String executeScript(String scriptText) {
- String info = null;
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ public static int executeScript(String script) {
+ if (CommonUtils.isEmpty(script)) {
+ throw new IllegalStateException("[JobX] script == null.");
+ }
+ ProcessBuilder builder = new ProcessBuilder(getCommandLine(script));
+ builder.directory(new File(IOUtils.getTmpdir()));
+ builder.redirectErrorStream(true);
+
+ int exitCode = -1;
+ Process process = null;
+ CountDownLatch startupLatch = new CountDownLatch(1);
+ CountDownLatch completeLatch = new CountDownLatch(1);
+
try {
- CommandLine commandLine = CommandLine.parse(scriptText);
- DefaultExecutor exec = new DefaultExecutor();
- exec.setExitValues(null);
- PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream);
- exec.setStreamHandler(streamHandler);
+ process = builder.start();
+ int processId = getPID(process);
+ if (processId == 0) {
+ logger.debug("[JobX]Spawned thread with unknown process id");
+ } else {
+ logger.debug("[JobX]Spawned thread with process id " + processId);
+ }
+ startupLatch.countDown();
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.info("[JobX]Process interrupted. Exit code is " + exitCode, e);
+ }
- exec.execute(commandLine);
- info = outputStream.toString().trim();
+ completeLatch.countDown();
+
+ String output = new StringBuilder()
+ .append("Stdout:\n")
+ .append(StringUtils.join(IOUtils.readLines(process.getInputStream()),IOUtils.LINE_SEPARATOR_UNIX))
+ .append("\n\n")
+ .append("Stderr:\n")
+ .append(StringUtils.join(IOUtils.readLines(process.getErrorStream()),IOUtils.LINE_SEPARATOR_UNIX))
+ .append("\n")
+ .toString();
+
+ logger.info("[JobX] executeScript,cmd:{},resulr:{}",script,output);
+
+ }catch (Exception e) {
+ logger.error("[JobX] executeScript,error:{}",e.getMessage());
+ }finally {
+ if (process!=null) {
+ IOUtils.closeQuietly(process.getInputStream());
+ IOUtils.closeQuietly(process.getOutputStream());
+ IOUtils.closeQuietly(process.getErrorStream());
+ }
+ return exitCode;
+ }
+ }
+
+ public static int getPID(Process process) {
+ int processId = 0;
+ try {
+ Field field = ReflectUtils.getField(process.getClass(), "pid");
+ processId = field.getInt(process);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ return processId;
+ }
+
+ public static Integer getPIDByPPID(Integer ppid) {
+ if (ppid==null||ppid == 0) return -1;
+ try {
+ String cmd = String.format("ps -ef|awk '{if($3~/%d/) print $2}'",ppid);
+ return CommonUtils.toInt(execute(cmd),0);
} catch (Exception e) {
e.printStackTrace();
- } finally {
+ }
+ return null;
+ }
+
+ public static String execute(String command) {
+ Process process = null;
+ StringBuffer buffer = new StringBuffer();
+ try {
+ process = Runtime.getRuntime().exec(command);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ buffer.append(line).append("\n");
+ }
+ process.waitFor();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if (process != null) {
try {
- outputStream.flush();
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
+ process.getErrorStream().close();
+ process.getInputStream().close();
+ process.getOutputStream().close();
+ } catch (Exception ee) {
}
- return info;
}
+ return buffer.toString();
+ }
+
+ public static List getCommandLine(String command) {
+ ArrayList commands = new ArrayList();
+ int index = 0;
+
+ StringBuffer buffer = new StringBuffer(command.length());
+
+ boolean isApos = false;
+ boolean isQuote = false;
+ while (index < command.length()) {
+ char c = command.charAt(index);
+ switch (c) {
+ case ' ':
+ if (!isQuote && !isApos) {
+ String arg = buffer.toString();
+ buffer = new StringBuffer(command.length() - index);
+ if (arg.length() > 0) {
+ commands.add(arg);
+ }
+ } else {
+ buffer.append(c);
+ }
+ break;
+ case '\'':
+ if (!isQuote) {
+ isApos = !isApos;
+ } else {
+ buffer.append(c);
+ }
+ break;
+ case '"':
+ if (!isApos) {
+ isQuote = !isQuote;
+ } else {
+ buffer.append(c);
+ }
+ break;
+ default:
+ buffer.append(c);
+ }
+
+ index++;
+ }
+
+ if (buffer.length() > 0) {
+ String arg = buffer.toString();
+ commands.add(arg);
+ }
+
+ return commands;
}
+
+ public static void write(File shellFile, String command) {
+ try {
+ if (!shellFile.exists()) {
+ PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(shellFile)));
+ out.write(BASH_SCHEAM);
+ out.write("\n");
+ out.write(command);
+ out.flush();
+ out.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ }
+ }
+
+ public static int chown(boolean r,String user,String group,File file) throws IOException, InterruptedException {
+ return runAsExecUser(DEFAULT_USER,String.format("chown %s %s:%s %s",(r?"-R":""),user,group,file.getAbsolutePath()));
+ }
+
+ private static int runAsExecUser(final String execUser,final String command) throws IOException, InterruptedException {
+ String execCmd = Constants.JOBX_EXECUTE_AS_USER_LIB
+ .concat(IOUtils.BLANK_CHAR)
+ .concat(execUser)
+ .concat(IOUtils.BLANK_CHAR)
+ .concat(command);
+ final Process process = Runtime.getRuntime().exec(execCmd);
+ return process.waitFor();
+ }
+
+
}
+
+
diff --git a/jobx-common/src/main/java/com/jobxhub/common/util/CommonUtils.java b/jobx-common/src/main/java/com/jobxhub/common/util/CommonUtils.java
index 234944ca..f6e32700 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/util/CommonUtils.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/util/CommonUtils.java
@@ -21,7 +21,6 @@
package com.jobxhub.common.util;
-import com.jobxhub.common.Constants;
import com.jobxhub.common.util.collection.HashMap;
import java.io.File;
diff --git a/jobx-common/src/main/java/com/jobxhub/common/util/HttpClientUtils.java b/jobx-common/src/main/java/com/jobxhub/common/util/HttpClientUtils.java
index 1fa54a90..df310f51 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/util/HttpClientUtils.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/util/HttpClientUtils.java
@@ -206,7 +206,7 @@ private static String getResult(HttpRequestBase request) {
CloseableHttpClient httpClient = getHttpClient();
try {
CloseableHttpResponse response = httpClient.execute(request);
- // response.getStatusLine().getStatusCode();
+ // response.getStatusLine().getExitCode();
HttpEntity entity = response.getEntity();
if (entity != null) {
diff --git a/jobx-common/src/main/java/com/jobxhub/common/util/IOUtils.java b/jobx-common/src/main/java/com/jobxhub/common/util/IOUtils.java
index 63f6d863..86061caa 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/util/IOUtils.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/util/IOUtils.java
@@ -65,6 +65,7 @@ public abstract class IOUtils implements Serializable {
private static final int BUFFER_SIZE = 1024 * 8;
+ public static final String BLANK_CHAR = " ";
public static String readText(File file, String charset) {
InputStream inputStream = null;
diff --git a/jobx-common/src/main/java/com/jobxhub/common/util/ReflectUtils.java b/jobx-common/src/main/java/com/jobxhub/common/util/ReflectUtils.java
index c6c26939..fa679cc3 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/util/ReflectUtils.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/util/ReflectUtils.java
@@ -1622,4 +1622,12 @@ public static boolean isPrototype(Class clazz) {
return clazz.getClassLoader() == null;
}
+ public static Object getFieldValue(Object obj, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ Field field = getField(obj.getClass(),fieldName);
+ if (field!=null) {
+ return field.get(obj);
+ }
+ return null;
+ }
+
}
diff --git a/jobx-common/src/main/java/com/jobxhub/common/util/StringUtils.java b/jobx-common/src/main/java/com/jobxhub/common/util/StringUtils.java
index da1c8c21..f3538ca9 100644
--- a/jobx-common/src/main/java/com/jobxhub/common/util/StringUtils.java
+++ b/jobx-common/src/main/java/com/jobxhub/common/util/StringUtils.java
@@ -354,6 +354,17 @@ public static String htmlEncode(String source) {
}
+ public static String htmlDecode(String html) {
+ if (html!=null) {
+ return html.replaceAll("<","<")
+ .replaceAll(">",">")
+ .replaceAll("&","&")
+ .replaceAll(""","\"");
+
+ }
+ return html;
+ }
+
/**
* 取字符串的前toCount个字符
*
@@ -992,5 +1003,6 @@ public static void main(String[] args) {
System.out.println(camelToSplitName("getName","_"));
}
+
}
diff --git a/jobx-examples/pom.xml b/jobx-examples/pom.xml
index c49cfa70..9679154b 100755
--- a/jobx-examples/pom.xml
+++ b/jobx-examples/pom.xml
@@ -3,7 +3,7 @@
jobxcom.jobxhub
- 1.2.0-RELEASE
+ ${revision}4.0.0jobx-examples
diff --git a/jobx-executor/pom.xml b/jobx-executor/pom.xml
deleted file mode 100644
index c136e5ca..00000000
--- a/jobx-executor/pom.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-
-
- jobx-executor
- com.jobxhub
- 1.2.0-RELEASE
- 4.0.0
- so
- jobx-executor
-
-
- ${project.basedir}
-
-
-
-
-
- org.codehaus.mojo
- native-maven-plugin
- 1.0-alpha-9
- true
-
- linux
-
-
- src/main/c
-
- executor.c
-
-
-
- generic-classic
- g++
-
-
-
-
-
\ No newline at end of file
diff --git a/jobx-registry/pom.xml b/jobx-registry/pom.xml
index 93d48cba..81072675 100644
--- a/jobx-registry/pom.xml
+++ b/jobx-registry/pom.xml
@@ -5,7 +5,7 @@
jobxcom.jobxhub
- 1.2.0-RELEASE
+ ${revision}4.0.0jobx-registry
diff --git a/jobx-registry/src/test/java/com/jobxhub/registry/RegistryTest.java b/jobx-registry/src/test/java/com/jobxhub/registry/RegistryTest.java
index 56c8b8f3..fb1f5489 100644
--- a/jobx-registry/src/test/java/com/jobxhub/registry/RegistryTest.java
+++ b/jobx-registry/src/test/java/com/jobxhub/registry/RegistryTest.java
@@ -36,7 +36,6 @@ public void delete() throws IOException {
@Test
public void lister() throws IOException {
-
zookeeperClient.addChildListener("/jobx/agent",new ChildListener(){
@Override
public void childChanged(String path, List children) {
diff --git a/jobx-rpc/jobx-rpc-api/pom.xml b/jobx-rpc/jobx-rpc-api/pom.xml
new file mode 100644
index 00000000..1d9d397b
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-api/pom.xml
@@ -0,0 +1,24 @@
+
+
+
+ jobx-rpc
+ com.jobxhub
+ ${revision}
+
+ 4.0.0
+ jar
+ jobx-rpc-api
+
+
+
+ com.jobxhub
+ jobx-common
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
\ No newline at end of file
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/Client.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/Client.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/Client.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/Client.java
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/InvokeCallback.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/InvokeCallback.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/InvokeCallback.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/InvokeCallback.java
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/Invoker.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/Invoker.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/Invoker.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/Invoker.java
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/RpcFuture.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/RpcFuture.java
similarity index 97%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/RpcFuture.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/RpcFuture.java
index df018091..53931aee 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/RpcFuture.java
+++ b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/RpcFuture.java
@@ -31,7 +31,9 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -146,7 +148,7 @@ public void caught(Throwable throwable) {
this.response.setThrowable(throwable);
this.response.setStartTime(this.startTime);
this.response.setSuccess(false);
- this.response.setExitCode(Constants.StatusCode.ERROR_EXEC.getValue());
+ this.response.setExitCode(Constants.ExitCode.ERROR_EXEC.getValue());
invokeCallback();
} finally {
lock.unlock();
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/Server.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/Server.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/Server.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/Server.java
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/ServerHandler.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/ServerHandler.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/ServerHandler.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/ServerHandler.java
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/support/ChannelWrapper.java b/jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/support/ChannelWrapper.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/support/ChannelWrapper.java
rename to jobx-rpc/jobx-rpc-api/src/main/java/com.jobxhub.rpc/support/ChannelWrapper.java
diff --git a/jobx-rpc/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Client b/jobx-rpc/jobx-rpc-api/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Client
similarity index 100%
rename from jobx-rpc/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Client
rename to jobx-rpc/jobx-rpc-api/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Client
diff --git a/jobx-rpc/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Server b/jobx-rpc/jobx-rpc-api/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Server
similarity index 100%
rename from jobx-rpc/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Server
rename to jobx-rpc/jobx-rpc-api/src/main/resources/META-INF/jobx/com.jobxhub.rpc.Server
diff --git a/jobx-rpc/jobx-rpc-mina/pom.xml b/jobx-rpc/jobx-rpc-mina/pom.xml
new file mode 100644
index 00000000..733befdf
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-mina/pom.xml
@@ -0,0 +1,24 @@
+
+
+
+ jobx-rpc
+ com.jobxhub
+ ${revision}
+
+ 4.0.0
+ jar
+ jobx-rpc-mina
+
+
+ com.jobxhub
+ jobx-rpc-api
+
+
+
+ org.apache.mina
+ mina-core
+
+
+
\ No newline at end of file
diff --git a/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaClient.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaClient.java
new file mode 100644
index 00000000..5138b078
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaClient.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (c) 2015 The JobX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.jobxhub.rpc.mina;
+
+import com.jobxhub.common.job.Request;
+import com.jobxhub.common.job.Response;
+import com.jobxhub.common.logging.LoggerFactory;
+import com.jobxhub.rpc.InvokeCallback;
+import com.jobxhub.rpc.RpcFuture;
+import com.jobxhub.rpc.mina.support.AbstractClient;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.slf4j.Logger;
+
+public class MinaClient extends AbstractClient {
+
+ private static Logger logger = LoggerFactory.getLogger(MinaClient.class);
+
+ @Override
+ public void connect(Request request) {
+ if (connector == null) {
+ connector = new NioSocketConnector();
+ connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(Request.class, Response.class)));
+ connector.setHandler(new MinaClientHandler(this));
+ connector.setConnectTimeoutMillis(5000);
+ DefaultSocketSessionConfig sessionConfiguration = (DefaultSocketSessionConfig) connector.getSessionConfig();
+ sessionConfiguration.setTcpNoDelay(true);
+ sessionConfiguration.setKeepAlive(true);
+ sessionConfiguration.setWriteTimeout(5);
+ }
+ }
+
+ @Override
+ public Response sentSync(final Request request) throws Exception {
+ final ConnectFuture connect = super.getConnect(request);
+ if (connect != null && connect.isConnected()) {
+ RpcFuture rpcFuture = new RpcFuture(request);
+ //写数据
+// connect.addListener(new AbstractClient.FutureListener(rpcFuture));
+ IoSession session = connect.getSession();
+ session.write(request);
+ return rpcFuture.get();
+ } else {
+ throw new IllegalArgumentException("[JobX] MinaRPC channel not active. request id:" + request.getId());
+ }
+ }
+
+ @Override
+ public void sentAsync(final Request request, final InvokeCallback callback) throws Exception {
+ final ConnectFuture connect = super.getConnect(request);
+ if (connect != null && connect.isConnected()) {
+ RpcFuture rpcFuture = new RpcFuture(request, callback);
+// connect.addListener(new AbstractClient.FutureListener(rpcFuture));
+ connect.getSession().write(request);
+ } else {
+ throw new IllegalArgumentException("[JobX] MinaRPC invokeAsync channel not active. request id:" + request.getId());
+ }
+ }
+
+ @Override
+ public void sentOneWay(final Request request) throws Exception {
+ ConnectFuture connect = super.getConnect(request);
+ if (connect != null && connect.isConnected()) {
+ RpcFuture rpcFuture = new RpcFuture(request);
+// connect.addListener(new AbstractClient.FutureListener(rpcFuture));
+ connect.getSession().write(request);
+ } else {
+ throw new IllegalArgumentException("[JobX] MinaRPC channel not active. request id:" + request.getId());
+ }
+ }
+
+
+}
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaClientHandler.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaClientHandler.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaClientHandler.java
rename to jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaClientHandler.java
index af490221..71d2b814 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaClientHandler.java
+++ b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaClientHandler.java
@@ -20,11 +20,11 @@
*/
package com.jobxhub.rpc.mina;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
import com.jobxhub.common.job.Response;
import com.jobxhub.common.logging.LoggerFactory;
import com.jobxhub.rpc.RpcFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
/**
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaCodecAdapter.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaCodecAdapter.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaCodecAdapter.java
rename to jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaCodecAdapter.java
index 93a59cee..317a5e08 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaCodecAdapter.java
+++ b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaCodecAdapter.java
@@ -20,12 +20,12 @@
*/
package com.jobxhub.rpc.mina;
-import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.*;
import com.jobxhub.common.Constants;
import com.jobxhub.common.ext.ExtensionLoader;
import com.jobxhub.common.serialize.Serializer;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaConnectWrapper.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaConnectWrapper.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaConnectWrapper.java
rename to jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaConnectWrapper.java
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaServer.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaServer.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaServer.java
rename to jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaServer.java
index 159c5b6d..3125dd63 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaServer.java
+++ b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaServer.java
@@ -21,13 +21,13 @@
package com.jobxhub.rpc.mina;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.jobxhub.common.job.Request;
import com.jobxhub.common.job.Response;
import com.jobxhub.rpc.Server;
import com.jobxhub.rpc.ServerHandler;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaServerHandler.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaServerHandler.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaServerHandler.java
rename to jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaServerHandler.java
index 61d2eb20..d770183d 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaServerHandler.java
+++ b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/MinaServerHandler.java
@@ -20,12 +20,12 @@
*/
package com.jobxhub.rpc.mina;
-import org.apache.mina.core.service.IoHandlerAdapter;
-import org.apache.mina.core.session.IoSession;
import com.jobxhub.common.job.Request;
import com.jobxhub.common.job.Response;
import com.jobxhub.common.logging.LoggerFactory;
import com.jobxhub.rpc.ServerHandler;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
public class MinaServerHandler extends IoHandlerAdapter {
diff --git a/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/support/AbstractClient.java b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/support/AbstractClient.java
new file mode 100644
index 00000000..12b9cec8
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-mina/src/main/java/com/jobxhub/rpc/mina/support/AbstractClient.java
@@ -0,0 +1,117 @@
+/**
+ * Copyright (c) 2015 The JobX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.jobxhub.rpc.mina.support;
+
+import com.jobxhub.common.Constants;
+import com.jobxhub.common.job.Request;
+import com.jobxhub.common.util.HttpUtils;
+import com.jobxhub.common.util.collection.HashMap;
+import com.jobxhub.rpc.Client;
+import com.jobxhub.rpc.RpcFuture;
+import com.jobxhub.rpc.mina.MinaConnectWrapper;
+import com.jobxhub.rpc.support.ChannelWrapper;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author benjobs
+ */
+public abstract class AbstractClient implements Client {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ protected NioSocketConnector connector;
+
+ private final Lock connectLock = new ReentrantLock();
+
+ protected final ConcurrentHashMap channelTable = new ConcurrentHashMap();
+
+ public final Map futureTable = new HashMap(256);
+
+ public ConnectFuture getConnect(Request request) {
+ connectLock.lock();
+ try {
+ MinaConnectWrapper minaConnectWrapper = (MinaConnectWrapper) this.channelTable.get(request.getAddress());
+ if (minaConnectWrapper != null && minaConnectWrapper.isActive()) {
+ return minaConnectWrapper.getConnectFuture();
+ }
+ this.doConnect(request);
+ ConnectFuture connectFuture = connector.connect(HttpUtils.parseSocketAddress(request.getAddress()));
+ minaConnectWrapper = new MinaConnectWrapper(connectFuture);
+ if (connectFuture.awaitUninterruptibly(Constants.RPC_TIMEOUT)) {
+ if (minaConnectWrapper.isActive()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("[JobX] MinaRPC getConnect: connect remote host[{}] success, {}", request.getAddress(), connectFuture.toString());
+ }
+ this.channelTable.put(request.getAddress(), minaConnectWrapper);
+ return connectFuture;
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("[JobX] MinaRPC getConnect: connect remote host[" + request.getAddress() + "] failed, " + connectFuture.toString(), connectFuture.getException());
+ }
+ }
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("[JobX] MinaRPC getConnect: connect remote host[{}] timeout {}ms, {}", request.getAddress(), Constants.RPC_TIMEOUT, connectFuture);
+ }
+ }
+ } finally {
+ connectLock.unlock();
+ }
+ return null;
+ }
+
+
+ private void doConnect(Request request) {
+// if (this.bootstrap == null) {
+// this.connect(request);
+// }
+ }
+
+ @Override
+ public abstract void connect(Request request);
+
+ @Override
+ public void disconnect() throws Throwable {
+ connectLock.lock();
+ try {
+ for (Map.Entry entry : channelTable.entrySet()) {
+ ChannelWrapper channelWrapper = entry.getValue();
+ if (channelWrapper != null) {
+ channelWrapper.close();
+ }
+ }
+ } finally {
+ connectLock.unlock();
+ }
+ }
+
+ public RpcFuture getRpcFuture(Long id) {
+ return this.futureTable.get(id);
+ }
+}
diff --git a/jobx-rpc/jobx-rpc-netty4/pom.xml b/jobx-rpc/jobx-rpc-netty4/pom.xml
new file mode 100644
index 00000000..147324cd
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-netty4/pom.xml
@@ -0,0 +1,23 @@
+
+
+
+ jobx-rpc
+ com.jobxhub
+ ${revision}
+
+ 4.0.0
+ jobx-rpc-netty4
+ jar
+
+
+ com.jobxhub
+ jobx-rpc-api
+
+
+ io.netty
+ netty-all
+
+
+
\ No newline at end of file
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyChannelWrapper.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyChannelWrapper.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyChannelWrapper.java
rename to jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyChannelWrapper.java
diff --git a/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyClient.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyClient.java
new file mode 100755
index 00000000..8e8d4c94
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyClient.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright (c) 2015 The JobX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.jobxhub.rpc.netty;
+
+import com.jobxhub.common.Constants;
+import com.jobxhub.common.job.Request;
+import com.jobxhub.common.job.Response;
+import com.jobxhub.rpc.InvokeCallback;
+import com.jobxhub.rpc.RpcFuture;
+import com.jobxhub.rpc.netty.support.AbstractClient;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @author B e n
+ * @version 1.0
+ * @date 2016-03-27
+ */
+
+public class NettyClient extends AbstractClient {
+
+ private static final NioEventLoopGroup NIO_EVENT_LOOP_GROUP = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
+
+ @Override
+ public void connect(final Request request) {
+ int timeout = 3000;
+ this.bootstrap = new Bootstrap().group(NIO_EVENT_LOOP_GROUP)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout < 3000 ? 3000 : timeout)
+ .channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ channel.pipeline().addLast(
+ NettyCodecAdapter.getCodecAdapter().getDecoder(Response.class),
+ NettyCodecAdapter.getCodecAdapter().getEncoder(Request.class),
+ new NettyClientHandler(NettyClient.this, request)
+ );
+ }
+ });
+ }
+
+ @Override
+ public Response sentSync(Request request) throws TimeoutException {
+ Channel channel = getChannel(request);
+ if (channel != null && channel.isActive()) {
+ RpcFuture rpcFuture = new RpcFuture(request);
+ channel.writeAndFlush(request).addListener(new FutureListener(rpcFuture));
+ return rpcFuture.get();
+ } else {
+ throw new IllegalArgumentException("[JobX] NettyRPC invokeSync channel not active. request id:" + request.getId());
+ }
+ }
+
+ @Override
+ public void sentAsync(Request request, final InvokeCallback callback) throws Exception {
+ Channel channel = getChannel(request);
+ if (channel != null && channel.isActive()) {
+ RpcFuture rpcFuture = new RpcFuture(request, callback);
+ channel.writeAndFlush(request).addListener(new FutureListener(rpcFuture));
+ } else {
+ throw new IllegalArgumentException("[JobX] NettyRPC invokeAsync channel not active. request id:" + request.getId());
+ }
+ }
+
+ @Override
+ public void sentOneWay(Request request) throws Exception {
+ Channel channel = getChannel(request);
+ if (channel != null && channel.isActive()) {
+ RpcFuture rpcFuture = new RpcFuture(request);
+ channel.writeAndFlush(request).addListener(new FutureListener(rpcFuture));
+ } else {
+ throw new IllegalArgumentException("[JobX] NettyRPC invokeAsync invokeOneway channel not active. request id:" + request.getId());
+ }
+ }
+
+}
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyClientHandler.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyClientHandler.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyClientHandler.java
rename to jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyClientHandler.java
index c069c5ac..1084f699 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyClientHandler.java
+++ b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyClientHandler.java
@@ -20,12 +20,12 @@
*/
package com.jobxhub.rpc.netty;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
import com.jobxhub.common.Constants;
import com.jobxhub.common.job.*;
import com.jobxhub.common.logging.LoggerFactory;
import com.jobxhub.common.util.CommonUtils;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import java.io.RandomAccessFile;
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyCodecAdapter.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyCodecAdapter.java
similarity index 100%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyCodecAdapter.java
rename to jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyCodecAdapter.java
index e68662ca..ecab7805 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyCodecAdapter.java
+++ b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyCodecAdapter.java
@@ -20,13 +20,13 @@
*/
package com.jobxhub.rpc.netty;
+import com.jobxhub.common.Constants;
+import com.jobxhub.common.ext.ExtensionLoader;
+import com.jobxhub.common.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
-import com.jobxhub.common.Constants;
-import com.jobxhub.common.ext.ExtensionLoader;
-import com.jobxhub.common.serialize.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyServer.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyServer.java
similarity index 96%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyServer.java
rename to jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyServer.java
index ed3c1e36..fc3617fb 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyServer.java
+++ b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyServer.java
@@ -21,6 +21,12 @@
package com.jobxhub.rpc.netty;
+import com.jobxhub.common.Constants;
+import com.jobxhub.common.job.Request;
+import com.jobxhub.common.job.Response;
+import com.jobxhub.common.logging.LoggerFactory;
+import com.jobxhub.rpc.Server;
+import com.jobxhub.rpc.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
@@ -30,15 +36,12 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
-import com.jobxhub.common.Constants;
-import com.jobxhub.common.job.Request;
-import com.jobxhub.common.job.Response;
-import com.jobxhub.common.logging.LoggerFactory;
-import com.jobxhub.rpc.ServerHandler;
-import com.jobxhub.rpc.Server;
import org.slf4j.Logger;
-import java.util.concurrent.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.jobxhub.common.util.ExceptionUtils.stackTrace;
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyServerHandler.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyServerHandler.java
similarity index 97%
rename from jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyServerHandler.java
rename to jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyServerHandler.java
index e461d39b..63bec82a 100644
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyServerHandler.java
+++ b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/NettyServerHandler.java
@@ -20,13 +20,13 @@
*/
package com.jobxhub.rpc.netty;
-import com.jobxhub.common.exception.RpcException;
-import io.netty.channel.*;
import com.jobxhub.common.Constants;
+import com.jobxhub.common.exception.RpcException;
import com.jobxhub.common.job.*;
import com.jobxhub.common.logging.LoggerFactory;
import com.jobxhub.common.util.IOUtils;
import com.jobxhub.rpc.ServerHandler;
+import io.netty.channel.*;
import org.slf4j.Logger;
import java.io.File;
@@ -87,7 +87,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
return;
}
- Response response = Response.response(request).setExitCode(Constants.StatusCode.SUCCESS_EXIT.getValue()).setSuccess(true);
+ Response response = Response.response(request).setExitCode(Constants.ExitCode.SUCCESS_EXIT.getValue()).setSuccess(true);
final RequestFile requestFile = request.getUploadFile();
@@ -99,7 +99,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!savePath.exists()) {
ResponseFile responseFile = new ResponseFile(start, requestFile.getFileMD5());
responseFile.setEnd(true);
- response.setExitCode(Constants.StatusCode.NOTFOUND.getValue()).setSuccess(false).setUploadFile(responseFile).end();
+ response.setExitCode(Constants.ExitCode.NOTFOUND.getValue()).setSuccess(false).setUploadFile(responseFile).end();
handlerContext.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
diff --git a/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/support/AbstractClient.java b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/support/AbstractClient.java
new file mode 100644
index 00000000..57c474cc
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-netty4/src/main/java/com/jobxhub/rpc/netty/support/AbstractClient.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright (c) 2015 The JobX Project
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.jobxhub.rpc.netty.support;
+
+import com.jobxhub.common.Constants;
+import com.jobxhub.common.job.Request;
+import com.jobxhub.common.util.HttpUtils;
+import com.jobxhub.common.util.collection.HashMap;
+import com.jobxhub.rpc.Client;
+import com.jobxhub.rpc.RpcFuture;
+import com.jobxhub.rpc.netty.NettyChannelWrapper;
+import com.jobxhub.rpc.support.ChannelWrapper;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * @author benjobs
+ */
+public abstract class AbstractClient implements Client {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ protected Bootstrap bootstrap;
+
+ private final Lock connectLock = new ReentrantLock();
+
+ protected final ConcurrentHashMap channelTable = new ConcurrentHashMap();
+
+ public final Map futureTable = new HashMap(256);
+
+ public Channel getChannel(Request request) {
+ connectLock.lock();
+ try {
+ NettyChannelWrapper nettyChannelWrapper = (NettyChannelWrapper) this.channelTable.get(request.getAddress());
+ if (nettyChannelWrapper != null && nettyChannelWrapper.isActive()) {
+ return nettyChannelWrapper.getChannel();
+ }
+ // 发起异步连接操作
+ this.doConnect(request);
+ ChannelFuture channelFuture = this.bootstrap.connect(HttpUtils.parseSocketAddress(request.getAddress()));
+ nettyChannelWrapper = new NettyChannelWrapper(channelFuture);
+ if (channelFuture.awaitUninterruptibly(Constants.RPC_TIMEOUT)) {
+ if (nettyChannelWrapper.isActive()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("[JobX] NettyRPC getChannel: connect remote host[{}] success, {}", request.getAddress(), channelFuture.toString());
+ }
+ this.channelTable.put(request.getAddress(), nettyChannelWrapper);
+ return nettyChannelWrapper.getChannel();
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("[JobX] NettyRPC getChannel: connect remote host[" + request.getAddress() + "] failed, " + channelFuture.toString(), channelFuture.cause());
+ }
+ }
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("[JobX] NettyRPC getChannel: connect remote host[{}] timeout {}ms, {}", request.getAddress(), Constants.RPC_TIMEOUT, channelFuture);
+ }
+ }
+ } finally {
+ connectLock.unlock();
+ }
+ return null;
+ }
+
+ private void doConnect(Request request) {
+ if (this.bootstrap == null) {
+ this.connect(request);
+ }
+ }
+
+ @Override
+ public abstract void connect(Request request);
+
+ @Override
+ public void disconnect() throws Throwable {
+ connectLock.lock();
+ try {
+ for (Map.Entry entry : channelTable.entrySet()) {
+ ChannelWrapper channelWrapper = entry.getValue();
+ if (channelWrapper != null) {
+ channelWrapper.close();
+ }
+ }
+ } finally {
+ connectLock.unlock();
+ }
+ }
+
+ public RpcFuture getRpcFuture(Long id) {
+ return this.futureTable.get(id);
+ }
+
+ public class FutureListener implements ChannelFutureListener {
+
+ private RpcFuture rpcFuture;
+
+ public FutureListener(RpcFuture rpcFuture) {
+ if (rpcFuture != null) {
+ this.rpcFuture = rpcFuture;
+ futureTable.put(rpcFuture.getFutureId(), rpcFuture);
+ }
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("[JobX] NettyRPC sent success, request id:{}", rpcFuture.getRequest().getId());
+ }
+ return;
+ } else {
+ if (logger.isInfoEnabled()) {
+ logger.info("[JobX] NettyRPC sent failure, request id:{}", rpcFuture.getRequest().getId());
+ }
+ if (this.rpcFuture != null) {
+ rpcFuture.caught(future.cause());
+ }
+ }
+ futureTable.remove(rpcFuture.getFutureId());
+ }
+ }
+}
diff --git a/jobx-rpc/jobx-rpc-netty4/src/test/java/NettyFileTest.java b/jobx-rpc/jobx-rpc-netty4/src/test/java/NettyFileTest.java
new file mode 100644
index 00000000..ed934ee4
--- /dev/null
+++ b/jobx-rpc/jobx-rpc-netty4/src/test/java/NettyFileTest.java
@@ -0,0 +1,37 @@
+//import com.jobxhub.common.ext.ExtensionLoader;
+//import com.jobxhub.common.job.*;
+//import com.jobxhub.common.util.IdGenerator;
+//import com.jobxhub.rpc.Client;
+//import com.jobxhub.rpc.Server;
+//import org.junit.Test;
+//
+//import java.io.File;
+//
+//public class NettyFileTest {
+//
+//
+// @Test
+// public void server() {
+// Server server = ExtensionLoader.load(Server.class);
+// server.start(8089, null);
+// }
+//
+// @Test
+// public void client() throws Exception {
+// Client client = ExtensionLoader.load(Client.class);
+// Request request = new Request();
+// request.setId(IdGenerator.getId());
+// request.setAction(Action.UPLOAD);
+// request.setRpcType(RpcType.SYNC);
+// request.setHost("127.0.0.1");
+// request.setPort(8089);
+// File file = new File("/Users/benjobs/movie/盗梦空间.mkv");
+// RequestFile requestFile = new RequestFile(file);
+// requestFile.setSavePath("/Users/benjobs/Desktop");
+// request.setUploadFile(requestFile);
+// Response response = client.sentSync(request);
+// System.out.println(response.getAction());
+//
+// }
+//
+//}
diff --git a/jobx-rpc/src/test/java/SPITest.java b/jobx-rpc/jobx-rpc-netty4/src/test/java/SPITest.java
similarity index 100%
rename from jobx-rpc/src/test/java/SPITest.java
rename to jobx-rpc/jobx-rpc-netty4/src/test/java/SPITest.java
index f958e61c..a073c812 100644
--- a/jobx-rpc/src/test/java/SPITest.java
+++ b/jobx-rpc/jobx-rpc-netty4/src/test/java/SPITest.java
@@ -1,5 +1,5 @@
-import org.junit.Test;
import com.jobxhub.common.util.SystemPropertyUtils;
+import org.junit.Test;
import java.io.IOException;
diff --git a/jobx-rpc/pom.xml b/jobx-rpc/pom.xml
index 17bded85..d9c9aa61 100644
--- a/jobx-rpc/pom.xml
+++ b/jobx-rpc/pom.xml
@@ -5,45 +5,14 @@
jobxcom.jobxhub
- 1.2.0-RELEASE
+ ${revision}4.0.0jobx-rpc
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
- 7
- 7
-
-
-
-
-
-
-
-
- com.jobxhub
- jobx-common
-
-
-
- io.netty
- netty-all
-
-
-
- org.apache.mina
- mina-core
-
-
-
- org.apache.zookeeper
- zookeeper
-
-
-
-
+ pom
+
+ jobx-rpc-netty4
+ jobx-rpc-mina
+ jobx-rpc-api
+
\ No newline at end of file
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaClient.java b/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaClient.java
deleted file mode 100644
index 86a121e5..00000000
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/mina/MinaClient.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Copyright (c) 2015 The JobX Project
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.jobxhub.rpc.mina;
-
-import org.apache.mina.core.future.ConnectFuture;
-import org.apache.mina.core.session.IoSession;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
-import org.apache.mina.transport.socket.nio.NioSocketConnector;
-import com.jobxhub.common.job.Request;
-import com.jobxhub.common.job.Response;
-import com.jobxhub.common.logging.LoggerFactory;
-import com.jobxhub.rpc.InvokeCallback;
-import com.jobxhub.rpc.RpcFuture;
-import com.jobxhub.rpc.support.AbstractClient;
-import org.slf4j.Logger;
-
-public class MinaClient extends AbstractClient {
-
- private static Logger logger = LoggerFactory.getLogger(MinaClient.class);
-
- @Override
- public void connect(Request request) {
- if (connector == null) {
- connector = new NioSocketConnector();
- connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(Request.class, Response.class)));
- connector.setHandler(new MinaClientHandler(this));
- connector.setConnectTimeoutMillis(5000);
- DefaultSocketSessionConfig sessionConfiguration = (DefaultSocketSessionConfig) connector.getSessionConfig();
- sessionConfiguration.setTcpNoDelay(true);
- sessionConfiguration.setKeepAlive(true);
- sessionConfiguration.setWriteTimeout(5);
- }
- }
- @Override
- public Response sentSync(final Request request) throws Exception {
- final ConnectFuture connect = super.getConnect(request);
- if (connect != null && connect.isConnected()) {
- RpcFuture rpcFuture = new RpcFuture(request);
- //写数据
- connect.addListener(new AbstractClient.FutureListener(rpcFuture));
- IoSession session = connect.getSession();
- session.write(request);
- return rpcFuture.get();
- } else {
- throw new IllegalArgumentException("[JobX] MinaRPC channel not active. request id:" + request.getId());
- }
- }
-
- @Override
- public void sentAsync(final Request request, final InvokeCallback callback) throws Exception {
- final ConnectFuture connect = super.getConnect(request);
- if (connect != null && connect.isConnected()) {
- RpcFuture rpcFuture = new RpcFuture(request,callback);
- connect.addListener(new AbstractClient.FutureListener(rpcFuture));
- connect.getSession().write(request);
- } else {
- throw new IllegalArgumentException("[JobX] MinaRPC invokeAsync channel not active. request id:" + request.getId());
- }
- }
-
- @Override
- public void sentOneWay(final Request request) throws Exception {
- ConnectFuture connect = super.getConnect(request);
- if (connect != null && connect.isConnected()) {
- RpcFuture rpcFuture = new RpcFuture(request);
- connect.addListener(new AbstractClient.FutureListener(rpcFuture));
- connect.getSession().write(request);
- } else {
- throw new IllegalArgumentException("[JobX] MinaRPC channel not active. request id:" + request.getId());
- }
- }
-
-
-}
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyClient.java b/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyClient.java
deleted file mode 100755
index b90e870f..00000000
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/netty/NettyClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Copyright (c) 2015 The JobX Project
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.jobxhub.rpc.netty;
-
-import com.jobxhub.common.Constants;
-import com.jobxhub.common.job.Request;
-import com.jobxhub.common.job.Response;
-import com.jobxhub.rpc.InvokeCallback;
-import com.jobxhub.rpc.RpcFuture;
-import com.jobxhub.rpc.support.AbstractClient;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * @author B e n
- * @version 1.0
- * @date 2016-03-27
- */
-
-public class NettyClient extends AbstractClient {
-
- private static final NioEventLoopGroup NIO_EVENT_LOOP_GROUP = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
-
- @Override
- public void connect(final Request request) {
- int timeout = 3000;
- this.bootstrap = new Bootstrap().group(NIO_EVENT_LOOP_GROUP)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout < 3000 ? 3000 : timeout)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer() {
- @Override
- public void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline().addLast(
- NettyCodecAdapter.getCodecAdapter().getDecoder(Response.class),
- NettyCodecAdapter.getCodecAdapter().getEncoder(Request.class),
- new NettyClientHandler(NettyClient.this, request)
- );
- }
- });
- }
-
- @Override
- public Response sentSync(Request request) throws TimeoutException {
- Channel channel = getChannel(request);
- if (channel != null && channel.isActive()) {
- RpcFuture rpcFuture = new RpcFuture(request);
- channel.writeAndFlush(request).addListener(new FutureListener(rpcFuture));
- return rpcFuture.get();
- } else {
- throw new IllegalArgumentException("[JobX] NettyRPC invokeSync channel not active. request id:" + request.getId());
- }
- }
-
- @Override
- public void sentAsync(Request request, final InvokeCallback callback) throws Exception {
- Channel channel = getChannel(request);
- if (channel != null && channel.isActive()) {
- RpcFuture rpcFuture = new RpcFuture(request, callback);
- channel.writeAndFlush(request).addListener(new FutureListener(rpcFuture));
- } else {
- throw new IllegalArgumentException("[JobX] NettyRPC invokeAsync channel not active. request id:" + request.getId());
- }
- }
-
- @Override
- public void sentOneWay(Request request) throws Exception {
- Channel channel = getChannel(request);
- if (channel != null && channel.isActive()) {
- RpcFuture rpcFuture = new RpcFuture(request);
- channel.writeAndFlush(request).addListener(new FutureListener(rpcFuture));
- } else {
- throw new IllegalArgumentException("[JobX] NettyRPC invokeAsync invokeOneway channel not active. request id:" + request.getId());
- }
- }
-
-}
diff --git a/jobx-rpc/src/main/java/com/jobxhub/rpc/support/AbstractClient.java b/jobx-rpc/src/main/java/com/jobxhub/rpc/support/AbstractClient.java
deleted file mode 100644
index 1f48d89f..00000000
--- a/jobx-rpc/src/main/java/com/jobxhub/rpc/support/AbstractClient.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Copyright (c) 2015 The JobX Project
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *