diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ChannelizerV4.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ChannelizerV4.java new file mode 100644 index 00000000000..24230958cb5 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ChannelizerV4.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.ChannelHandler; +import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; + +import java.util.List; + +/** + + */ +public interface ChannelizerV4 extends Channelizer { + + public void init(final ServiceContext serviceContext, final List services); +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinBootstrapper.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinBootstrapper.java new file mode 100644 index 00000000000..488bf779ffb --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinBootstrapper.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.EventLoopGroup; +import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; +import org.apache.tinkerpop.gremlin.server.op.OpLoader; +import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.util.MessageSerializer; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1; +import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV2; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.stream.Stream; + +public class GremlinBootstrapper { + + private static boolean started = false; + private static boolean disposed = false; + + private static final Logger logger = LoggerFactory.getLogger(GremlinBootstrapper.class); + + // todo: services should not be stored here + private static ExecutorService gremlinExecutorService; + private static ServerGremlinExecutor serverGremlinExecutor; + private static GraphManager graphManager; + private static final Map> serializers = new HashMap<>(); + + //// AbstractChannelizer copy/paste + private static final List DEFAULT_SERIALIZERS = Arrays.asList( + new Settings.SerializerSettings(GraphSONMessageSerializerV2.class.getName(), Collections.emptyMap()), + new Settings.SerializerSettings(GraphBinaryMessageSerializerV1.class.getName(), Collections.emptyMap()), + new Settings.SerializerSettings(GraphBinaryMessageSerializerV1.class.getName(), new HashMap() {{ + put(GraphBinaryMessageSerializerV1.TOKEN_SERIALIZE_RESULT_TO_STRING, true); + }}) + ); + + public static void init(final ServiceContext serviceContext) { + if (started) return; + + final EventLoopGroup workerGroup = (EventLoopGroup)serviceContext.getService("workerGroup"); + + // use the ExecutorService returned from ServerGremlinExecutor as it might be initialized there + serverGremlinExecutor = (ServerGremlinExecutor)serviceContext.getOrAddService( + ServerGremlinExecutor.class.getName(), + () -> new ServerGremlinExecutor(serviceContext.getSettings(), gremlinExecutorService, workerGroup)); + + // todo: use serviceContext.getOrAddService + gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService(); + graphManager = serverGremlinExecutor.getGraphManager(); + + // initialize the OpLoader with configurations being passed to each OpProcessor implementation loaded + OpLoader.init(serviceContext.getSettings()); + + configureSerializers(serviceContext); + + serviceContext.addService("serializers", serializers); + + started = true; + } + + private static void configureSerializers(final ServiceContext serviceContext) { + graphManager = serverGremlinExecutor.getGraphManager(); + + // grab some sensible defaults if no serializers are present in the config + final List serializerSettings = + (null == serviceContext.getSettings().serializers || serviceContext.getSettings().serializers.isEmpty()) + ? DEFAULT_SERIALIZERS + : serviceContext.getSettings().serializers; + + serializerSettings.stream().map(config -> { + try { + final Class clazz = Class.forName(config.className); + if (!MessageSerializer.class.isAssignableFrom(clazz)) { + logger.warn("The {} serialization class does not implement {} - it will not be available.", config.className, MessageSerializer.class.getCanonicalName()); + return Optional.empty(); + } + + if (clazz.getAnnotation(Deprecated.class) != null) + logger.warn("The {} serialization class is deprecated.", config.className); + + final MessageSerializer serializer = (MessageSerializer) clazz.newInstance(); + final Map graphsDefinedAtStartup = new HashMap<>(); + for (String graphName : serviceContext.getSettings().graphs.keySet()) { + graphsDefinedAtStartup.put(graphName, graphManager.getGraph(graphName)); + } + + if (config.config != null) + serializer.configure(config.config, graphsDefinedAtStartup); + + return Optional.ofNullable(serializer); + } catch (ClassNotFoundException cnfe) { + logger.warn("Could not find configured serializer class - {} - it will not be available", config.className); + return Optional.empty(); + } catch (Exception ex) { + logger.warn("Could not instantiate configured serializer class - {} - it will not be available. {}", config.className, ex.getMessage()); + return Optional.empty(); + } + }).filter(Optional::isPresent).map(Optional::get).flatMap(serializer -> + Stream.of(serializer.mimeTypesSupported()).map(mimeType -> Pair.with(mimeType, serializer)) + ).forEach(pair -> { + final String mimeType = pair.getValue0(); + final MessageSerializer serializer = pair.getValue1(); + if (serializers.containsKey(mimeType)) + logger.info("{} already has {} configured - it will not be replaced by {}, change order of serialization configuration if this is not desired.", + mimeType, serializers.get(mimeType).getClass().getName(), serializer.getClass().getName()); + else { + logger.info("Configured {} with {}", mimeType, pair.getValue1().getClass().getName()); + serializers.put(mimeType, serializer); + } + }); + + if (serializers.size() == 0) { + logger.error("No serializers were successfully configured - server will not start."); + throw new RuntimeException("Serialization configuration error."); + } + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinChannelizerV4.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinChannelizerV4.java new file mode 100644 index 00000000000..f1353b2c657 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinChannelizerV4.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; +import org.apache.tinkerpop.gremlin.server.handler.GremlinResponseFrameEncoder; +import org.apache.tinkerpop.gremlin.server.handler.RoutingHandler; +import org.apache.tinkerpop.gremlin.server.handler.WsGremlinBinaryRequestDecoder; +import org.apache.tinkerpop.gremlin.server.handler.WsGremlinCloseRequestDecoder; +import org.apache.tinkerpop.gremlin.server.handler.WsGremlinResponseFrameEncoder; +import org.apache.tinkerpop.gremlin.server.handler.WsGremlinTextRequestDecoder; +import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + + */ +public class GremlinChannelizerV4 extends ChannelInitializer implements ChannelizerV4 { + + // todo: move out? + public static final String PIPELINE_REQUEST_HANDLER = "request-handler"; + public static final String PIPELINE_HTTP_RESPONSE_ENCODER = "http-response-encoder"; + public static final String PIPELINE_HTTP_AGGREGATOR = "http-aggregator"; + public static final String PIPELINE_WEBSOCKET_SERVER_COMPRESSION = "web-socket-server-compression-handler"; + protected static final String PIPELINE_HTTP_REQUEST_DECODER = "http-request-decoder"; + + private static final Logger logger = LoggerFactory.getLogger(GremlinChannelizerV4.class); + + protected ServiceContext serviceContext; + protected RoutingHandler routingHandler; + + @Override + public void init(final ServiceContext serviceContext, final List services) { + this.serviceContext = serviceContext; + routingHandler = new RoutingHandler(services); + } + + @Override + public void initChannel(final SocketChannel ch) throws Exception { + final ChannelPipeline pipeline = ch.pipeline(); + + if (supportsIdleMonitor()) { + final int idleConnectionTimeout = (int) (serviceContext.getSettings().idleConnectionTimeout / 1000); + final int keepAliveInterval = (int) (serviceContext.getSettings().keepAliveInterval / 1000); + pipeline.addLast(new IdleStateHandler(idleConnectionTimeout, keepAliveInterval, 0)); + } + + // configureHttpPipeline(pipeline); + + initWs(); + configureWsPipeline(pipeline); + } + + /// ----------- WS setup + private GremlinResponseFrameEncoder gremlinResponseFrameEncoder; + private WsGremlinTextRequestDecoder wsGremlinTextRequestDecoder; + private WsGremlinBinaryRequestDecoder wsGremlinBinaryRequestDecoder; + private WsGremlinResponseFrameEncoder wsGremlinResponseFrameEncoder; + private WsGremlinCloseRequestDecoder wsGremlinCloseRequestDecoder; + + public void initWs() { + // all encoders should be wrapped as services + gremlinResponseFrameEncoder = new GremlinResponseFrameEncoder(); + wsGremlinTextRequestDecoder = new WsGremlinTextRequestDecoder(serviceContext); + wsGremlinBinaryRequestDecoder = new WsGremlinBinaryRequestDecoder(serviceContext); + wsGremlinCloseRequestDecoder = new WsGremlinCloseRequestDecoder(serviceContext); + wsGremlinResponseFrameEncoder = new WsGremlinResponseFrameEncoder(); + } + + public void configureWsPipeline(final ChannelPipeline pipeline) { + final Settings settings = serviceContext.getSettings(); + + if (logger.isDebugEnabled()) + pipeline.addLast(new LoggingHandler("log-encoder-aggregator", LogLevel.DEBUG)); + + pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder()); + + logger.debug("HttpRequestDecoder settings - maxInitialLineLength={}, maxHeaderSize={}, maxChunkSize={}", + settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize); + pipeline.addLast(PIPELINE_HTTP_REQUEST_DECODER, new HttpRequestDecoder(settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize)); + + if (logger.isDebugEnabled()) + pipeline.addLast(new LoggingHandler("log-decoder-aggregator", LogLevel.DEBUG)); + + logger.debug("HttpObjectAggregator settings - maxContentLength={}, maxAccumulationBufferComponents={}", + settings.maxContentLength, settings.maxAccumulationBufferComponents); + final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength); + aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents); + pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator); + // Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692 + pipeline.addLast(PIPELINE_WEBSOCKET_SERVER_COMPRESSION, new WebSocketServerCompressionHandler()); + + // setting closeOnProtocolViolation to false prevents causing all the other requests using the same channel + // to fail when a single request causes a protocol violation. + // todo: collect all ws services path + final WebSocketDecoderConfig wsDecoderConfig = WebSocketDecoderConfig.newBuilder(). + closeOnProtocolViolation(false).allowExtensions(true).maxFramePayloadLength(settings.maxContentLength).build(); + pipeline.addLast(PIPELINE_REQUEST_HANDLER, new WebSocketServerProtocolHandler("/gremlin", + null, false, false, 10000L, wsDecoderConfig)); + + if (logger.isDebugEnabled()) + pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG)); + + pipeline.addLast("ws-frame-encoder", wsGremlinResponseFrameEncoder); + pipeline.addLast("response-frame-encoder", gremlinResponseFrameEncoder); + pipeline.addLast("request-text-decoder", wsGremlinTextRequestDecoder); + pipeline.addLast("request-binary-decoder", wsGremlinBinaryRequestDecoder); + pipeline.addLast("request-close-decoder", wsGremlinCloseRequestDecoder); + + if (logger.isDebugEnabled()) + pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG)); + + pipeline.addLast("http-routing-handler", routingHandler); + } + + public void configureHttpPipeline(final ChannelPipeline pipeline) { + if (logger.isDebugEnabled()) + pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG)); + + pipeline.addLast("http-server", new HttpServerCodec()); + + if (logger.isDebugEnabled()) + pipeline.addLast(new LoggingHandler("http-io", LogLevel.DEBUG)); + + final HttpObjectAggregator aggregator = new HttpObjectAggregator(serviceContext.getSettings().maxContentLength); + aggregator.setMaxCumulationBufferComponents(serviceContext.getSettings().maxAccumulationBufferComponents); + pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator); + + pipeline.addLast("http-routing-handler", routingHandler); + } + + @Override + public void init(ServerGremlinExecutor serverGremlinExecutor) { + // remove + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinHttpService.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinHttpService.java new file mode 100644 index 00000000000..669ed90d689 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinHttpService.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; +import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; +import org.apache.tinkerpop.gremlin.server.op.OpLoader; +import org.apache.tinkerpop.gremlin.server.util.LifeCycleHook; +import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.util.MessageSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class GremlinHttpService implements GremlinService { + + private static final Logger logger = LoggerFactory.getLogger(GremlinHttpService.class); + private ExecutorService gremlinExecutorService; + private ServerGremlinExecutor serverGremlinExecutor; + private ServiceContext serviceContext; + private GraphManager graphManager; + private GremlinExecutor gremlinExecutor; + private Map> serializers = new HashMap<>(); + private HttpGremlinEndpointHandler httpGremlinEndpointHandler; + + private List protocols = Arrays.asList(HttpMethod.GET, HttpMethod.POST, HttpMethod.HEAD); + private static String endpoint = "/text"; + + // better to pass all parameters only with serviceContext + @Override + public void init(final ServiceContext serviceContext) { + this.serviceContext = serviceContext; + + GremlinBootstrapper.init(serviceContext); + + serverGremlinExecutor = (ServerGremlinExecutor)serviceContext.getService(ServerGremlinExecutor.class.getName()); + + // todo: use serviceContext.getOrAddService + gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService(); + graphManager = serverGremlinExecutor.getGraphManager(); + gremlinExecutor = serverGremlinExecutor.getGremlinExecutor(); + + serializers = (Map>)serviceContext.getService("serializers"); + + httpGremlinEndpointHandler = buildHandler(); + } + + @Override + public void start() { + // fire off any lifecycle scripts that were provided by the user. hooks get initialized during + // ServerGremlinExecutor initialization + serverGremlinExecutor.getHooks().forEach(hook -> { + logger.info("Executing start up {}", LifeCycleHook.class.getSimpleName()); + try { + hook.onStartUp(new LifeCycleHook.Context(logger)); + } catch (UnsupportedOperationException uoe) { + // if the user doesn't implement onStartUp the scriptengine will throw + // this exception. it can safely be ignored. + } + }); + } + + @Override + public boolean canHandle(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof FullHttpRequest) { + final FullHttpRequest httpRequest = (FullHttpRequest) msg; + + return protocols.contains(httpRequest.method()) && httpRequest.uri().startsWith(endpoint); + } + return false; + } + + @Override + public boolean handle(final ChannelHandlerContext ctx, final Object msg) { + // hack only for PoC + httpGremlinEndpointHandler.channelRead(ctx, msg); + return true; + } + + // todo: move code to GremlinBootstraper.shutDown + @Override + public void shutDown() { + // release resources in the OpProcessors (e.g. kill sessions) + OpLoader.getProcessors().entrySet().forEach(kv -> { + logger.info("Shutting down OpProcessor[{}]", kv.getKey()); + try { + kv.getValue().close(); + } catch (Exception ex) { + logger.warn("Shutdown will continue but, there was an error encountered while closing " + kv.getKey(), ex); + } + }); + + try { + if (gremlinExecutorService != null) gremlinExecutorService.shutdown(); + } finally { + logger.debug("Shutdown Gremlin thread pool."); + } + } + + @Override + public void tryReleaseResources() { + if (serverGremlinExecutor != null) { + serverGremlinExecutor.getHooks().forEach(hook -> { + logger.info("Executing shutdown {}", LifeCycleHook.class.getSimpleName()); + try { + hook.onShutDown(new LifeCycleHook.Context(logger)); + } catch (UnsupportedOperationException | UndeclaredThrowableException uoe) { + // if the user doesn't implement onShutDown the scriptengine will throw + // this exception. it can safely be ignored. + } + }); + } + + try { + if (gremlinExecutorService != null) { + if (!gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + logger.warn("Gremlin thread pool did not fully terminate - continuing with shutdown process"); + } + } + } catch (InterruptedException ie) { + logger.warn("Timeout waiting for Gremlin thread pool to shutdown - continuing with shutdown process."); + } + + // close TraversalSource and Graph instances - there aren't guarantees that closing Graph will close all + // spawned TraversalSource instances so both should be closed directly and independently. + if (serverGremlinExecutor != null) { + final Set traversalSourceNames = serverGremlinExecutor.getGraphManager().getTraversalSourceNames(); + traversalSourceNames.forEach(traversalSourceName -> { + logger.debug("Closing GraphTraversalSource instance [{}]", traversalSourceName); + try { + serverGremlinExecutor.getGraphManager().getTraversalSource(traversalSourceName).close(); + } catch (Exception ex) { + logger.warn(String.format("Exception while closing GraphTraversalSource instance [%s]", traversalSourceName), ex); + } finally { + logger.info("Closed GraphTraversalSource instance [{}]", traversalSourceName); + } + + try { + serverGremlinExecutor.getGraphManager().removeTraversalSource(traversalSourceName); + } catch (Exception ex) { + logger.warn(String.format("Exception while removing GraphTraversalSource instance [%s] from GraphManager", traversalSourceName), ex); + } + }); + + final Set graphNames = serverGremlinExecutor.getGraphManager().getGraphNames(); + graphNames.forEach(gName -> { + logger.debug("Closing Graph instance [{}]", gName); + try { + final Graph graph = serverGremlinExecutor.getGraphManager().getGraph(gName); + graph.close(); + } catch (Exception ex) { + logger.warn(String.format("Exception while closing Graph instance [%s]", gName), ex); + } finally { + logger.info("Closed Graph instance [{}]", gName); + } + + try { + serverGremlinExecutor.getGraphManager().removeGraph(gName); + } catch (Exception ex) { + logger.warn(String.format("Exception while removing Graph instance [%s] from GraphManager", gName), ex); + } + }); + } + } + + private HttpGremlinEndpointHandler buildHandler() { + return new HttpGremlinEndpointHandler(serializers, gremlinExecutor, graphManager, serviceContext.getSettings()); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServerV4.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServerV4.java new file mode 100644 index 00000000000..181e4722ff3 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinServerV4.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; +import org.apache.commons.lang3.SystemUtils; +import org.apache.tinkerpop.gremlin.server.util.MetricManager; +import org.apache.tinkerpop.gremlin.server.util.ThreadFactoryUtil; +import org.apache.tinkerpop.gremlin.util.Gremlin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Start and stop Gremlin Server. + */ +public class GremlinServerV4 { + + static { + // hook slf4j up to netty internal logging + InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); + } + + private static final String SERVER_THREAD_PREFIX = "gremlin-server-"; + public static final String AUDIT_LOGGER_NAME = "audit.org.apache.tinkerpop.gremlin.server"; + + private static final Logger logger = LoggerFactory.getLogger(GremlinServerV4.class); + private ServiceContext serviceContext; + private Channel ch; + + private CompletableFuture serverStopped = null; + private CompletableFuture serverStarted = null; + + private final EventLoopGroup bossGroup; + private final EventLoopGroup workerGroup; + + private final boolean isEpollEnabled; + + private ChannelizerV4 channelizer; + + // for PoC only. Should be build from config file + private GremlinService gremlinHttpService = new GremlinHttpService(); + private GremlinService gremlinWsService = new GremlinWsService(); + private StatusHttpService statusHttpService = new StatusHttpService(); + + /** + * Construct a Gremlin Server instance from {@link Settings}. + */ + public GremlinServerV4(final Settings settings) { + settings.optionalMetrics().ifPresent(GremlinServerV4::configureMetrics); + + provideDefaultForGremlinPoolSize(settings); + this.isEpollEnabled = settings.useEpollEventLoop && SystemUtils.IS_OS_LINUX; + if (settings.useEpollEventLoop && !SystemUtils.IS_OS_LINUX){ + logger.warn("cannot use epoll in non-linux env, falling back to NIO"); + } + + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop().join(), SERVER_THREAD_PREFIX + "shutdown")); + + final ThreadFactory threadFactoryBoss = ThreadFactoryUtil.create("boss-%d"); + // if linux os use epoll else fallback to nio based eventloop + // epoll helps in reducing GC and has better performance + // http://netty.io/wiki/native-transports.html + if (isEpollEnabled){ + bossGroup = new EpollEventLoopGroup(settings.threadPoolBoss, threadFactoryBoss); + } else { + bossGroup = new NioEventLoopGroup(settings.threadPoolBoss, threadFactoryBoss); + } + + final ThreadFactory threadFactoryWorker = ThreadFactoryUtil.create("worker-%d"); + if (isEpollEnabled) { + workerGroup = new EpollEventLoopGroup(settings.threadPoolWorker, threadFactoryWorker); + } else { + workerGroup = new NioEventLoopGroup(settings.threadPoolWorker, threadFactoryWorker); + } + + channelizer = new GremlinChannelizerV4(); + + buildServiceContext(settings); + + gremlinHttpService.init(serviceContext); + gremlinWsService.init(serviceContext); + statusHttpService.init(serviceContext); + } + + // override if you need to add something + protected void buildServiceContext(final Settings settings) { + serviceContext = new ServiceContext(settings); + + serviceContext.addService("workerGroup", workerGroup); + serviceContext.addService("channelizer", channelizer); + } + + /** + * Start Gremlin Server with {@link Settings} provided to the constructor. + */ + public synchronized CompletableFuture start() throws Exception { + if (serverStarted != null) { + // server already started - don't get it rolling again + return serverStarted; + } + + final Settings settings = serviceContext.getSettings(); + serverStarted = new CompletableFuture<>(); + final CompletableFuture serverReadyFuture = serverStarted; + try { + final ServerBootstrap b = new ServerBootstrap(); + + // when high value is reached then the channel becomes non-writable and stays like that until the + // low value is so that there is time to recover + b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(settings.writeBufferLowWaterMark, settings.writeBufferHighWaterMark)); + b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + gremlinHttpService.start(); + gremlinWsService.start(); + statusHttpService.start(); + + channelizer.init(serviceContext, Arrays.asList(gremlinHttpService, gremlinWsService, statusHttpService)); + + b.group(bossGroup, workerGroup).childHandler(channelizer); + if (isEpollEnabled) { + b.channel(EpollServerSocketChannel.class); + } else { + b.channel(NioServerSocketChannel.class); + } + + // bind to host/port and wait for channel to be ready + b.bind(serviceContext.getSettings().host, serviceContext.getSettings().port).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture channelFuture) throws Exception { + if (channelFuture.isSuccess()) { + ch = channelFuture.channel(); + + logger.info("Gremlin Server configured with worker thread pool of {}, gremlin pool of {} and boss thread pool of {}.", + settings.threadPoolWorker, settings.gremlinPool, settings.threadPoolBoss); + logger.info("Channel started at port {}.", settings.port); + + serverReadyFuture.complete(null); + } else { + serverReadyFuture.completeExceptionally(new IOException( + String.format("Could not bind to %s and %s - perhaps something else is bound to that address.", settings.host, settings.port))); + } + } + }); + } catch (Exception ex) { + logger.error("Gremlin Server Error", ex); + serverReadyFuture.completeExceptionally(ex); + } + + return serverStarted; + } + + /** + * Stop Gremlin Server and free the port binding. Note that multiple calls to this method will return the + * same instance of the {@code CompletableFuture}. + */ + public synchronized CompletableFuture stop() { + if (serverStopped != null) { + // shutdown has started so don't fire it off again + return serverStopped; + } + + serverStopped = new CompletableFuture<>(); + final CountDownLatch servicesLeftToShutdown = new CountDownLatch(3); + + // it's possible that a channel might not be initialized in the first place if bind() fails because + // of port conflict. in that case, there's no need to wait for the channel to close. + if (null == ch) + servicesLeftToShutdown.countDown(); + else + ch.close().addListener(f -> servicesLeftToShutdown.countDown()); + + logger.info("Shutting down thread pools."); + + try { + workerGroup.shutdownGracefully().addListener((GenericFutureListener) f -> servicesLeftToShutdown.countDown()); + } finally { + logger.debug("Shutdown Worker thread pool."); + } + try { + bossGroup.shutdownGracefully().addListener((GenericFutureListener) f -> servicesLeftToShutdown.countDown()); + } finally { + logger.debug("Shutdown Boss thread pool."); + } + + gremlinHttpService.shutDown(); + gremlinWsService.shutDown(); + statusHttpService.shutDown(); + + // channel is shutdown as are the thread pools - time to kill graphs as nothing else should be acting on them + new Thread(() -> { + + gremlinHttpService.tryReleaseResources(); + gremlinWsService.tryReleaseResources(); + statusHttpService.tryReleaseResources(); + + try { + servicesLeftToShutdown.await(30000, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + logger.warn("Timeout waiting for boss/worker thread pools to shutdown - continuing with shutdown process."); + } + + // kills reporter threads. this is a last bit of cleanup that can be done. typically, the jvm is headed + // for shutdown which would obviously kill the reporters, but when it isn't they just keep reporting. + // removing them all will silent them up and release the appropriate resources. + MetricManager.INSTANCE.removeAllReporters(); + + // removing all the metrics should allow Gremlin Server to clean up the metrics instance so that it can be + // started again in the same JVM without those metrics initialized which generates a warning and won't + // reset to start values + MetricManager.INSTANCE.removeAllMetrics(); + + logger.info("Gremlin Server - shutdown complete"); + serverStopped.complete(null); + }, SERVER_THREAD_PREFIX + "stop").start(); + + return serverStopped; + } + + public ChannelHandler getChannelizer() { + return channelizer; + } + + public static void main(final String[] args) throws Exception { + // add to vm options: -Dlogback.configurationFile=file:conf/logback.xml + printHeader(); + final String file; + if (args.length > 0) + file = args[0]; + else + file = "conf/gremlin-server.yaml"; + + final Settings settings; + try { + settings = Settings.read(file); + } catch (Exception ex) { + logger.error("Configuration file at {} could not be found or parsed properly. [{}]", file, ex.getMessage()); + return; + } + + logger.info("Configuring Gremlin Server from {}", file); + final GremlinServerV4 server = new GremlinServerV4(settings); + server.start().exceptionally(t -> { + logger.error("Gremlin Server was unable to start and will now begin shutdown: {}", t.getMessage()); + server.stop().join(); + return null; + }).join(); + } + + public static String getHeader() { + final StringBuilder builder = new StringBuilder(); + builder.append(Gremlin.version() + "\r\n"); + builder.append(" \\,,,/\r\n"); + builder.append(" (o o)\r\n"); + builder.append("-----oOOo-(3)-oOOo-----\r\n"); + return builder.toString(); + } + + private static void configureMetrics(final Settings.ServerMetrics settings) { + final MetricManager metrics = MetricManager.INSTANCE; + settings.optionalConsoleReporter().ifPresent(config -> { + if (config.enabled) metrics.addConsoleReporter(config.interval); + }); + + settings.optionalCsvReporter().ifPresent(config -> { + if (config.enabled) metrics.addCsvReporter(config.interval, config.fileName); + }); + + settings.optionalJmxReporter().ifPresent(config -> { + if (config.enabled) metrics.addJmxReporter(config.domain, config.agentId); + }); + + settings.optionalSlf4jReporter().ifPresent(config -> { + if (config.enabled) metrics.addSlf4jReporter(config.interval, config.loggerName); + }); + + settings.optionalGangliaReporter().ifPresent(config -> { + if (config.enabled) { + try { + metrics.addGangliaReporter(config.host, config.port, + config.addressingMode, config.ttl, config.protocol31, config.hostUUID, config.spoof, config.interval); + } catch (IOException ioe) { + logger.warn("Error configuring the Ganglia Reporter.", ioe); + } + } + }); + + settings.optionalGraphiteReporter().ifPresent(config -> { + if (config.enabled) metrics.addGraphiteReporter(config.host, config.port, config.prefix, config.interval); + }); + } + + private static void printHeader() { + logger.info(getHeader()); + } + + private static void provideDefaultForGremlinPoolSize(final Settings settings) { + if (settings.gremlinPool == 0) + settings.gremlinPool = Runtime.getRuntime().availableProcessors(); + } + + @Override + public String toString() { + return "GremlinServer " + serviceContext.getSettings().host + ":" + serviceContext.getSettings().port; + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinService.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinService.java new file mode 100644 index 00000000000..d81e75ae095 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinService.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.ChannelHandlerContext; + +public interface GremlinService { + + public void init(final ServiceContext serviceContext); + public void start(); + public boolean canHandle(final ChannelHandlerContext ctx, final Object msg); + // return true when request is finished + // todo: replace ChannelHandlerContext with abstraction + public boolean handle(final ChannelHandlerContext ctx, final Object msg) throws Exception; + public void shutDown(); + public void tryReleaseResources(); +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinWsService.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinWsService.java new file mode 100644 index 00000000000..940e8657c05 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/GremlinWsService.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; +import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; +import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler; +import org.apache.tinkerpop.gremlin.server.handler.OpSelectorHandler; +import org.apache.tinkerpop.gremlin.server.op.OpLoader; +import org.apache.tinkerpop.gremlin.server.util.LifeCycleHook; +import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.util.MessageSerializer; +import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1; +import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV2; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class GremlinWsService implements GremlinService { + + private static final Logger logger = LoggerFactory.getLogger(GremlinWsService.class); + private ExecutorService gremlinExecutorService; + private ServerGremlinExecutor serverGremlinExecutor; + private ServiceContext serviceContext; + private GraphManager graphManager; + private GremlinExecutor gremlinExecutor; + private ScheduledExecutorService scheduledExecutorService; + private final Map> serializers = new HashMap<>(); + private OpSelectorHandler opSelectorHandler; + private OpExecutorHandler opExecutorHandler; + + private List protocols = Arrays.asList(); + private static String endpoint = "/gremlin"; + + @Override + public void init(final ServiceContext serviceContext) { + this.serviceContext = serviceContext; + + GremlinBootstrapper.init(serviceContext); + + final EventLoopGroup workerGroup = (EventLoopGroup)serviceContext.getService("workerGroup"); + + // use the ExecutorService returned from ServerGremlinExecutor as it might be initialized there + serverGremlinExecutor = (ServerGremlinExecutor)serviceContext.getOrAddService( + ServerGremlinExecutor.class.getName(), + () -> new ServerGremlinExecutor(serviceContext.getSettings(), gremlinExecutorService, workerGroup)); + + // todo: use serviceContext.getOrAddService + gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService(); + graphManager = serverGremlinExecutor.getGraphManager(); + gremlinExecutor = serverGremlinExecutor.getGremlinExecutor(); + scheduledExecutorService = serverGremlinExecutor.getScheduledExecutorService(); + + // initialize the OpLoader with configurations being passed to each OpProcessor implementation loaded + OpLoader.init(serviceContext.getSettings()); + + configureSerializers(); + + opSelectorHandler = new OpSelectorHandler(serviceContext.getSettings(), graphManager, gremlinExecutor, + scheduledExecutorService, (ChannelizerV4)serviceContext.getService("channelizer")); + opExecutorHandler = new OpExecutorHandler(serviceContext.getSettings(), graphManager, gremlinExecutor, scheduledExecutorService); + } + + @Override + public void start() { + // fire off any lifecycle scripts that were provided by the user. hooks get initialized during + // ServerGremlinExecutor initialization + serverGremlinExecutor.getHooks().forEach(hook -> { + logger.info("Executing start up {}", LifeCycleHook.class.getSimpleName()); + try { + hook.onStartUp(new LifeCycleHook.Context(logger)); + } catch (UnsupportedOperationException uoe) { + // if the user doesn't implement onStartUp the scriptengine will throw + // this exception. it can safely be ignored. + } + }); + } + + @Override + public boolean canHandle(final ChannelHandlerContext ctx, final Object msg) { + // RequestMessage + return msg instanceof RequestMessage; + } + + @Override + public boolean handle(final ChannelHandlerContext ctx, final Object msg) throws Exception { + // hack only for PoC + final Pair> selectOp = opSelectorHandler.selectOp(ctx, (RequestMessage)msg); + opExecutorHandler.handle(ctx, selectOp); + return true; + } + + @Override + public void shutDown() { + // release resources in the OpProcessors (e.g. kill sessions) + OpLoader.getProcessors().entrySet().forEach(kv -> { + logger.info("Shutting down OpProcessor[{}]", kv.getKey()); + try { + kv.getValue().close(); + } catch (Exception ex) { + logger.warn("Shutdown will continue but, there was an error encountered while closing " + kv.getKey(), ex); + } + }); + + try { + if (gremlinExecutorService != null) gremlinExecutorService.shutdown(); + } finally { + logger.debug("Shutdown Gremlin thread pool."); + } + } + + @Override + public void tryReleaseResources() { + if (serverGremlinExecutor != null) { + serverGremlinExecutor.getHooks().forEach(hook -> { + logger.info("Executing shutdown {}", LifeCycleHook.class.getSimpleName()); + try { + hook.onShutDown(new LifeCycleHook.Context(logger)); + } catch (UnsupportedOperationException | UndeclaredThrowableException uoe) { + // if the user doesn't implement onShutDown the scriptengine will throw + // this exception. it can safely be ignored. + } + }); + } + + try { + if (gremlinExecutorService != null) { + if (!gremlinExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + logger.warn("Gremlin thread pool did not fully terminate - continuing with shutdown process"); + } + } + } catch (InterruptedException ie) { + logger.warn("Timeout waiting for Gremlin thread pool to shutdown - continuing with shutdown process."); + } + + // close TraversalSource and Graph instances - there aren't guarantees that closing Graph will close all + // spawned TraversalSource instances so both should be closed directly and independently. + if (serverGremlinExecutor != null) { + final Set traversalSourceNames = serverGremlinExecutor.getGraphManager().getTraversalSourceNames(); + traversalSourceNames.forEach(traversalSourceName -> { + logger.debug("Closing GraphTraversalSource instance [{}]", traversalSourceName); + try { + serverGremlinExecutor.getGraphManager().getTraversalSource(traversalSourceName).close(); + } catch (Exception ex) { + logger.warn(String.format("Exception while closing GraphTraversalSource instance [%s]", traversalSourceName), ex); + } finally { + logger.info("Closed GraphTraversalSource instance [{}]", traversalSourceName); + } + + try { + serverGremlinExecutor.getGraphManager().removeTraversalSource(traversalSourceName); + } catch (Exception ex) { + logger.warn(String.format("Exception while removing GraphTraversalSource instance [%s] from GraphManager", traversalSourceName), ex); + } + }); + + final Set graphNames = serverGremlinExecutor.getGraphManager().getGraphNames(); + graphNames.forEach(gName -> { + logger.debug("Closing Graph instance [{}]", gName); + try { + final Graph graph = serverGremlinExecutor.getGraphManager().getGraph(gName); + graph.close(); + } catch (Exception ex) { + logger.warn(String.format("Exception while closing Graph instance [%s]", gName), ex); + } finally { + logger.info("Closed Graph instance [{}]", gName); + } + + try { + serverGremlinExecutor.getGraphManager().removeGraph(gName); + } catch (Exception ex) { + logger.warn(String.format("Exception while removing Graph instance [%s] from GraphManager", gName), ex); + } + }); + } + } + + //// AbstractChannelizer copy/paste + protected static final List DEFAULT_SERIALIZERS = Arrays.asList( + new Settings.SerializerSettings(GraphSONMessageSerializerV2.class.getName(), Collections.emptyMap()), + new Settings.SerializerSettings(GraphBinaryMessageSerializerV1.class.getName(), Collections.emptyMap()), + new Settings.SerializerSettings(GraphBinaryMessageSerializerV1.class.getName(), new HashMap(){{ + put(GraphBinaryMessageSerializerV1.TOKEN_SERIALIZE_RESULT_TO_STRING, true); + }}) + ); + + private void configureSerializers() { + // grab some sensible defaults if no serializers are present in the config + final List serializerSettings = + (null == serviceContext.getSettings().serializers || serviceContext.getSettings().serializers.isEmpty()) + ? DEFAULT_SERIALIZERS + : serviceContext.getSettings().serializers; + + serializerSettings.stream().map(config -> { + try { + final Class clazz = Class.forName(config.className); + if (!MessageSerializer.class.isAssignableFrom(clazz)) { + logger.warn("The {} serialization class does not implement {} - it will not be available.", config.className, MessageSerializer.class.getCanonicalName()); + return Optional.empty(); + } + + if (clazz.getAnnotation(Deprecated.class) != null) + logger.warn("The {} serialization class is deprecated.", config.className); + + final MessageSerializer serializer = (MessageSerializer) clazz.newInstance(); + final Map graphsDefinedAtStartup = new HashMap<>(); + for (String graphName : serviceContext.getSettings().graphs.keySet()) { + graphsDefinedAtStartup.put(graphName, graphManager.getGraph(graphName)); + } + + if (config.config != null) + serializer.configure(config.config, graphsDefinedAtStartup); + + return Optional.ofNullable(serializer); + } catch (ClassNotFoundException cnfe) { + logger.warn("Could not find configured serializer class - {} - it will not be available", config.className); + return Optional.empty(); + } catch (Exception ex) { + logger.warn("Could not instantiate configured serializer class - {} - it will not be available. {}", config.className, ex.getMessage()); + return Optional.empty(); + } + }).filter(Optional::isPresent).map(Optional::get).flatMap(serializer -> + Stream.of(serializer.mimeTypesSupported()).map(mimeType -> Pair.with(mimeType, serializer)) + ).forEach(pair -> { + final String mimeType = pair.getValue0(); + final MessageSerializer serializer = pair.getValue1(); + if (serializers.containsKey(mimeType)) + logger.info("{} already has {} configured - it will not be replaced by {}, change order of serialization configuration if this is not desired.", + mimeType, serializers.get(mimeType).getClass().getName(), serializer.getClass().getName()); + else { + logger.info("Configured {} with {}", mimeType, pair.getValue1().getClass().getName()); + serializers.put(mimeType, serializer); + } + }); + + if (serializers.size() == 0) { + logger.error("No serializers were successfully configured - server will not start."); + throw new RuntimeException("Serialization configuration error."); + } + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ServiceContext.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ServiceContext.java new file mode 100644 index 00000000000..fc0cc20fbf7 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ServiceContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public class ServiceContext { + private final Settings settings; + private final Map serviceRegistry = new HashMap<>(); + + public ServiceContext(Settings settings) { + this.settings = settings; + } + + public Settings getSettings() { + return settings; + } + + public Object getOrAddService(final String serviceName, final Supplier supplier) { + if(!serviceRegistry.containsKey(serviceName)) { + serviceRegistry.put(serviceName, supplier.get()); + } + return serviceRegistry.get(serviceName); + } + + public Object getService(final String serviceName) { + if(!serviceRegistry.containsKey(serviceName)) { + throw new RuntimeException("todo"); + } + return serviceRegistry.get(serviceName); + } + + public void addService(final String serviceName, Object service) { + serviceRegistry.put(serviceName, service); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/StatusHttpService.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/StatusHttpService.java new file mode 100644 index 00000000000..88d2942fdeb --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/StatusHttpService.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpUtil; +import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +public class StatusHttpService implements GremlinService { + + private static final Logger logger = LoggerFactory.getLogger(StatusHttpService.class); + private ServiceContext serviceContext; + + private List protocols = Arrays.asList(HttpMethod.GET); + private static String endpoint = "/status"; + + @Override + public void init(final ServiceContext serviceContext) { + this.serviceContext = serviceContext; + } + + @Override + public void start() { + + } + + @Override + public boolean canHandle(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof FullHttpRequest) { + final FullHttpRequest httpRequest = (FullHttpRequest) msg; + + return protocols.contains(httpRequest.method()) && httpRequest.uri().startsWith(endpoint); + } + return false; + } + + @Override + public boolean handle(final ChannelHandlerContext ctx, final Object msg) { + final FullHttpRequest req = (FullHttpRequest) msg; + final boolean keepAlive = HttpUtil.isKeepAlive(req); + + final ByteBuf buf = ctx.alloc().buffer(); + buf.writeBytes("alive TinkerPop 3.8?".getBytes()); + + final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf); + response.headers().set(CONTENT_TYPE, "text/html; charset=utf-8"); + + HttpHandlerUtil.sendAndCleanupConnection(ctx, keepAlive, response); + + return true; + } + + @Override + public void shutDown() { + + } + + @Override + public void tryReleaseResources() { + + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index 1ce6b0054bd..653c36e76d4 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -188,9 +188,9 @@ static void sendError(final ChannelHandlerContext ctx, final HttpResponseStatus sendAndCleanupConnection(ctx, keepAlive, response); } - static void sendAndCleanupConnection(final ChannelHandlerContext ctx, - final boolean keepAlive, - final FullHttpResponse response) { + public static void sendAndCleanupConnection(final ChannelHandlerContext ctx, + final boolean keepAlive, + final FullHttpResponse response) { HttpUtil.setKeepAlive(response, keepAlive); HttpUtil.setContentLength(response, response.content().readableBytes()); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java index 8e3b1ee2df6..665a217eeff 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java @@ -57,6 +57,11 @@ public OpExecutorHandler(final Settings settings, final GraphManager graphManage this.scheduledExecutorService = scheduledExecutorService; } + // todo: !!! one more dirty hack + public void handle(final ChannelHandlerContext ctx, final Pair> objects) throws Exception { + channelRead0(ctx, objects); + } + @Override protected void channelRead0(final ChannelHandlerContext ctx, final Pair> objects) throws Exception { final RequestMessage msg = objects.getValue0(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java index 340b913165b..5954c056177 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java @@ -20,6 +20,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.tinkerpop.gremlin.util.function.ThrowingConsumer; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; @@ -65,6 +66,32 @@ public OpSelectorHandler(final Settings settings, final GraphManager graphManage this.channelizer = channelizer; } + // todo: !!! one more dirty hack + public Pair> selectOp(final ChannelHandlerContext ctx, final RequestMessage msg) { + final Context gremlinServerContext = new Context(msg, ctx, settings, + graphManager, gremlinExecutor, this.scheduledExecutorService); + try { + // choose a processor to do the work based on the request message. + final Optional processor = OpLoader.getProcessor(msg.getProcessor()); + + if (processor.isPresent()) + // the processor is known so use it to evaluate the message + return Pair.with(msg, processor.get().select(gremlinServerContext)); + else { + // invalid op processor selected so write back an error by way of OpProcessorException. + final String errorMessage = String.format("Invalid OpProcessor requested [%s]", msg.getProcessor()); + throw new OpProcessorException(errorMessage, ResponseMessage.build(msg) + .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS) + .statusMessage(errorMessage).create()); + } + } catch (OpProcessorException ope) { + logger.warn(ope.getMessage(), ope); + gremlinServerContext.writeAndFlush(ope.getResponseMessage()); + } + + return null; + } + @Override protected void decode(final ChannelHandlerContext ctx, final RequestMessage msg, final List objects) throws Exception { diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/RoutingHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/RoutingHandler.java new file mode 100644 index 00000000000..12ff139b5e2 --- /dev/null +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/RoutingHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.handler; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.util.ReferenceCountUtil; +import org.apache.tinkerpop.gremlin.server.GremlinService; + +import java.util.List; + +import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +@ChannelHandler.Sharable +public class RoutingHandler extends ChannelInboundHandlerAdapter { + + private List services; + + public RoutingHandler(final List services) { + this.services = services; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + for (GremlinService service : services) { + // more than 1 service can handle same request. For example auth and gremlin + if (service.canHandle(ctx, msg) && service.handle(ctx, msg)) + return; + } + + boolean keepAlive = false; + + if (msg instanceof FullHttpRequest) { + final FullHttpRequest req = (FullHttpRequest) msg; + + if (HttpUtil.is100ContinueExpected(req)) { + ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); + } + } + + if (msg instanceof HttpMessage) { + keepAlive = HttpUtil.isKeepAlive((HttpMessage) msg); + } + + HttpHandlerUtil.sendError(ctx, NOT_FOUND, NOT_FOUND.toString(), keepAlive); + ReferenceCountUtil.release(msg); + } +} diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinBinaryRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinBinaryRequestDecoder.java index 227b2f7e352..409a6f16335 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinBinaryRequestDecoder.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinBinaryRequestDecoder.java @@ -18,6 +18,8 @@ */ package org.apache.tinkerpop.gremlin.server.handler; +import org.apache.tinkerpop.gremlin.server.GremlinBootstrapper; +import org.apache.tinkerpop.gremlin.server.ServiceContext; import org.apache.tinkerpop.gremlin.util.MessageSerializer; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.ser.SerializationException; @@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.nio.charset.Charset; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +54,12 @@ public WsGremlinBinaryRequestDecoder(final Map> ser this.serializers = serializers; } + public WsGremlinBinaryRequestDecoder(final ServiceContext serviceContext) { + GremlinBootstrapper.init(serviceContext); + + this.serializers = (Map>)serviceContext.getService("serializers"); + } + @Override protected void decode(final ChannelHandlerContext channelHandlerContext, final BinaryWebSocketFrame frame, final List objects) throws Exception { final ByteBuf messageBytes = frame.content(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java index 5e64eb80aed..eb3ac034777 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinCloseRequestDecoder.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import org.apache.tinkerpop.gremlin.server.ServiceContext; import org.apache.tinkerpop.gremlin.util.MessageSerializer; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.ser.SerializationException; @@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.nio.charset.Charset; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,6 +52,11 @@ public WsGremlinCloseRequestDecoder(final Map> seri this.serializers = serializers; } + public WsGremlinCloseRequestDecoder(final ServiceContext serviceContext) { + // todo: get from serviceContext. Use ServerSerializers.DEFAULT_TEXT_SERIALIZER for PoC + this.serializers = new HashMap<>(); + } + @Override protected void decode(final ChannelHandlerContext channelHandlerContext, final CloseWebSocketFrame frame, final List objects) throws Exception { final ByteBuf messageBytes = frame.content(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinTextRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinTextRequestDecoder.java index 8c564acb0b5..25358c19d36 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinTextRequestDecoder.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsGremlinTextRequestDecoder.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.server.handler; +import org.apache.tinkerpop.gremlin.server.ServiceContext; import org.apache.tinkerpop.gremlin.util.MessageSerializer; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializer; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +50,11 @@ public WsGremlinTextRequestDecoder(final Map> seria this.serializers = serializers; } + public WsGremlinTextRequestDecoder(final ServiceContext serviceContext) { + // todo: get from serviceContext. Use ServerSerializers.DEFAULT_TEXT_SERIALIZER for PoC + this.serializers = new HashMap<>(); + } + @Override protected void decode(final ChannelHandlerContext channelHandlerContext, final TextWebSocketFrame frame, final List objects) throws Exception { try {