VYPR
High severity7.5NVD Advisory· Published Aug 30, 2017· Updated May 13, 2026

CVE-2017-13763

CVE-2017-13763

Description

ONOS versions 1.8.0, 1.9.0, and 1.10.0 do not restrict the amount of memory allocated. The Netty payload size is not limited.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
org.onosproject:onos-baseMaven
>= 1.8.0, < 1.11.01.11.0

Affected products

3
  • Onosproject/Onos3 versions
    cpe:2.3:a:onosproject:onos:1.8.0:*:*:*:*:*:*:*+ 2 more
    • cpe:2.3:a:onosproject:onos:1.8.0:*:*:*:*:*:*:*
    • cpe:2.3:a:onosproject:onos:1.9.0:*:*:*:*:*:*:*
    • cpe:2.3:a:onosproject:onos:1.10.0:*:*:*:*:*:*:*

Patches

1
f7c7f6f22997

[ONOS-6401] Implement dynamically computed timeouts for NettyMessagingManager

https://github.com/opennetworkinglab/onosJordan HaltermanMay 5, 2017via ghsa
5 files changed · +528 250
  • core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java+18 1 modified
    @@ -105,10 +105,19 @@ public InternalMessage(int preamble,
                                Endpoint sender,
                                String type,
                                byte[] payload) {
    -        this(preamble, time, id, sender, type, payload, Status.OK);
    +        this(preamble, time, id, sender, type, payload, null);
         }
     
         public InternalMessage(int preamble,
    +            HybridLogicalTime time,
    +            long id,
    +            Endpoint sender,
    +            byte[] payload,
    +            Status status) {
    +        this(preamble, time, id, sender, "", payload, status);
    +    }
    +
    +    InternalMessage(int preamble,
                                HybridLogicalTime time,
                                long id,
                                Endpoint sender,
    @@ -124,6 +133,14 @@ public InternalMessage(int preamble,
             this.status = status;
         }
     
    +    public boolean isRequest() {
    +        return status == null;
    +    }
    +
    +    public boolean isReply() {
    +        return status != null;
    +    }
    +
         public HybridLogicalTime time() {
             return time;
         }
    
  • core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java+7 2 modified
    @@ -88,15 +88,20 @@ protected void decode(
                 senderPort = buffer.readInt();
                 checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
             case READ_MESSAGE_TYPE_LENGTH:
    -            messageTypeLength = buffer.readInt();
    +            messageTypeLength = buffer.readShort();
                 checkpoint(DecoderState.READ_MESSAGE_TYPE);
             case READ_MESSAGE_TYPE:
                 byte[] messageTypeBytes = new byte[messageTypeLength];
                 buffer.readBytes(messageTypeBytes);
                 messageType = new String(messageTypeBytes, Charsets.UTF_8);
                 checkpoint(DecoderState.READ_MESSAGE_STATUS);
             case READ_MESSAGE_STATUS:
    -            status = Status.forId(buffer.readInt());
    +            int statusId = buffer.readByte();
    +            if (statusId == -1) {
    +                status = null;
    +            } else {
    +                status = Status.forId(statusId);
    +            }
                 checkpoint(DecoderState.READ_CONTENT_LENGTH);
             case READ_CONTENT_LENGTH:
                 contentLength = buffer.readInt();
    
  • core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java+7 2 modified
    @@ -79,13 +79,18 @@ protected void encode(
             byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
     
             // write length of message type
    -        out.writeInt(messageTypeBytes.length);
    +        out.writeShort(messageTypeBytes.length);
     
             // write message type bytes
             out.writeBytes(messageTypeBytes);
     
             // write message status value
    -        out.writeInt(message.status().id());
    +        InternalMessage.Status status = message.status();
    +        if (status == null) {
    +            out.writeByte(-1);
    +        } else {
    +            out.writeByte(status.id());
    +        }
     
             byte[] payload = message.payload();
     
    
  • core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java+480 245 modified
    @@ -18,15 +18,13 @@
     import com.google.common.base.Strings;
     import com.google.common.cache.Cache;
     import com.google.common.cache.CacheBuilder;
    -import com.google.common.cache.RemovalListener;
    -import com.google.common.cache.RemovalNotification;
    +import com.google.common.collect.Maps;
     import com.google.common.util.concurrent.MoreExecutors;
     
     import io.netty.bootstrap.Bootstrap;
     import io.netty.bootstrap.ServerBootstrap;
     import io.netty.buffer.PooledByteBufAllocator;
     import io.netty.channel.Channel;
    -import io.netty.channel.ChannelFuture;
     import io.netty.channel.ChannelHandler;
     import io.netty.channel.ChannelHandlerContext;
     import io.netty.channel.ChannelInitializer;
    @@ -39,18 +37,23 @@
     import io.netty.channel.epoll.EpollServerSocketChannel;
     import io.netty.channel.epoll.EpollSocketChannel;
     import io.netty.channel.nio.NioEventLoopGroup;
    +import io.netty.channel.pool.AbstractChannelPoolHandler;
    +import io.netty.channel.pool.AbstractChannelPoolMap;
    +import io.netty.channel.pool.ChannelPool;
    +import io.netty.channel.pool.ChannelPoolMap;
    +import io.netty.channel.pool.SimpleChannelPool;
     import io.netty.channel.socket.SocketChannel;
     import io.netty.channel.socket.nio.NioServerSocketChannel;
     import io.netty.channel.socket.nio.NioSocketChannel;
    -import org.apache.commons.pool.KeyedPoolableObjectFactory;
    -import org.apache.commons.pool.impl.GenericKeyedObjectPool;
    +import io.netty.util.concurrent.FutureListener;
    +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
    +import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
     import org.apache.felix.scr.annotations.Activate;
     import org.apache.felix.scr.annotations.Component;
     import org.apache.felix.scr.annotations.Deactivate;
     import org.apache.felix.scr.annotations.Reference;
     import org.apache.felix.scr.annotations.ReferenceCardinality;
     import org.apache.felix.scr.annotations.Service;
    -import org.onlab.util.Tools;
     import org.onosproject.cluster.ClusterMetadataService;
     import org.onosproject.cluster.ControllerNode;
     import org.onosproject.core.HybridLogicalClockService;
    @@ -67,21 +70,27 @@
     import javax.net.ssl.TrustManagerFactory;
     
     import java.io.FileInputStream;
    -import java.io.IOException;
    +import java.net.ConnectException;
     import java.security.KeyStore;
    +import java.time.Duration;
    +import java.util.Iterator;
     import java.util.Map;
     import java.util.Optional;
     import java.util.concurrent.CompletableFuture;
     import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
     import java.util.concurrent.Executor;
    +import java.util.concurrent.Executors;
     import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ScheduledFuture;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.TimeoutException;
     import java.util.concurrent.atomic.AtomicBoolean;
     import java.util.concurrent.atomic.AtomicLong;
     import java.util.function.BiConsumer;
     import java.util.function.BiFunction;
    -import java.util.function.Consumer;
    +import java.util.function.Function;
     
     import static org.onlab.util.Tools.groupedThreads;
     import static org.onosproject.security.AppGuard.checkPermission;
    @@ -93,41 +102,51 @@
     @Component(immediate = true)
     @Service
     public class NettyMessagingManager implements MessagingService {
    -
    -    private static final int REPLY_TIME_OUT_MILLIS = 500;
    +    private static final long DEFAULT_TIMEOUT_MILLIS = 500;
    +    private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(10).toMillis();
    +    private static final long MIN_TIMEOUT_MILLIS = 100;
    +    private static final long MAX_TIMEOUT_MILLIS = 5000;
    +    private static final long TIMEOUT_INTERVAL = 50;
    +    private static final int WINDOW_SIZE = 100;
    +    private static final double TIMEOUT_MULTIPLIER = 2.5;
         private static final short MIN_KS_LENGTH = 6;
     
    +    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    +
         private final Logger log = LoggerFactory.getLogger(getClass());
     
    -    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
    +    private final ClientConnection localClientConnection = new LocalClientConnection();
    +    private final ServerConnection localServerConnection = new LocalServerConnection(null);
     
         @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
         protected HybridLogicalClockService clockService;
     
    -    private Endpoint localEp;
    +    private Endpoint localEndpoint;
         private int preamble;
         private final AtomicBoolean started = new AtomicBoolean(false);
    -    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
    +    private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
    +    private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
    +    private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
         private final AtomicLong messageIdGenerator = new AtomicLong(0);
    -    private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
    -            .expireAfterWrite(REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)
    -            .removalListener(new RemovalListener<Long, Callback>() {
    -                @Override
    -                public void onRemoval(RemovalNotification<Long, Callback> entry) {
    -                    if (entry.wasEvicted()) {
    -                        entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
    -                    }
    -                }
    -            })
    +
    +    private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
    +            .expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
                 .build();
    +    private ScheduledFuture<?> timeoutFuture;
     
    -    private final GenericKeyedObjectPool<Endpoint, Connection> channels
    -            = new GenericKeyedObjectPool<>(new OnosCommunicationChannelFactory());
    +    private final ChannelPoolMap<Endpoint, SimpleChannelPool> channels =
    +            new AbstractChannelPoolMap<Endpoint, SimpleChannelPool>() {
    +                @Override
    +                protected SimpleChannelPool newPool(Endpoint endpoint) {
    +                    return new SimpleChannelPool(bootstrapClient(endpoint), new ClientChannelPoolHandler());
    +                }
    +            };
     
         private EventLoopGroup serverGroup;
         private EventLoopGroup clientGroup;
         private Class<? extends ServerChannel> serverChannelClass;
         private Class<? extends Channel> clientChannelClass;
    +    private ScheduledExecutorService timeoutExecutor;
     
         protected static final boolean TLS_DISABLED = false;
         protected boolean enableNettyTls = TLS_DISABLED;
    @@ -146,29 +165,28 @@ public void activate() throws Exception {
             getTlsParameters();
     
             if (started.get()) {
    -            log.warn("Already running at local endpoint: {}", localEp);
    +            log.warn("Already running at local endpoint: {}", localEndpoint);
                 return;
             }
             this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
    -        this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
    -        channels.setLifo(true);
    -        channels.setTestOnBorrow(true);
    -        channels.setTestOnReturn(true);
    -        channels.setMinEvictableIdleTimeMillis(60_000L);
    -        channels.setTimeBetweenEvictionRunsMillis(30_000L);
    +        this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
             initEventLoopGroup();
             startAcceptingConnections();
    +        timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
    +                groupedThreads("NettyMessagingEvt", "timeout", log));
    +        timeoutFuture = timeoutExecutor.scheduleAtFixedRate(
    +                this::timeoutAllCallbacks, TIMEOUT_INTERVAL, TIMEOUT_INTERVAL, TimeUnit.MILLISECONDS);
             started.set(true);
    -        serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS);
             log.info("Started");
         }
     
         @Deactivate
         public void deactivate() throws Exception {
             if (started.get()) {
    -            channels.close();
                 serverGroup.shutdownGracefully();
                 clientGroup.shutdownGracefully();
    +            timeoutFuture.cancel(false);
    +            timeoutExecutor.shutdown();
                 started.set(false);
             }
             log.info("Stopped");
    @@ -201,6 +219,7 @@ private void getTlsParameters() {
                 }
             }
         }
    +
         private void initEventLoopGroup() {
             // try Epoll first and if that does work, use nio.
             try {
    @@ -211,52 +230,39 @@ private void initEventLoopGroup() {
                 return;
             } catch (Throwable e) {
                 log.debug("Failed to initialize native (epoll) transport. "
    -                              + "Reason: {}. Proceeding with nio.", e.getMessage());
    +                    + "Reason: {}. Proceeding with nio.", e.getMessage());
             }
             clientGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioC-%d", log));
             serverGroup = new NioEventLoopGroup(0, groupedThreads("NettyMessagingEvt", "nioS-%d", log));
             serverChannelClass = NioServerSocketChannel.class;
             clientChannelClass = NioSocketChannel.class;
         }
     
    +    /**
    +     * Times out response callbacks.
    +     */
    +    private void timeoutAllCallbacks() {
    +        // Iterate through all connections and time out callbacks.
    +        for (RemoteClientConnection connection : clientConnections.values()) {
    +            connection.timeoutCallbacks();
    +        }
    +
    +        // Iterate through all timeout histories and recompute the timeout.
    +        for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
    +            timeoutHistory.recomputeTimeoutMillis();
    +        }
    +    }
    +
         @Override
         public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
             checkPermission(CLUSTER_WRITE);
             InternalMessage message = new InternalMessage(preamble,
    -                                                      clockService.timeNow(),
    -                                                      messageIdGenerator.incrementAndGet(),
    -                                                      localEp,
    -                                                      type,
    -                                                      payload);
    -        return sendAsync(ep, message);
    -    }
    -
    -    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
    -        checkPermission(CLUSTER_WRITE);
    -        if (ep.equals(localEp)) {
    -            try {
    -                dispatchLocally(message);
    -            } catch (IOException e) {
    -                return Tools.exceptionalFuture(e);
    -            }
    -            return CompletableFuture.completedFuture(null);
    -        }
    -
    -        CompletableFuture<Void> future = new CompletableFuture<>();
    -        try {
    -            Connection connection = null;
    -            try {
    -                connection = channels.borrowObject(ep);
    -                connection.send(message, future);
    -            } finally {
    -                if (connection != null) {
    -                    channels.returnObject(ep, connection);
    -                }
    -            }
    -        } catch (Exception e) {
    -            future.completeExceptionally(e);
    -        }
    -        return future;
    +                clockService.timeNow(),
    +                messageIdGenerator.incrementAndGet(),
    +                localEndpoint,
    +                type,
    +                payload);
    +        return executeOnPooledConnection(ep, c -> c.sendAsync(message), MoreExecutors.directExecutor());
         }
     
         @Override
    @@ -268,21 +274,62 @@ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[]
         @Override
         public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
             checkPermission(CLUSTER_WRITE);
    -        CompletableFuture<byte[]> future = new CompletableFuture<>();
    -        Callback callback = new Callback(future, executor);
             Long messageId = messageIdGenerator.incrementAndGet();
    -        callbacks.put(messageId, callback);
             InternalMessage message = new InternalMessage(preamble,
    -                                                      clockService.timeNow(),
    -                                                      messageId,
    -                                                      localEp,
    -                                                      type,
    -                                                      payload);
    -
    -        sendAsync(ep, message).whenComplete((response, error) -> {
    -            if (error != null) {
    -                callbacks.invalidate(messageId);
    -                callback.completeExceptionally(error);
    +                clockService.timeNow(),
    +                messageId,
    +                localEndpoint,
    +                type,
    +                payload);
    +        return executeOnPooledConnection(ep, c -> c.sendAndReceive(message), executor);
    +    }
    +
    +    /**
    +     * Executes the given callback on a pooled connection.
    +     *
    +     * @param endpoint the endpoint to which to send a message
    +     * @param callback the callback to execute to send the message
    +     * @param <T> the send result type
    +     * @return a completable future to be completed with the result of the supplied function
    +     */
    +    private <T> CompletableFuture<T> executeOnPooledConnection(
    +            Endpoint endpoint,
    +            Function<ClientConnection, CompletableFuture<T>> callback,
    +            Executor executor) {
    +        if (endpoint.equals(localEndpoint)) {
    +            CompletableFuture<T> future = new CompletableFuture<>();
    +            callback.apply(localClientConnection).whenComplete((result, error) -> {
    +               if (error == null) {
    +                   executor.execute(() -> future.complete(result));
    +               } else {
    +                   executor.execute(() -> future.completeExceptionally(error));
    +               }
    +            });
    +            return future;
    +        }
    +
    +        CompletableFuture<T> future = new CompletableFuture<>();
    +        ChannelPool pool = channels.get(endpoint);
    +        pool.acquire().addListener((FutureListener<Channel>) channelResult -> {
    +            if (channelResult.isSuccess()) {
    +                Channel channel = channelResult.getNow();
    +                ClientConnection connection = clientConnections.computeIfAbsent(channel, RemoteClientConnection::new);
    +                callback.apply(connection).whenComplete((result, error) -> {
    +                    pool.release(channel).addListener(releaseResult -> {
    +                        if (!releaseResult.isSuccess()) {
    +                            clientConnections.remove(channel);
    +                            connection.close();
    +                        }
    +                    });
    +
    +                    if (error == null) {
    +                        executor.execute(() -> future.complete(result));
    +                    } else {
    +                        executor.execute(() -> future.completeExceptionally(error));
    +                    }
    +                });
    +            } else {
    +                executor.execute(() -> future.completeExceptionally(channelResult.cause()));
                 }
             });
             return future;
    @@ -291,13 +338,14 @@ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[]
         @Override
         public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
             checkPermission(CLUSTER_WRITE);
    -        handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
    +        handlers.put(type, (message, connection) -> executor.execute(() ->
    +                handler.accept(message.sender(), message.payload())));
         }
     
         @Override
         public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
             checkPermission(CLUSTER_WRITE);
    -        handlers.put(type, message -> executor.execute(() -> {
    +        handlers.put(type, (message, connection) -> executor.execute(() -> {
                 byte[] responsePayload = null;
                 Status status = Status.OK;
                 try {
    @@ -306,14 +354,14 @@ public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> ha
                     log.debug("An error occurred in a message handler: {}", e);
                     status = Status.ERROR_HANDLER_EXCEPTION;
                 }
    -            sendReply(message, status, Optional.ofNullable(responsePayload));
    +            connection.reply(message, status, Optional.ofNullable(responsePayload));
             }));
         }
     
         @Override
         public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
             checkPermission(CLUSTER_WRITE);
    -        handlers.put(type, message -> {
    +        handlers.put(type, (message, connection) -> {
                 handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
                     Status status;
                     if (error == null) {
    @@ -322,7 +370,7 @@ public void registerHandler(String type, BiFunction<Endpoint, byte[], Completabl
                         log.debug("An error occurred in a message handler: {}", error);
                         status = Status.ERROR_HANDLER_EXCEPTION;
                     }
    -                sendReply(message, status, Optional.ofNullable(result));
    +                connection.reply(message, status, Optional.ofNullable(result));
                 });
             });
         }
    @@ -333,10 +381,26 @@ public void unregisterHandler(String type) {
             handlers.remove(type);
         }
     
    +    private Bootstrap bootstrapClient(Endpoint endpoint) {
    +        Bootstrap bootstrap = new Bootstrap();
    +        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    +        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
    +                new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
    +        bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
    +        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
    +        bootstrap.group(clientGroup);
    +        // TODO: Make this faster:
    +        // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
    +        bootstrap.channel(clientChannelClass);
    +        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    +        bootstrap.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
    +        return bootstrap;
    +    }
    +
         private void startAcceptingConnections() throws InterruptedException {
             ServerBootstrap b = new ServerBootstrap();
             b.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
    -                      new WriteBufferWaterMark(8 * 1024, 32 * 1024));
    +                new WriteBufferWaterMark(8 * 1024, 32 * 1024));
             b.option(ChannelOption.SO_RCVBUF, 1048576);
             b.option(ChannelOption.TCP_NODELAY, true);
             b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    @@ -345,82 +409,41 @@ private void startAcceptingConnections() throws InterruptedException {
             if (enableNettyTls) {
                 b.childHandler(new SslServerCommunicationChannelInitializer());
             } else {
    -            b.childHandler(new OnosCommunicationChannelInitializer());
    +            b.childHandler(new BasicChannelInitializer());
             }
             b.option(ChannelOption.SO_BACKLOG, 128);
             b.childOption(ChannelOption.SO_KEEPALIVE, true);
     
             // Bind and start to accept incoming connections.
    -        b.bind(localEp.port()).sync().addListener(future -> {
    +        b.bind(localEndpoint.port()).sync().addListener(future -> {
                 if (future.isSuccess()) {
    -                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
    +                log.info("{} accepting incoming connections on port {}",
    +                        localEndpoint.host(), localEndpoint.port());
                 } else {
    -                log.warn("{} failed to bind to port {} due to {}", localEp.host(), localEp.port(), future.cause());
    +                log.warn("{} failed to bind to port {} due to {}",
    +                        localEndpoint.host(), localEndpoint.port(), future.cause());
                 }
             });
         }
     
    -    private class OnosCommunicationChannelFactory
    -            implements KeyedPoolableObjectFactory<Endpoint, Connection> {
    -
    +    /**
    +     * Channel pool handler.
    +     */
    +    private class ClientChannelPoolHandler extends AbstractChannelPoolHandler {
             @Override
    -        public void activateObject(Endpoint endpoint, Connection connection)
    -                throws Exception {
    -        }
    -
    -        @Override
    -        public void destroyObject(Endpoint ep, Connection connection) throws Exception {
    -            log.debug("Closing connection {} to {}", connection, ep);
    -            //Is this the right way to destroy?
    -            connection.destroy();
    -        }
    -
    -        @Override
    -        public Connection makeObject(Endpoint ep) throws Exception {
    -            Bootstrap bootstrap = new Bootstrap();
    -            bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    -            bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
    -                          new WriteBufferWaterMark(10 * 32 * 1024, 10 * 64 * 1024));
    -            bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
    -            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
    -            bootstrap.group(clientGroup);
    -            // TODO: Make this faster:
    -            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
    -            bootstrap.channel(clientChannelClass);
    -            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    +        public void channelCreated(Channel channel) throws Exception {
                 if (enableNettyTls) {
    -                bootstrap.handler(new SslClientCommunicationChannelInitializer());
    +                new SslClientCommunicationChannelInitializer().initChannel((SocketChannel) channel);
                 } else {
    -                bootstrap.handler(new OnosCommunicationChannelInitializer());
    +                new BasicChannelInitializer().initChannel((SocketChannel) channel);
                 }
    -            // Start the client.
    -            CompletableFuture<Channel> retFuture = new CompletableFuture<>();
    -            ChannelFuture f = bootstrap.connect(ep.host().toInetAddress(), ep.port());
    -
    -            f.addListener(future -> {
    -                if (future.isSuccess()) {
    -                    retFuture.complete(f.channel());
    -                } else {
    -                    retFuture.completeExceptionally(future.cause());
    -                }
    -            });
    -            log.debug("Established a new connection to {}", ep);
    -            return new Connection(retFuture);
    -        }
    -
    -        @Override
    -        public void passivateObject(Endpoint ep, Connection connection)
    -                throws Exception {
    -        }
    -
    -        @Override
    -        public boolean validateObject(Endpoint ep, Connection connection) {
    -            return connection.validate();
             }
         }
     
    +    /**
    +     * Channel initializer for TLS servers.
    +     */
         private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
    -
             private final ChannelHandler dispatcher = new InboundMessageDispatcher();
             private final ChannelHandler encoder = new MessageEncoder(preamble);
     
    @@ -454,8 +477,10 @@ protected void initChannel(SocketChannel channel) throws Exception {
             }
         }
     
    +    /**
    +     * Channel initializer for TLS clients.
    +     */
         private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
    -
             private final ChannelHandler dispatcher = new InboundMessageDispatcher();
             private final ChannelHandler encoder = new MessageEncoder(preamble);
     
    @@ -488,8 +513,10 @@ protected void initChannel(SocketChannel channel) throws Exception {
             }
         }
     
    -    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
    -
    +    /**
    +     * Channel initializer for basic connections.
    +     */
    +    private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
             private final ChannelHandler dispatcher = new InboundMessageDispatcher();
             private final ChannelHandler encoder = new MessageEncoder(preamble);
     
    @@ -502,16 +529,27 @@ protected void initChannel(SocketChannel channel) throws Exception {
             }
         }
     
    +    /**
    +     * Channel inbound handler that dispatches messages to the appropriate handler.
    +     */
         @ChannelHandler.Sharable
         private class InboundMessageDispatcher extends SimpleChannelInboundHandler<Object> {
    -     // Effectively SimpleChannelInboundHandler<InternalMessage>,
    -     // had to specify <Object> to avoid Class Loader not being able to find some classes.
    +        // Effectively SimpleChannelInboundHandler<InternalMessage>,
    +        // had to specify <Object> to avoid Class Loader not being able to find some classes.
     
             @Override
             protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
                 InternalMessage message = (InternalMessage) rawMessage;
                 try {
    -                dispatchLocally(message);
    +                if (message.isRequest()) {
    +                    RemoteServerConnection connection =
    +                            serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
    +                    connection.dispatch(message);
    +                } else {
    +                    RemoteClientConnection connection =
    +                            clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
    +                    connection.dispatch(message);
    +                }
                 } catch (RejectedExecutionException e) {
                     log.warn("Unable to dispatch message due to {}", e.getMessage());
                 }
    @@ -520,6 +558,16 @@ protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws
             @Override
             public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
                 log.error("Exception inside channel handling pipeline.", cause);
    +
    +            RemoteClientConnection clientConnection = clientConnections.remove(context.channel());
    +            if (clientConnection != null) {
    +                clientConnection.close();
    +            }
    +
    +            RemoteServerConnection serverConnection = serverConnections.remove(context.channel());
    +            if (serverConnection != null) {
    +                serverConnection.close();
    +            }
                 context.close();
             }
     
    @@ -537,133 +585,320 @@ public final boolean acceptInboundMessage(Object msg) {
             }
         }
     
    -    private void dispatchLocally(InternalMessage message) throws IOException {
    -        if (message.preamble() != preamble) {
    -            log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
    -            sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
    +    /**
    +     * Wraps a {@link CompletableFuture} and tracks its type and creation time.
    +     */
    +    private final class Callback {
    +        private final String type;
    +        private final CompletableFuture<byte[]> future;
    +        private final long time = System.currentTimeMillis();
    +
    +        Callback(String type, CompletableFuture<byte[]> future) {
    +            this.type = type;
    +            this.future = future;
             }
    -        clockService.recordEventTime(message.time());
    -        String type = message.type();
    -        if (REPLY_MESSAGE_TYPE.equals(type)) {
    -            try {
    -                Callback callback =
    -                        callbacks.getIfPresent(message.id());
    -                if (callback != null) {
    -                    if (message.status() == Status.OK) {
    -                        callback.complete(message.payload());
    -                    } else if (message.status() == Status.ERROR_NO_HANDLER) {
    -                        callback.completeExceptionally(new MessagingException.NoRemoteHandler());
    -                    } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
    -                        callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
    -                    } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
    -                        callback.completeExceptionally(new MessagingException.ProtocolException());
    -                    }
    -                } else {
    -                    log.debug("Received a reply for message id:[{}]. "
    -                                     + " from {}. But was unable to locate the"
    -                                     + " request handle", message.id(), message.sender());
    -                }
    -            } finally {
    -                callbacks.invalidate(message.id());
    -            }
    -            return;
    +
    +        public void complete(byte[] value) {
    +            future.complete(value);
             }
    -        Consumer<InternalMessage> handler = handlers.get(type);
    -        if (handler != null) {
    -            handler.accept(message);
    -        } else {
    -            log.debug("No handler for message type {} from {}", message.type(), message.sender());
    -            sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
    +
    +        public void completeExceptionally(Throwable error) {
    +            future.completeExceptionally(error);
             }
         }
     
    -    private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
    -        InternalMessage response = new InternalMessage(preamble,
    -                clockService.timeNow(),
    -                message.id(),
    -                localEp,
    -                REPLY_MESSAGE_TYPE,
    -                responsePayload.orElse(new byte[0]),
    -                status);
    -        sendAsync(message.sender(), response).whenComplete((result, error) -> {
    -            if (error != null) {
    -                log.debug("Failed to respond", error);
    +    /**
    +     * Represents the client side of a connection to a local or remote server.
    +     */
    +    private interface ClientConnection {
    +
    +        /**
    +         * Sends a message to the other side of the connection.
    +         *
    +         * @param message the message to send
    +         * @return a completable future to be completed once the message has been sent
    +         */
    +        CompletableFuture<Void> sendAsync(InternalMessage message);
    +
    +        /**
    +         * Sends a message to the other side of the connection, awaiting a reply.
    +         *
    +         * @param message the message to send
    +         * @return a completable future to be completed once a reply is received or the request times out
    +         */
    +        CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
    +
    +        /**
    +         * Closes the connection.
    +         */
    +        default void close() {
    +        }
    +    }
    +
    +    /**
    +     * Represents the server side of a connection.
    +     */
    +    private interface ServerConnection {
    +
    +        /**
    +         * Sends a reply to the other side of the connection.
    +         *
    +         * @param message the message to which to reply
    +         * @param status the reply status
    +         * @param payload the response payload
    +         */
    +        void reply(InternalMessage message, Status status, Optional<byte[]> payload);
    +
    +        /**
    +         * Closes the connection.
    +         */
    +        default void close() {
    +        }
    +    }
    +
    +    /**
    +     * Local connection implementation.
    +     */
    +    private final class LocalClientConnection implements ClientConnection {
    +        @Override
    +        public CompletableFuture<Void> sendAsync(InternalMessage message) {
    +            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
    +            if (handler != null) {
    +                handler.accept(message, localServerConnection);
    +            } else {
    +                log.debug("No handler for message type {} from {}", message.type(), message.sender());
                 }
    -        });
    +            return CompletableFuture.completedFuture(null);
    +        }
    +
    +        @Override
    +        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
    +            CompletableFuture<byte[]> future = new CompletableFuture<>();
    +            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
    +            if (handler != null) {
    +                handler.accept(message, new LocalServerConnection(future));
    +            } else {
    +                log.debug("No handler for message type {} from {}", message.type(), message.sender());
    +                new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
    +            }
    +            return future;
    +        }
         }
     
    -    private final class Callback {
    +    /**
    +     * Local server connection.
    +     */
    +    private final class LocalServerConnection implements ServerConnection {
             private final CompletableFuture<byte[]> future;
    -        private final Executor executor;
     
    -        public Callback(CompletableFuture<byte[]> future, Executor executor) {
    +        LocalServerConnection(CompletableFuture<byte[]> future) {
                 this.future = future;
    -            this.executor = executor;
             }
     
    -        public void complete(byte[] value) {
    -            executor.execute(() -> future.complete(value));
    -        }
    -
    -        public void completeExceptionally(Throwable error) {
    -            executor.execute(() -> future.completeExceptionally(error));
    +        @Override
    +        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
    +            if (future != null) {
    +                if (status == Status.OK) {
    +                    future.complete(payload.orElse(EMPTY_PAYLOAD));
    +                } else if (status == Status.ERROR_NO_HANDLER) {
    +                    future.completeExceptionally(new MessagingException.NoRemoteHandler());
    +                } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
    +                    future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
    +                } else if (status == Status.PROTOCOL_EXCEPTION) {
    +                    future.completeExceptionally(new MessagingException.ProtocolException());
    +                }
    +            }
             }
         }
    -    private final class Connection {
    -        private final CompletableFuture<Channel> internalFuture;
     
    -        public Connection(CompletableFuture<Channel> internalFuture) {
    -            this.internalFuture = internalFuture;
    +    /**
    +     * Remote connection implementation.
    +     */
    +    private final class RemoteClientConnection implements ClientConnection {
    +        private final Channel channel;
    +        private final Map<Long, Callback> futures = Maps.newConcurrentMap();
    +        private final AtomicBoolean closed = new AtomicBoolean(false);
    +
    +        RemoteClientConnection(Channel channel) {
    +            this.channel = channel;
             }
     
             /**
    -         * Sends a message out on its channel and associated the message with a
    -         * completable future used for signaling.
    -         * @param message the message to be sent
    -         * @param future a future that is completed normally or exceptionally if
    -         *               message sending succeeds or fails respectively
    +         * Times out callbacks for this connection.
              */
    -        public void send(Object message, CompletableFuture<Void> future) {
    -            internalFuture.whenComplete((channel, throwable) -> {
    -                if (throwable == null) {
    -                    channel.writeAndFlush(message).addListener(channelFuture -> {
    -                        if (!channelFuture.isSuccess()) {
    -                            future.completeExceptionally(channelFuture.cause());
    -                        } else {
    -                            future.complete(null);
    -                        }
    -                    });
    +        private void timeoutCallbacks() {
    +            // Store the current time.
    +            long currentTime = System.currentTimeMillis();
    +
    +            // Iterate through future callbacks and time out callbacks that have been alive
    +            // longer than the current timeout according to the message type.
    +            Iterator<Map.Entry<Long, Callback>> iterator = futures.entrySet().iterator();
    +            while (iterator.hasNext()) {
    +                Callback callback = iterator.next().getValue();
    +                try {
    +                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
    +                    long currentTimeout = timeoutHistory.currentTimeout;
    +                    if (currentTime - callback.time > currentTimeout) {
    +                        iterator.remove();
    +                        long elapsedTime = currentTime - callback.time;
    +                        timeoutHistory.addReplyTime(elapsedTime);
    +                        callback.completeExceptionally(
    +                                new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
    +                    }
    +                } catch (ExecutionException e) {
    +                    throw new AssertionError();
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public CompletableFuture<Void> sendAsync(InternalMessage message) {
    +            CompletableFuture<Void> future = new CompletableFuture<>();
    +            channel.writeAndFlush(message).addListener(channelFuture -> {
    +                if (!channelFuture.isSuccess()) {
    +                    future.completeExceptionally(channelFuture.cause());
                     } else {
    -                    future.completeExceptionally(throwable);
    +                    future.complete(null);
    +                }
    +            });
    +            return future;
    +        }
    +
    +        @Override
    +        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
    +            CompletableFuture<byte[]> future = new CompletableFuture<>();
    +            Callback callback = new Callback(message.type(), future);
    +            futures.put(message.id(), callback);
    +            channel.writeAndFlush(message).addListener(channelFuture -> {
    +                if (!channelFuture.isSuccess()) {
    +                    futures.remove(message.id());
    +                    callback.completeExceptionally(channelFuture.cause());
                     }
                 });
    +            return future;
    +        }
    +
    +        /**
    +         * Dispatches a message to a local handler.
    +         *
    +         * @param message the message to dispatch
    +         */
    +        private void dispatch(InternalMessage message) {
    +            if (message.preamble() != preamble) {
    +                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
    +                return;
    +            }
    +
    +            clockService.recordEventTime(message.time());
    +
    +            Callback callback = futures.remove(message.id());
    +            if (callback != null) {
    +                if (message.status() == Status.OK) {
    +                    callback.complete(message.payload());
    +                } else if (message.status() == Status.ERROR_NO_HANDLER) {
    +                    callback.completeExceptionally(new MessagingException.NoRemoteHandler());
    +                } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
    +                    callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
    +                } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
    +                    callback.completeExceptionally(new MessagingException.ProtocolException());
    +                }
    +
    +                try {
    +                    TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
    +                    timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
    +                } catch (ExecutionException e) {
    +                    throw new AssertionError();
    +                }
    +            } else {
    +                log.debug("Received a reply for message id:[{}]. "
    +                        + " from {}. But was unable to locate the"
    +                        + " request handle", message.id(), message.sender());
    +            }
    +        }
    +
    +        @Override
    +        public void close() {
    +            if (closed.compareAndSet(false, true)) {
    +                timeoutFuture.cancel(false);
    +                for (Callback callback : futures.values()) {
    +                    callback.completeExceptionally(new ConnectException());
    +                }
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Remote server connection.
    +     */
    +    private final class RemoteServerConnection implements ServerConnection {
    +        private final Channel channel;
    +
    +        RemoteServerConnection(Channel channel) {
    +            this.channel = channel;
             }
     
             /**
    -         * Destroys a channel by closing its channel (if it exists) and
    -         * cancelling its future.
    +         * Dispatches a message to a local handler.
    +         *
    +         * @param message the message to dispatch
              */
    -        public void destroy() {
    -            Channel channel = internalFuture.getNow(null);
    -            if (channel != null) {
    -                channel.close();
    +        private void dispatch(InternalMessage message) {
    +            if (message.preamble() != preamble) {
    +                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
    +                reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
    +                return;
    +            }
    +
    +            clockService.recordEventTime(message.time());
    +
    +            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
    +            if (handler != null) {
    +                handler.accept(message, this);
    +            } else {
    +                log.debug("No handler for message type {} from {}", message.type(), message.sender());
    +                reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
                 }
    -            internalFuture.cancel(false);
    +        }
    +
    +        @Override
    +        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
    +            InternalMessage response = new InternalMessage(preamble,
    +                    clockService.timeNow(),
    +                    message.id(),
    +                    localEndpoint,
    +                    payload.orElse(EMPTY_PAYLOAD),
    +                    status);
    +            channel.writeAndFlush(response);
    +        }
    +    }
    +
    +    /**
    +     * Request-reply timeout history tracker.
    +     */
    +    private static final class TimeoutHistory {
    +        private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
    +        private final AtomicLong maxReplyTime = new AtomicLong();
    +        private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
    +
    +        /**
    +         * Adds a reply time to the history.
    +         *
    +         * @param replyTime the reply time to add to the history
    +         */
    +        void addReplyTime(long replyTime) {
    +            maxReplyTime.getAndAccumulate(replyTime, Math::max);
             }
     
             /**
    -         * Determines whether the connection is valid meaning it is either
    -         * complete with and active channel
    -         * or it has not yet completed.
    -         * @return true if the channel has an active connection or has not
    -         * yet completed
    +         * Computes the current timeout.
              */
    -        public boolean validate() {
    -            if (internalFuture.isCompletedExceptionally()) {
    -                return false;
    +        private void recomputeTimeoutMillis() {
    +            double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
    +            timeoutHistory.addValue(
    +                    Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
    +            if (timeoutHistory.getN() == WINDOW_SIZE) {
    +                this.currentTimeout = (long) timeoutHistory.getMax();
                 }
    -            Channel channel = internalFuture.getNow(null);
    -            return channel == null || channel.isActive();
             }
         }
     }
    
  • core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java+16 0 modified
    @@ -37,9 +37,11 @@
     import java.util.Arrays;
     import java.util.UUID;
     import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
     import java.util.concurrent.CountDownLatch;
     import java.util.concurrent.ExecutorService;
     import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeoutException;
     import java.util.concurrent.atomic.AtomicBoolean;
     import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
    @@ -157,6 +159,20 @@ public void testSendAndReceive() {
             assertEquals(ep1, sender.get());
         }
     
    +    @Test
    +    public void testSendTimeout() {
    +        String subject = nextSubject();
    +        BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
    +        netty2.registerHandler(subject, handler);
    +
    +        try {
    +            netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
    +            fail();
    +        } catch (CompletionException e) {
    +            assertTrue(e.getCause() instanceof TimeoutException);
    +        }
    +    }
    +
         /*
          * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
          * and response completion occurs on the expected thread.
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

8

News mentions

0

No linked articles in our index yet.