Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server;

import io.netty.channel.ChannelHandler;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;

import java.util.List;

/**

*/
public interface ChannelizerV4 extends Channelizer {

public void init(final ServiceContext serviceContext, final List<GremlinService> services);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tinkerpop.gremlin.server;

import io.netty.channel.EventLoopGroup;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.server.op.OpLoader;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
import org.apache.tinkerpop.gremlin.util.ser.GraphSONMessageSerializerV2;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

public class GremlinBootstrapper {

private static boolean started = false;
private static boolean disposed = false;

private static final Logger logger = LoggerFactory.getLogger(GremlinBootstrapper.class);

// todo: services should not be stored here
private static ExecutorService gremlinExecutorService;
private static ServerGremlinExecutor serverGremlinExecutor;
private static GraphManager graphManager;
private static final Map<String, MessageSerializer<?>> serializers = new HashMap<>();

//// AbstractChannelizer copy/paste
private static final List<Settings.SerializerSettings> DEFAULT_SERIALIZERS = Arrays.asList(
new Settings.SerializerSettings(GraphSONMessageSerializerV2.class.getName(), Collections.emptyMap()),
new Settings.SerializerSettings(GraphBinaryMessageSerializerV1.class.getName(), Collections.emptyMap()),
new Settings.SerializerSettings(GraphBinaryMessageSerializerV1.class.getName(), new HashMap<String, Object>() {{
put(GraphBinaryMessageSerializerV1.TOKEN_SERIALIZE_RESULT_TO_STRING, true);
}})
);

public static void init(final ServiceContext serviceContext) {
if (started) return;

final EventLoopGroup workerGroup = (EventLoopGroup)serviceContext.getService("workerGroup");

// use the ExecutorService returned from ServerGremlinExecutor as it might be initialized there
serverGremlinExecutor = (ServerGremlinExecutor)serviceContext.getOrAddService(
ServerGremlinExecutor.class.getName(),
() -> new ServerGremlinExecutor(serviceContext.getSettings(), gremlinExecutorService, workerGroup));

// todo: use serviceContext.getOrAddService
gremlinExecutorService = serverGremlinExecutor.getGremlinExecutorService();
graphManager = serverGremlinExecutor.getGraphManager();

// initialize the OpLoader with configurations being passed to each OpProcessor implementation loaded
OpLoader.init(serviceContext.getSettings());

configureSerializers(serviceContext);

serviceContext.addService("serializers", serializers);

started = true;
}

private static void configureSerializers(final ServiceContext serviceContext) {
graphManager = serverGremlinExecutor.getGraphManager();

// grab some sensible defaults if no serializers are present in the config
final List<Settings.SerializerSettings> serializerSettings =
(null == serviceContext.getSettings().serializers || serviceContext.getSettings().serializers.isEmpty())
? DEFAULT_SERIALIZERS
: serviceContext.getSettings().serializers;

serializerSettings.stream().map(config -> {
try {
final Class clazz = Class.forName(config.className);
if (!MessageSerializer.class.isAssignableFrom(clazz)) {
logger.warn("The {} serialization class does not implement {} - it will not be available.", config.className, MessageSerializer.class.getCanonicalName());
return Optional.<MessageSerializer>empty();
}

if (clazz.getAnnotation(Deprecated.class) != null)
logger.warn("The {} serialization class is deprecated.", config.className);

final MessageSerializer<?> serializer = (MessageSerializer) clazz.newInstance();
final Map<String, Graph> graphsDefinedAtStartup = new HashMap<>();
for (String graphName : serviceContext.getSettings().graphs.keySet()) {
graphsDefinedAtStartup.put(graphName, graphManager.getGraph(graphName));
}

if (config.config != null)
serializer.configure(config.config, graphsDefinedAtStartup);

return Optional.ofNullable(serializer);
} catch (ClassNotFoundException cnfe) {
logger.warn("Could not find configured serializer class - {} - it will not be available", config.className);
return Optional.<MessageSerializer>empty();
} catch (Exception ex) {
logger.warn("Could not instantiate configured serializer class - {} - it will not be available. {}", config.className, ex.getMessage());
return Optional.<MessageSerializer>empty();
}
}).filter(Optional::isPresent).map(Optional::get).flatMap(serializer ->
Stream.of(serializer.mimeTypesSupported()).map(mimeType -> Pair.with(mimeType, serializer))
).forEach(pair -> {
final String mimeType = pair.getValue0();
final MessageSerializer<?> serializer = pair.getValue1();
if (serializers.containsKey(mimeType))
logger.info("{} already has {} configured - it will not be replaced by {}, change order of serialization configuration if this is not desired.",
mimeType, serializers.get(mimeType).getClass().getName(), serializer.getClass().getName());
else {
logger.info("Configured {} with {}", mimeType, pair.getValue1().getClass().getName());
serializers.put(mimeType, serializer);
}
});

if (serializers.size() == 0) {
logger.error("No serializers were successfully configured - server will not start.");
throw new RuntimeException("Serialization configuration error.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketDecoderConfig;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.tinkerpop.gremlin.server.handler.GremlinResponseFrameEncoder;
import org.apache.tinkerpop.gremlin.server.handler.RoutingHandler;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinBinaryRequestDecoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinCloseRequestDecoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinResponseFrameEncoder;
import org.apache.tinkerpop.gremlin.server.handler.WsGremlinTextRequestDecoder;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**

*/
public class GremlinChannelizerV4 extends ChannelInitializer<SocketChannel> implements ChannelizerV4 {

// todo: move out?
public static final String PIPELINE_REQUEST_HANDLER = "request-handler";
public static final String PIPELINE_HTTP_RESPONSE_ENCODER = "http-response-encoder";
public static final String PIPELINE_HTTP_AGGREGATOR = "http-aggregator";
public static final String PIPELINE_WEBSOCKET_SERVER_COMPRESSION = "web-socket-server-compression-handler";
protected static final String PIPELINE_HTTP_REQUEST_DECODER = "http-request-decoder";

private static final Logger logger = LoggerFactory.getLogger(GremlinChannelizerV4.class);

protected ServiceContext serviceContext;
protected RoutingHandler routingHandler;

@Override
public void init(final ServiceContext serviceContext, final List<GremlinService> services) {
this.serviceContext = serviceContext;
routingHandler = new RoutingHandler(services);
}

@Override
public void initChannel(final SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();

if (supportsIdleMonitor()) {
final int idleConnectionTimeout = (int) (serviceContext.getSettings().idleConnectionTimeout / 1000);
final int keepAliveInterval = (int) (serviceContext.getSettings().keepAliveInterval / 1000);
pipeline.addLast(new IdleStateHandler(idleConnectionTimeout, keepAliveInterval, 0));
}

// configureHttpPipeline(pipeline);

initWs();
configureWsPipeline(pipeline);
}

/// ----------- WS setup
private GremlinResponseFrameEncoder gremlinResponseFrameEncoder;
private WsGremlinTextRequestDecoder wsGremlinTextRequestDecoder;
private WsGremlinBinaryRequestDecoder wsGremlinBinaryRequestDecoder;
private WsGremlinResponseFrameEncoder wsGremlinResponseFrameEncoder;
private WsGremlinCloseRequestDecoder wsGremlinCloseRequestDecoder;

public void initWs() {
// all encoders should be wrapped as services
gremlinResponseFrameEncoder = new GremlinResponseFrameEncoder();
wsGremlinTextRequestDecoder = new WsGremlinTextRequestDecoder(serviceContext);
wsGremlinBinaryRequestDecoder = new WsGremlinBinaryRequestDecoder(serviceContext);
wsGremlinCloseRequestDecoder = new WsGremlinCloseRequestDecoder(serviceContext);
wsGremlinResponseFrameEncoder = new WsGremlinResponseFrameEncoder();
}

public void configureWsPipeline(final ChannelPipeline pipeline) {
final Settings settings = serviceContext.getSettings();

if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-encoder-aggregator", LogLevel.DEBUG));

pipeline.addLast(PIPELINE_HTTP_RESPONSE_ENCODER, new HttpResponseEncoder());

logger.debug("HttpRequestDecoder settings - maxInitialLineLength={}, maxHeaderSize={}, maxChunkSize={}",
settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize);
pipeline.addLast(PIPELINE_HTTP_REQUEST_DECODER, new HttpRequestDecoder(settings.maxInitialLineLength, settings.maxHeaderSize, settings.maxChunkSize));

if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-decoder-aggregator", LogLevel.DEBUG));

logger.debug("HttpObjectAggregator settings - maxContentLength={}, maxAccumulationBufferComponents={}",
settings.maxContentLength, settings.maxAccumulationBufferComponents);
final HttpObjectAggregator aggregator = new HttpObjectAggregator(settings.maxContentLength);
aggregator.setMaxCumulationBufferComponents(settings.maxAccumulationBufferComponents);
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);
// Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692
pipeline.addLast(PIPELINE_WEBSOCKET_SERVER_COMPRESSION, new WebSocketServerCompressionHandler());

// setting closeOnProtocolViolation to false prevents causing all the other requests using the same channel
// to fail when a single request causes a protocol violation.
// todo: collect all ws services path
final WebSocketDecoderConfig wsDecoderConfig = WebSocketDecoderConfig.newBuilder().
closeOnProtocolViolation(false).allowExtensions(true).maxFramePayloadLength(settings.maxContentLength).build();
pipeline.addLast(PIPELINE_REQUEST_HANDLER, new WebSocketServerProtocolHandler("/gremlin",
null, false, false, 10000L, wsDecoderConfig));

if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));

pipeline.addLast("ws-frame-encoder", wsGremlinResponseFrameEncoder);
pipeline.addLast("response-frame-encoder", gremlinResponseFrameEncoder);
pipeline.addLast("request-text-decoder", wsGremlinTextRequestDecoder);
pipeline.addLast("request-binary-decoder", wsGremlinBinaryRequestDecoder);
pipeline.addLast("request-close-decoder", wsGremlinCloseRequestDecoder);

if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-aggregator-encoder", LogLevel.DEBUG));

pipeline.addLast("http-routing-handler", routingHandler);
}

public void configureHttpPipeline(final ChannelPipeline pipeline) {
if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));

pipeline.addLast("http-server", new HttpServerCodec());

if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("http-io", LogLevel.DEBUG));

final HttpObjectAggregator aggregator = new HttpObjectAggregator(serviceContext.getSettings().maxContentLength);
aggregator.setMaxCumulationBufferComponents(serviceContext.getSettings().maxAccumulationBufferComponents);
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, aggregator);

pipeline.addLast("http-routing-handler", routingHandler);
}

@Override
public void init(ServerGremlinExecutor serverGremlinExecutor) {
// remove
}
}
Loading