Apache Zeppelin: raft directory listing and file read
Description
An attacker can use the raft server protocol without authentication and thereby view the server's resources, including directories and files.
This issue affects Apache Zeppelin: from 0.10.1 up to, but not including, 0.12.0.
Users should upgrade to version 0.12.0, which fixes the issue by removing the Cluster Interpreter.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
An unauthenticated attacker can list directories and read files via the raft server protocol in Apache Zeppelin 0.10.1 up to (but excluding) 0.12.0; upgrading fixes the issue by removing the cluster interpreter.
Vulnerability Overview
CVE-2024-41169 affects Apache Zeppelin versions 0.10.1 up to (but excluding) 0.12.0. The vulnerability stems from the raft server protocol being accessible without authentication. This allows an attacker to enumerate server resources, including directory listings and file contents, via the unsecured protocol endpoint [1][2].
Attack Vector
Exploitation requires no authentication; an attacker with network access to the Zeppelin server can send crafted raft protocol requests. No special privileges or user interaction are needed. The attack surface is the raft protocol listener, which is part of the cluster interpreter feature [3].
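Because the attack only needs network access to the raft listener, a simple exposure check is to test whether that port accepts TCP connections from a host that should not be able to reach it. The following is a minimal sketch under stated assumptions: the class name `RaftPortProbe` is hypothetical, and the default host and port (`127.0.0.1:6000`) are taken from the example value in the `zeppelin.cluster.addr` description, not from any particular deployment. It only checks TCP reachability and does not speak the raft protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Apache Zeppelin.
class RaftPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unresolved: treat as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; replace with the host:port from your own zeppelin.cluster.addr.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6000;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

If the port is reachable from an untrusted network on an affected version, restricting access at the network level is a sensible stopgap until the upgrade is in place; the advisory itself lists no workaround.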
Impact
An unauthenticated attacker can read arbitrary files on the server, potentially exposing sensitive configuration files, credentials, or application data. Directory listing reveals the server's file structure, aiding further attacks [2][4].
Mitigation
The vulnerability is fixed in Apache Zeppelin 0.12.0, which removes the cluster interpreter and raft dependencies entirely [4]. Users should upgrade immediately. No workarounds are mentioned; upgrading is the recommended action [1][3].
AI Insight generated on May 19, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| org.apache.zeppelin:zeppelin-interpreter (Maven) | >= 0.10.1, < 0.12.0 | 0.12.0 |
| org.apache.zeppelin:zeppelin-server (Maven) | >= 0.10.1, < 0.12.0 | 0.12.0 |
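The affected range in the table (`>= 0.10.1, < 0.12.0`) can be checked programmatically when auditing many installations. The sketch below is illustrative only: the class `AffectedRange` and its methods are hypothetical names, and it assumes plain `major.minor.patch` version strings. Any qualifier after a hyphen (for example `-SNAPSHOT`) is ignored, which may not match how a given build should be classified.

```java
// Hypothetical helper, not part of Apache Zeppelin.
class AffectedRange {

    /** Parses "0.11.2" into {0, 11, 2}; missing parts default to 0, qualifiers after '-' are dropped. */
    static int[] parse(String version) {
        String core = version.split("-", 2)[0];
        String[] parts = core.split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < out.length && i < parts.length; i++) {
            out[i] = Integer.parseInt(parts[i]);
        }
        return out;
    }

    /** Numeric comparison of two versions: negative, zero, or positive. */
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < 3; i++) {
            if (x[i] != y[i]) {
                return Integer.compare(x[i], y[i]);
            }
        }
        return 0;
    }

    /** True when 0.10.1 <= version < 0.12.0, the range listed in the advisory. */
    static boolean isAffected(String version) {
        return compare(version, "0.10.1") >= 0 && compare(version, "0.12.0") < 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"0.10.0", "0.10.1", "0.11.2", "0.12.0"}) {
            System.out.println(v + " affected: " + isAffected(v));
        }
    }
}
```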
Affected products
Apache Software Foundation/Apache Zeppelin (v5), range: 0.10.1
Patches
3 files changed · +7 −5
docs/development/contribution/how_to_contribute_code.md +2 −2 modified

    @@ -54,10 +54,10 @@ Get the source code on your development machine using git.
     git clone git://gitbox.apache.org/repos/asf/zeppelin.git zeppelin
     ```
     
    -You may also want to develop against a specific branch. For example, for branch-0.11.0
    +You may also want to develop against a specific branch. For example, for branch-0.12
     
     ```bash
    -git clone -b branch-0.11.0 git://gitbox.apache.org/repos/asf/zeppelin.git zeppelin
    +git clone -b branch-0.12 git://gitbox.apache.org/repos/asf/zeppelin.git zeppelin
     ```
     
     Apache Zeppelin follows [Fork & Pull](https://github.com/sevntu-checkstyle/sevntu.checkstyle/wiki/Development-workflow-with-Git:-Fork,-Branching,-Commits,-and-Pull-Request) as a source control workflow.
scripts/docker/zeppelin/bin/Dockerfile +4 −1 modified

    @@ -51,8 +51,11 @@ RUN set -ex && \
         export PATH=/opt/conda/bin:$PATH && \
         conda config --set always_yes yes --set changeps1 no && \
         conda info -a && \
    +    conda config --remove channels defaults && \
    +    conda config --add channels conda-forge && \
    +    conda config --set channel_priority strict && \
         conda install mamba -c conda-forge && \
    -    mamba env update -f /env_python_3_with_R.yml --prune && \
    +    mamba env create -f /env_python_3_with_R.yml -c conda-forge && \
         # Cleanup
         rm -v miniconda.sh anaconda.sha256 && \
         # Cleanup based on https://github.com/ContinuumIO/docker-images/commit/cac3352bf21a26fa0b97925b578fb24a0fe8c383
scripts/docker/zeppelin/bin/env_python_3_with_R.yml +1 −2 modified

    @@ -1,9 +1,8 @@
     name: python_3_with_R
     channels:
       - conda-forge
    -  - defaults
     dependencies:
    -  - python=3.7
    +  - python=3.9
       - pycodestyle
       - scipy
       - numpy=1.19.5
ca4cc5a5dd02 [ZEPPELIN-6101] Remove cluster interpreter
56 files changed · +11 −6146
conf/zeppelin-site.xml.template +0 −6 modified

    @@ -31,12 +31,6 @@
       <description>Server port.</description>
     </property>
     
    -<property>
    -  <name>zeppelin.cluster.addr</name>
    -  <value></value>
    -  <description>Server cluster address, eg. 127.0.0.1:6000,127.0.0.2:6000,127.0.0.3:6000</description>
    -</property>
    -
     <property>
       <name>zeppelin.server.ssl.port</name>
       <value>8443</value>
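When auditing a pre-0.12.0 installation, it can help to list which raft endpoints a `zeppelin.cluster.addr` value actually configures. The sketch below loosely mirrors the comma and colon splitting visible in the removed `ClusterManager` constructor later in this patch, but it is not that code: the class `ClusterAddrParser` is a hypothetical name, and unlike the removed constructor (which throws when no address is set) it returns an empty list for a blank value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apache Zeppelin.
class ClusterAddrParser {

    /** Splits "host1:port1,host2:port2" into {host, port} pairs; blank input yields an empty list. */
    static List<String[]> parse(String clusterAddr) {
        List<String[]> members = new ArrayList<>();
        if (clusterAddr == null || clusterAddr.trim().isEmpty()) {
            return members;
        }
        for (String entry : clusterAddr.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            Integer.parseInt(parts[1]); // throws NumberFormatException if the port is not numeric
            members.add(parts);
        }
        return members;
    }

    public static void main(String[] args) {
        // Example value copied from the property description in zeppelin-site.xml.template.
        for (String[] m : parse("127.0.0.1:6000,127.0.0.2:6000,127.0.0.3:6000")) {
            System.out.println("raft endpoint host=" + m[0] + " port=" + m[1]);
        }
    }
}
```

Each endpoint printed is a listener worth checking for exposure on affected versions.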
flink/flink-scala-2.12/pom.xml+0 −33 modified@@ -42,7 +42,6 @@ <hive.guava.version>14.0.1</hive.guava.version> <derby.version>10.14.2.0</derby.version> <hiverunner.version>5.3.0</hiverunner.version> - <grpc.version>1.15.0</grpc.version> <flink.bin.download.url>https://archive.apache.org/dist/flink/flink-${flink.version}/flink-${flink.version}-bin-scala_${flink.scala.binary.version}.tgz</flink.bin.download.url> </properties> @@ -78,10 +77,6 @@ <artifactId>zeppelin-python</artifactId> <version>${project.version}</version> <exclusions> - <exclusion> - <groupId>io.atomix</groupId> - <artifactId>*</artifactId> - </exclusion> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> @@ -94,20 +89,6 @@ <artifactId>zeppelin-interpreter</artifactId> <version>${project.version}</version> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>io.atomix</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> @@ -117,18 +98,10 @@ <classifier>tests</classifier> <scope>test</scope> <exclusions> - <exclusion> - <groupId>io.atomix</groupId> - <artifactId>*</artifactId> - </exclusion> <exclusion> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </exclusion> - <exclusion> - <groupId>io.grpc</groupId> - <artifactId>*</artifactId> - </exclusion> </exclusions> </dependency> @@ -292,12 +265,6 @@ <artifactId>guava</artifactId> <version>${hive.guava.version}</version> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> </dependency> <dependency>
zeppelin-integration/pom.xml+0 −32 modified@@ -51,12 +51,6 @@ <groupId>org.seleniumhq.selenium</groupId> <artifactId>selenium-java</artifactId> <version>${selenium.java.version}</version> - <exclusions> - <exclusion> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </exclusion> - </exclusions> <scope>test</scope> </dependency> <dependency> @@ -79,20 +73,6 @@ <groupId>${project.groupId}</groupId> <artifactId>zeppelin-zengine</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - </exclusions> <scope>test</scope> </dependency> @@ -101,24 +81,12 @@ <artifactId>spark-interpreter</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-shell</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.zeppelin</groupId>
zeppelin-interpreter-parent/pom.xml +0 −10 modified

    @@ -43,16 +43,6 @@
           <artifactId>zeppelin-interpreter</artifactId>
           <version>${project.version}</version>
           <scope>provided</scope>
    -      <exclusions>
    -        <exclusion>
    -          <groupId>io.netty</groupId>
    -          <artifactId>*</artifactId>
    -        </exclusion>
    -        <exclusion>
    -          <groupId>io.atomix</groupId>
    -          <artifactId>*</artifactId>
    -        </exclusion>
    -      </exclusions>
         </dependency>
     
         <dependency>
zeppelin-interpreter/pom.xml +0 −25 modified

    @@ -40,7 +40,6 @@
         <!-- must match with maven version -->
         <sisu.plexus.version>0.3.4</sisu.plexus.version>
         <jline.version>2.14.3</jline.version>
    -    <atomix.version>3.0.0-rc5</atomix.version>
       </properties>
     
       <dependencies>
    @@ -50,30 +49,6 @@
           <version>${project.version}</version>
         </dependency>
     
    -    <dependency>
    -      <groupId>io.atomix</groupId>
    -      <artifactId>atomix</artifactId>
    -      <version>${atomix.version}</version>
    -      <exclusions>
    -        <exclusion>
    -          <groupId>org.apache.commons</groupId>
    -          <artifactId>commons-lang3</artifactId>
    -        </exclusion>
    -      </exclusions>
    -    </dependency>
    -
    -    <dependency>
    -      <groupId>io.atomix</groupId>
    -      <artifactId>atomix-raft</artifactId>
    -      <version>${atomix.version}</version>
    -    </dependency>
    -
    -    <dependency>
    -      <groupId>io.atomix</groupId>
    -      <artifactId>atomix-primary-backup</artifactId>
    -      <version>${atomix.version}</version>
    -    </dependency>
    -
         <dependency>
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-lang3</artifactId>
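Since the fix removes the `io.atomix` dependencies from `zeppelin-interpreter`, one rough way to sanity-check an upgraded deployment is to probe the classpath for a raft class. This is a sketch, not an official verification step: `AtomixClasspathCheck` is a hypothetical name, and the probed class `io.atomix.protocols.raft.RaftServer` is simply one of the imports in the `ClusterManagerServer.java` file that this patch deletes. A negative result only shows the class is absent from the classpath this code runs on.

```java
// Hypothetical helper, not part of Apache Zeppelin.
class AtomixClasspathCheck {

    /** Returns true if the named class can be located without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, AtomixClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the imports of the removed ClusterManagerServer.
        String raftServer = "io.atomix.protocols.raft.RaftServer";
        System.out.println(raftServer + " present: " + isOnClasspath(raftServer));
    }
}
```

Run it with the same classpath as the Zeppelin process being checked; on a clean 0.12.0 build the expectation, based on this patch, is that the class is not found.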
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java+0 −44 removed@@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import io.atomix.cluster.messaging.BroadcastService; - -import java.util.function.Consumer; - -/** - * Broadcast Service Adapter - * Service for broadcast messaging between nodes. - * The broadcast service is an unreliable broadcast messaging service backed by multicast. - * This service provides no guaranteed regarding reliability or order of messages. - */ -public class BroadcastServiceAdapter implements BroadcastService { - @Override - public void broadcast(String subject, byte[] message) { - - } - - @Override - public void addListener(String subject, Consumer<byte[]> listener) { - - } - - @Override - public void removeListener(String subject, Consumer<byte[]> listener) { - - } -}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java +0 −26 removed

    @@ -1,26 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements. See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License. You may obtain a copy of the License at
    - *
    - * http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.zeppelin.cluster;
    -
    -import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
    -
    -public interface ClusterCallback<T> {
    -  InterpreterClient online(T result);
    -
    -  void offline();
    -}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java+0 −80 removed@@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import io.atomix.primitive.PrimitiveState; -import org.apache.zeppelin.conf.ZeppelinConfiguration; - -import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; - -/** - * Cluster management client class instantiated in zeppelin-interperter - */ -public class ClusterManagerClient extends ClusterManager { - private static ClusterManagerClient instance = null; - - public static ClusterManagerClient getInstance(ZeppelinConfiguration zConf) { - synchronized (ClusterManagerClient.class) { - if (instance == null) { - instance = new ClusterManagerClient(zConf); - } - return instance; - } - } - - public ClusterManagerClient(ZeppelinConfiguration zConf) { - super(zConf); - } - - @Override - public boolean raftInitialized() { - if (null != raftClient && null != raftSessionClient - && raftSessionClient.getState() == PrimitiveState.CONNECTED) { - return true; - } - - return false; - } - - @Override - public boolean isClusterLeader() { - return false; - } - - // In the ClusterManagerClient metaKey 
equal interperterGroupId - public void start(String metaKey) { - if (!zConf.isClusterMode()) { - return; - } - super.start(); - - // Instantiated cluster monitoring class - clusterMonitor = new ClusterMonitor(this, zConf); - clusterMonitor.start(INTP_PROCESS_META, metaKey); - } - - @Override - public void shutdown() { - if (!zConf.isClusterMode()) { - return; - } - clusterMonitor.shutdown(); - - super.shutdown(); - instance = null; - } -}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java+0 −600 removed@@ -1,600 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import com.google.common.collect.Maps; -import io.atomix.cluster.MemberId; -import io.atomix.cluster.Node; -import io.atomix.cluster.messaging.MessagingService; -import io.atomix.cluster.messaging.impl.NettyMessagingService; -import io.atomix.primitive.operation.OperationType; -import io.atomix.primitive.operation.PrimitiveOperation; -import io.atomix.primitive.operation.impl.DefaultOperationId; -import io.atomix.primitive.partition.PartitionId; -import io.atomix.primitive.service.ServiceConfig; -import io.atomix.primitive.session.SessionClient; -import io.atomix.primitive.session.SessionId; -import io.atomix.protocols.raft.RaftClient; -import io.atomix.protocols.raft.RaftError; -import io.atomix.protocols.raft.ReadConsistency; -import io.atomix.protocols.raft.cluster.RaftMember; -import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember; -import io.atomix.protocols.raft.protocol.CloseSessionRequest; -import io.atomix.protocols.raft.protocol.CloseSessionResponse; -import 
io.atomix.protocols.raft.protocol.KeepAliveRequest; -import io.atomix.protocols.raft.protocol.KeepAliveResponse; -import io.atomix.protocols.raft.protocol.QueryRequest; -import io.atomix.protocols.raft.protocol.QueryResponse; -import io.atomix.protocols.raft.protocol.CommandRequest; -import io.atomix.protocols.raft.protocol.CommandResponse; -import io.atomix.protocols.raft.protocol.MetadataRequest; -import io.atomix.protocols.raft.protocol.MetadataResponse; -import io.atomix.protocols.raft.protocol.JoinRequest; -import io.atomix.protocols.raft.protocol.JoinResponse; -import io.atomix.protocols.raft.protocol.LeaveRequest; -import io.atomix.protocols.raft.protocol.LeaveResponse; -import io.atomix.protocols.raft.protocol.ConfigureRequest; -import io.atomix.protocols.raft.protocol.ConfigureResponse; -import io.atomix.protocols.raft.protocol.ReconfigureRequest; -import io.atomix.protocols.raft.protocol.ReconfigureResponse; -import io.atomix.protocols.raft.protocol.InstallRequest; -import io.atomix.protocols.raft.protocol.InstallResponse; -import io.atomix.protocols.raft.protocol.PollRequest; -import io.atomix.protocols.raft.protocol.PollResponse; -import io.atomix.protocols.raft.protocol.VoteRequest; -import io.atomix.protocols.raft.protocol.VoteResponse; -import io.atomix.protocols.raft.protocol.AppendRequest; -import io.atomix.protocols.raft.protocol.AppendResponse; -import io.atomix.protocols.raft.protocol.PublishRequest; -import io.atomix.protocols.raft.protocol.ResetRequest; -import io.atomix.protocols.raft.protocol.RaftResponse; -import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry; -import io.atomix.protocols.raft.storage.log.entry.CommandEntry; -import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry; -import io.atomix.protocols.raft.storage.log.entry.InitializeEntry; -import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry; -import io.atomix.protocols.raft.storage.log.entry.MetadataEntry; -import 
io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry; -import io.atomix.protocols.raft.storage.log.entry.QueryEntry; -import io.atomix.protocols.raft.protocol.OpenSessionRequest; -import io.atomix.protocols.raft.protocol.OpenSessionResponse; -import io.atomix.protocols.raft.protocol.RaftClientProtocol; -import io.atomix.protocols.raft.session.CommunicationStrategy; -import io.atomix.protocols.raft.storage.system.Configuration; -import io.atomix.utils.net.Address; -import io.atomix.utils.serializer.Namespace; -import io.atomix.utils.serializer.Serializer; -import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.meta.ClusterMetaEntity; -import org.apache.zeppelin.cluster.meta.ClusterMetaOperation; -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory; -import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.launcher.InterpreterClient; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.time.Instant; - -import java.time.LocalDateTime; -import java.util.Collections; -import java.util.Map; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; - -import static io.atomix.primitive.operation.PrimitiveOperation.operation; 
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS; -import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION; -import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION; -import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION; -import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; - -/** - * The base class for cluster management, including the following implementations - * 1. RaftClient as the raft client - * 2. Threading to provide retry after cluster metadata submission failure - * 3. Cluster monitoring - */ -public abstract class ClusterManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class); - - protected final ZeppelinConfiguration zConf; - - protected Collection<Node> clusterNodes = new ArrayList<>(); - - protected int raftServerPort = 0; - - protected RaftClient raftClient = null; - protected SessionClient raftSessionClient = null; - protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>(); - protected LocalRaftProtocolFactory protocolFactory - = new LocalRaftProtocolFactory(protocolSerializer); - protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>(); - - protected AtomicBoolean running = new AtomicBoolean(true); - - // Write data through the queue to prevent failure due to network exceptions - private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue - = new ConcurrentLinkedQueue<>(); - - // zeppelin server host & port - protected String zeplServerHost = ""; - - protected ClusterMonitor clusterMonitor = null; - - protected boolean isTest = false; - - public ClusterManager(ZeppelinConfiguration zConf) { - this.zConf = zConf; - 
try { - zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); - String clusterAddr = this.zConf.getClusterAddress(); - if (!StringUtils.isEmpty(clusterAddr)) { - String cluster[] = clusterAddr.split(","); - - for (int i = 0; i < cluster.length; i++) { - String[] parts = cluster[i].split(":"); - String clusterHost = parts[0]; - int clusterPort = Integer.valueOf(parts[1]); - if (zeplServerHost.equalsIgnoreCase(clusterHost)) { - raftServerPort = clusterPort; - } - - String memberId = clusterHost + ":" + clusterPort; - Address address = Address.from(clusterHost, clusterPort); - Node node = Node.builder().withId(memberId).withAddress(address).build(); - clusterNodes.add(node); - raftAddressMap.put(MemberId.from(memberId), address); - clusterMemberIds.add(MemberId.from(memberId)); - } - } else { - throw new RuntimeException("No zeppelin.cluster.addr specified in zeppelin-site.xml"); - } - } catch (UnknownHostException e) { - LOGGER.error(e.getMessage()); - } catch (SocketException e) { - LOGGER.error(e.getMessage()); - } - } - - // Check if the raft environment is initialized - public abstract boolean raftInitialized(); - // Is it a cluster leader - public abstract boolean isClusterLeader(); - - public AtomicBoolean getRunning() { - return running; - } - - private SessionClient createProxy(RaftClient client) { - return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME, - ClusterPrimitiveType.INSTANCE, new ServiceConfig()) - .withReadConsistency(ReadConsistency.SEQUENTIAL) - .withCommunicationStrategy(CommunicationStrategy.LEADER) - .build() - .connect() - .join(); - } - - public void start() { - if (!zConf.isClusterMode()) { - return; - } - - // RaftClient Thread - new Thread(new Runnable() { - @Override - public void run() { - LOGGER.info("RaftClientThread run() >>>"); - - int raftClientPort = 0; - try { - raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - } catch (IOException e) { - 
LOGGER.error(e.getMessage()); - } - - MemberId memberId = MemberId.from(zeplServerHost + ":" + raftClientPort); - Address address = Address.from(zeplServerHost, raftClientPort); - raftAddressMap.put(memberId, address); - - MessagingService messagingManager - = NettyMessagingService.builder().withAddress(address).build().start().join(); - RaftClientProtocol protocol = new RaftClientMessagingProtocol( - messagingManager, protocolSerializer, raftAddressMap::get); - - raftClient = RaftClient.builder() - .withMemberId(memberId) - .withPartitionId(PartitionId.from("partition", 1)) - .withProtocol(protocol) - .build(); - - raftClient.connect(clusterMemberIds).join(); - - raftSessionClient = createProxy(raftClient); - - LOGGER.info("RaftClientThread run() <<<"); - } - }).start(); - - // Cluster Meta Consume Thread - new Thread(new Runnable() { - @Override - public void run() { - try { - while (getRunning().get()) { - ClusterMetaEntity metaEntity = clusterMetaQueue.peek(); - if (null != metaEntity) { - // Determine whether the client is connected - int retry = 0; - while (!raftInitialized()) { - retry++; - if (0 == retry % 30) { - LOGGER.warn("Raft incomplete initialization! retry[{}]", retry); - } - Thread.sleep(100); - } - boolean success = false; - switch (metaEntity.getOperation()) { - case DELETE_OPERATION: - success = deleteClusterMeta(metaEntity); - break; - case PUT_OPERATION: - success = putClusterMeta(metaEntity); - break; - } - if (true == success) { - // The operation was successfully deleted - clusterMetaQueue.remove(metaEntity); - LOGGER.info("Cluster Meta Consume success! 
{}", metaEntity); - } else { - LOGGER.error("Cluster Meta Consume faild!"); - } - } else { - Thread.sleep(100); - } - } - } catch (InterruptedException e) { - LOGGER.error(e.getMessage()); - } - } - }).start(); - } - - // cluster shutdown - public void shutdown() { - if (!zConf.isClusterMode()) { - return; - } - - running.set(false); - - try { - if (null != raftSessionClient) { - raftSessionClient.close().get(3, TimeUnit.SECONDS); - } - if (null != raftClient) { - raftClient.close().get(3, TimeUnit.SECONDS); - } - } catch (InterruptedException e) { - LOGGER.error(e.getMessage()); - } catch (ExecutionException e) { - LOGGER.error(e.getMessage()); - } catch (TimeoutException e) { - LOGGER.error(e.getMessage()); - } - } - - public String getClusterNodeName() { - if (isTest) { - // Start three cluster servers in the test case at the same time, - // need to avoid duplicate names - return this.zeplServerHost + ":" + this.raftServerPort; - } - - String hostName = ""; - try { - InetAddress addr = InetAddress.getLocalHost(); - hostName = addr.getHostName().toString(); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } - return hostName; - } - - // put metadata into cluster metadata - private boolean putClusterMeta(ClusterMetaEntity entity) { - if (!raftInitialized()) { - LOGGER.error("Raft incomplete initialization!"); - return false; - } - - ClusterMetaType metaType = entity.getMetaType(); - String metaKey = entity.getKey(); - Map<String, Object> newMetaValue = entity.getValues(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("putClusterMeta {} {}", metaType, metaKey); - } - - // add cluster name - newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost); - newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort); - - raftSessionClient.execute(operation(ClusterStateMachine.PUT, - clientSerializer.encode(entity))) - .<Long>thenApply(clientSerializer::decode); - return true; - } - - // put metadata into cluster metadata - public void 
putClusterMeta(ClusterMetaType type, String key, Map<String, Object> values) { - ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values); - - boolean result = putClusterMeta(metaEntity); - if (false == result) { - LOGGER.warn("putClusterMeta failure, Cache metadata to queue."); - clusterMetaQueue.add(metaEntity); - } - } - - // delete metadata by cluster metadata - private boolean deleteClusterMeta(ClusterMetaEntity entity) { - ClusterMetaType metaType = entity.getMetaType(); - String metaKey = entity.getKey(); - - // Need to pay attention to delete metadata operations - LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey); - - if (!raftInitialized()) { - LOGGER.error("Raft incomplete initialization!"); - return false; - } - - raftSessionClient.execute(operation( - ClusterStateMachine.REMOVE, - clientSerializer.encode(entity))) - .<Long>thenApply(clientSerializer::decode) - .thenAccept(result -> { - LOGGER.info("deleteClusterMeta {}", result); - }); - - return true; - } - - // delete metadata from cluster metadata - public void deleteClusterMeta(ClusterMetaType type, String key) { - ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null); - - boolean result = deleteClusterMeta(metaEntity); - if (false == result) { - LOGGER.warn("deleteClusterMeta faild, Cache data to queue."); - clusterMetaQueue.add(metaEntity); - } - } - - // get metadata by cluster metadata - public Map<String, Map<String, Object>> getClusterMeta( - ClusterMetaType metaType, String metaKey) { - Map<String, Map<String, Object>> clusterMeta = new HashMap<>(); - if (!raftInitialized()) { - LOGGER.error("Raft incomplete initialization!"); - return clusterMeta; - } - - ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, metaKey, null); - - byte[] mateData = null; - try { - mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET, - clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS); - } 
catch (InterruptedException e) { - LOGGER.error(e.getMessage()); - } catch (ExecutionException e) { - LOGGER.error(e.getMessage()); - } catch (TimeoutException e) { - LOGGER.error(e.getMessage()); - } - - if (null != mateData) { - clusterMeta = clientSerializer.decode(mateData); - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("getClusterMeta >>> {}", clusterMeta); - } - - return clusterMeta; - } - - public InterpreterClient getIntpProcessStatus(String intpName, - int timeout, - ClusterCallback<Map<String, Object>> callback) { - final int CHECK_META_INTERVAL = 1000; - int MAX_RETRY_GET_META = timeout / CHECK_META_INTERVAL; - int retryGetMeta = 0; - while (retryGetMeta++ < MAX_RETRY_GET_META) { - Map<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName); - if (interpreterMetaOnline(intpMeta)) { - // connect exist Interpreter Process - String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST); - int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT); - LOGGER.info("interpreter thrift {}:{} service is online!", intpTSrvHost, intpTSrvPort); - - // Check if the interpreter thrift service is available - boolean remoteIntpAccessible = - RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(intpTSrvHost, intpTSrvPort); - if (remoteIntpAccessible) { - LOGGER.info("interpreter thrift {}:{} accessible!", intpTSrvHost, intpTSrvPort); - return callback.online(intpMeta); - } else { - LOGGER.error("interpreter thrift {}:{} service is not available!", - intpTSrvHost, intpTSrvPort); - try { - Thread.sleep(CHECK_META_INTERVAL); - LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, intpName); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - } else { - try { - Thread.sleep(CHECK_META_INTERVAL); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - } - - LOGGER.error("retry {} times not get {} meta!", retryGetMeta, intpName); - callback.offline(); - return null; - } - - // 
Check if the interpreter is online - private boolean interpreterMetaOnline(Map<String, Object> intpProcMeta) { - if (null != intpProcMeta - && intpProcMeta.containsKey(INTP_TSERVER_HOST) - && intpProcMeta.containsKey(INTP_TSERVER_PORT) - && intpProcMeta.containsKey(STATUS) - && StringUtils.equals((String) intpProcMeta.get(STATUS), ONLINE_STATUS)) { - return true; - } - - return false; - } - - protected static final Serializer protocolSerializer = Serializer.using(Namespace.builder() - .register(OpenSessionRequest.class) - .register(OpenSessionResponse.class) - .register(CloseSessionRequest.class) - .register(CloseSessionResponse.class) - .register(KeepAliveRequest.class) - .register(KeepAliveResponse.class) - .register(QueryRequest.class) - .register(QueryResponse.class) - .register(CommandRequest.class) - .register(CommandResponse.class) - .register(MetadataRequest.class) - .register(MetadataResponse.class) - .register(JoinRequest.class) - .register(JoinResponse.class) - .register(LeaveRequest.class) - .register(LeaveResponse.class) - .register(ConfigureRequest.class) - .register(ConfigureResponse.class) - .register(ReconfigureRequest.class) - .register(ReconfigureResponse.class) - .register(InstallRequest.class) - .register(InstallResponse.class) - .register(PollRequest.class) - .register(PollResponse.class) - .register(VoteRequest.class) - .register(VoteResponse.class) - .register(AppendRequest.class) - .register(AppendResponse.class) - .register(PublishRequest.class) - .register(ResetRequest.class) - .register(RaftResponse.Status.class) - .register(RaftError.class) - .register(RaftError.Type.class) - .register(PrimitiveOperation.class) - .register(ReadConsistency.class) - .register(byte[].class) - .register(long[].class) - .register(CloseSessionEntry.class) - .register(CommandEntry.class) - .register(ConfigurationEntry.class) - .register(InitializeEntry.class) - .register(KeepAliveEntry.class) - .register(MetadataEntry.class) - .register(OpenSessionEntry.class) 
- .register(QueryEntry.class) - .register(PrimitiveOperation.class) - .register(DefaultOperationId.class) - .register(OperationType.class) - .register(ReadConsistency.class) - .register(ArrayList.class) - .register(HashMap.class) - .register(ClusterMetaEntity.class) - .register(LocalDateTime.class) - .register(Collections.emptyList().getClass()) - .register(HashSet.class) - .register(DefaultRaftMember.class) - .register(MemberId.class) - .register(SessionId.class) - .register(RaftMember.Type.class) - .register(Instant.class) - .register(Configuration.class) - .build()); - - protected static final Serializer storageSerializer = Serializer.using(Namespace.builder() - .register(CloseSessionEntry.class) - .register(CommandEntry.class) - .register(ConfigurationEntry.class) - .register(InitializeEntry.class) - .register(KeepAliveEntry.class) - .register(MetadataEntry.class) - .register(OpenSessionEntry.class) - .register(QueryEntry.class) - .register(PrimitiveOperation.class) - .register(DefaultOperationId.class) - .register(OperationType.class) - .register(ReadConsistency.class) - .register(ArrayList.class) - .register(ClusterMetaEntity.class) - .register(HashMap.class) - .register(HashSet.class) - .register(LocalDateTime.class) - .register(DefaultRaftMember.class) - .register(MemberId.class) - .register(RaftMember.Type.class) - .register(Instant.class) - .register(Configuration.class) - .register(byte[].class) - .register(long[].class) - .build()); - - protected static final Serializer clientSerializer = Serializer.using(Namespace.builder() - .register(ReadConsistency.class) - .register(ClusterMetaEntity.class) - .register(ClusterMetaOperation.class) - .register(ClusterMetaType.class) - .register(HashMap.class) - .register(LocalDateTime.class) - .register(Maps.immutableEntry(new String(), new Object()).getClass()) - .build()); -}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java+0 −390 removed@@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; -import io.atomix.cluster.*; -import io.atomix.cluster.discovery.BootstrapDiscoveryProvider; -import io.atomix.cluster.impl.DefaultClusterMembershipService; -import io.atomix.cluster.impl.DefaultNodeDiscoveryService; -import io.atomix.cluster.messaging.BroadcastService; -import io.atomix.cluster.messaging.MessagingService; -import io.atomix.cluster.messaging.impl.NettyMessagingService; -import io.atomix.primitive.PrimitiveState; -import io.atomix.protocols.raft.RaftServer; -import io.atomix.protocols.raft.protocol.RaftServerProtocol; -import io.atomix.protocols.raft.storage.RaftStorage; -import io.atomix.storage.StorageLevel; -import io.atomix.utils.net.Address; -import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.cluster.event.ClusterEventListener; -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol; 
-import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; - -import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META; - -/** - * Cluster management server class instantiated in zeppelin-server - * 1. Create a raft server - * 2. Remotely create interpreter's thrift service - */ -public class ClusterManagerServer extends ClusterManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterManagerServer.class); - - private static ClusterManagerServer instance = null; - - // raft server - protected RaftServer raftServer = null; - - protected MessagingService messagingService = null; - - private List<ClusterEventListener> clusterIntpEventListeners = new ArrayList<>(); - private List<ClusterEventListener> clusterNoteEventListeners = new ArrayList<>(); - private List<ClusterEventListener> clusterAuthEventListeners = new ArrayList<>(); - private List<ClusterEventListener> clusterIntpSettingEventListeners = new ArrayList<>(); - - // zeppelin cluster event - public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC"; - public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC"; - public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC"; - public static String CLUSTER_INTP_SETTING_EVENT_TOPIC = "CLUSTER_INTP_SETTING_EVENT_TOPIC"; - - private ClusterManagerServer(ZeppelinConfiguration zConf) { - super(zConf); - } - - public static ClusterManagerServer getInstance(ZeppelinConfiguration zConf) { - synchronized (ClusterManagerServer.class) { - if (instance == null) { - instance = new ClusterManagerServer(zConf); - } 
- return instance; - } - } - - @Override - public void start() { - if (!zConf.isClusterMode()) { - return; - } - - initThread(); - - // Instantiated raftServer monitoring class - String clusterName = getClusterNodeName(); - clusterMonitor = new ClusterMonitor(this, zConf); - clusterMonitor.start(SERVER_META, clusterName); - - super.start(); - } - - @VisibleForTesting - public void initTestCluster(String clusterAddrList, String host, int port) { - isTest = true; - this.zeplServerHost = host; - this.raftServerPort = port; - - // clear - clusterNodes.clear(); - raftAddressMap.clear(); - clusterMemberIds.clear(); - - String cluster[] = clusterAddrList.split(","); - for (int i = 0; i < cluster.length; i++) { - String[] parts = cluster[i].split(":"); - String clusterHost = parts[0]; - int clusterPort = Integer.valueOf(parts[1]); - - String memberId = clusterHost + ":" + clusterPort; - Address address = Address.from(clusterHost, clusterPort); - Node node = Node.builder().withId(memberId).withAddress(address).build(); - clusterNodes.add(node); - raftAddressMap.put(MemberId.from(memberId), address); - clusterMemberIds.add(MemberId.from(memberId)); - } - } - - @Override - public boolean raftInitialized() { - if (null != raftServer && raftServer.isRunning() - && null != raftClient && null != raftSessionClient - && raftSessionClient.getState() == PrimitiveState.CONNECTED) { - return true; - } - - return false; - } - - @Override - public boolean isClusterLeader() { - if (null == raftServer - || !raftServer.isRunning() - || !raftServer.isLeader()) { - return false; - } - - return true; - } - - private void initThread() { - // RaftServer Thread - new Thread(new Runnable() { - @Override - public void run() { - LOGGER.info("RaftServer run() >>>"); - - Address address = Address.from(zeplServerHost, raftServerPort); - Member member = Member.builder(MemberId.from(zeplServerHost + ":" + raftServerPort)) - .withAddress(address) - .build(); - messagingService = 
NettyMessagingService.builder() - .withAddress(address).build().start().join(); - RaftServerProtocol protocol = new RaftServerMessagingProtocol( - messagingService, ClusterManager.protocolSerializer, raftAddressMap::get); - - BootstrapService bootstrapService = new BootstrapService() { - @Override - public MessagingService getMessagingService() { - return messagingService; - } - - @Override - public BroadcastService getBroadcastService() { - return new BroadcastServiceAdapter(); - } - }; - - ManagedClusterMembershipService clusterService = new DefaultClusterMembershipService( - member, - new DefaultNodeDiscoveryService(bootstrapService, member, - new BootstrapDiscoveryProvider(clusterNodes)), - bootstrapService, - new MembershipConfig()); - - File atomixDateDir = com.google.common.io.Files.createTempDir(); - atomixDateDir.deleteOnExit(); - - RaftServer.Builder builder = RaftServer.builder(member.id()) - .withMembershipService(clusterService) - .withProtocol(protocol) - .withStorage(RaftStorage.builder() - .withStorageLevel(StorageLevel.MEMORY) - .withDirectory(atomixDateDir) - .withSerializer(storageSerializer) - .withMaxSegmentSize(1024 * 1024) - .build()); - - raftServer = builder.build(); - raftServer.bootstrap(clusterMemberIds); - - messagingService.registerHandler(CLUSTER_INTP_EVENT_TOPIC, - subscribeClusterIntpEvent, MoreExecutors.directExecutor()); - messagingService.registerHandler(CLUSTER_NOTE_EVENT_TOPIC, - subscribeClusterNoteEvent, MoreExecutors.directExecutor()); - messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC, - subscribeClusterAuthEvent, MoreExecutors.directExecutor()); - messagingService.registerHandler(CLUSTER_INTP_SETTING_EVENT_TOPIC, - subscribeIntpSettingEvent, MoreExecutors.directExecutor()); - - HashMap<String, Object> meta = new HashMap<String, Object>(); - String nodeName = getClusterNodeName(); - meta.put(ClusterMeta.NODE_NAME, nodeName); - meta.put(ClusterMeta.SERVER_HOST, zeplServerHost); - meta.put(ClusterMeta.SERVER_PORT, 
raftServerPort); - meta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now()); - putClusterMeta(SERVER_META, nodeName, meta); - - LOGGER.info("RaftServer run() <<<"); - } - }).start(); - } - - @Override - public void shutdown() { - if (!zConf.isClusterMode()) { - return; - } - - try { - // delete local machine meta - deleteClusterMeta(SERVER_META, getClusterNodeName()); - Thread.sleep(300); - if (clusterMonitor != null) { - clusterMonitor.shutdown(); - } - // wait raft commit metadata - Thread.sleep(300); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - - if (null != raftServer && raftServer.isRunning()) { - try { - raftServer.shutdown().get(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } catch (ExecutionException e) { - LOGGER.error(e.getMessage(), e); - } catch (TimeoutException e) { - LOGGER.error(e.getMessage(), e); - } - } - - super.shutdown(); - instance = null; - } - - // Obtain the server node whose resources are idle in the cluster - public Map<String, Object> getIdleNodeMeta() { - Map<String, Object> idleNodeMeta = null; - Map<String, Map<String, Object>> clusterMeta = getClusterMeta(SERVER_META, ""); - - long memoryIdle = 0; - for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) { - Map<String, Object> meta = entry.getValue(); - // Check if the service or process is offline - String status = (String) meta.get(ClusterMeta.STATUS); - if (null == status || StringUtils.isEmpty(status) - || status.equals(ClusterMeta.OFFLINE_STATUS)) { - continue; - } - - long memoryCapacity = (long) meta.get(ClusterMeta.MEMORY_CAPACITY); - long memoryUsed = (long) meta.get(ClusterMeta.MEMORY_USED); - long idle = memoryCapacity - memoryUsed; - if (idle > memoryIdle) { - memoryIdle = idle; - idleNodeMeta = meta; - } - } - - return idleNodeMeta; - } - - public void unicastClusterEvent(String host, int port, String topic, String msg) { - LOGGER.info("send unicastClusterEvent 
host:{} port:{} topic:{} message:{}", - host, port, topic, msg); - - Address address = Address.from(host, port); - CompletableFuture<byte[]> response = messagingService.sendAndReceive(address, - topic, msg.getBytes(), Duration.ofSeconds(2)); - response.whenComplete((r, e) -> { - if (null == e) { - LOGGER.error(e.getMessage(), e); - } - }); - } - - public void broadcastClusterEvent(String topic, String msg) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("send broadcastClusterEvent message {}", msg); - } - for (Node node : clusterNodes) { - if (StringUtils.equals(node.address().host(), zeplServerHost) - && node.address().port() == raftServerPort) { - // skip myself - continue; - } - - CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(), - topic, msg.getBytes(), Duration.ofSeconds(2)); - response.whenComplete((r, e) -> { - if (null == e) { - LOGGER.error(e.getMessage(), e); - } else { - LOGGER.info("broadcastClusterNoteEvent success! {}", msg); - } - }); - } - } - - private BiFunction<Address, byte[], byte[]> subscribeClusterIntpEvent = (address, data) -> { - String message = new String(data); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("subscribeClusterIntpEvent() {}", message); - } - for (ClusterEventListener eventListener : clusterIntpEventListeners) { - eventListener.onClusterEvent(message); - } - - return null; - }; - - private BiFunction<Address, byte[], byte[]> subscribeClusterNoteEvent = (address, data) -> { - String message = new String(data); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("subscribeClusterNoteEvent() {}", message); - } - for (ClusterEventListener eventListener : clusterNoteEventListeners) { - eventListener.onClusterEvent(message); - } - - return null; - }; - - private BiFunction<Address, byte[], byte[]> subscribeClusterAuthEvent = (address, data) -> { - String message = new String(data); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("subscribeClusterAuthEvent() {}", message); - } - for 
(ClusterEventListener eventListener : clusterAuthEventListeners) { - eventListener.onClusterEvent(message); - } - - return null; - }; - - private BiFunction<Address, byte[], byte[]> subscribeIntpSettingEvent = (address, data) -> { - String message = new String(data); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("subscribeIntpSettingEvent() {}", message); - } - for (ClusterEventListener eventListener : clusterIntpSettingEventListeners) { - eventListener.onClusterEvent(message); - } - - return null; - }; - - public void addClusterEventListeners(String topic, ClusterEventListener listener) { - if (StringUtils.equals(topic, CLUSTER_INTP_EVENT_TOPIC)) { - clusterIntpEventListeners.add(listener); - } else if (StringUtils.equals(topic, CLUSTER_NOTE_EVENT_TOPIC)) { - clusterNoteEventListeners.add(listener); - } else if (StringUtils.equals(topic, CLUSTER_AUTH_EVENT_TOPIC)) { - clusterAuthEventListeners.add(listener); - } else if (StringUtils.equals(topic, CLUSTER_INTP_SETTING_EVENT_TOPIC)) { - clusterIntpSettingEventListeners.add(listener); - } else { - LOGGER.error("Unknow cluster event topic : {}", topic); - } - } -}
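Reviewer note on the removed `unicastClusterEvent` and `broadcastClusterEvent` callbacks: both test `null == e` before calling `e.getMessage()`, so the error branch runs only when there is no error. The intended handling was presumably the opposite. The sketch below is a minimal illustration using only `java.util.concurrent.CompletableFuture`; the class `CompletionHandling` and its methods are hypothetical names and were never part of Zeppelin.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not part of the Zeppelin code base.
final class CompletionHandling {

  // Describe the outcome of a request: a non-null error means failure.
  static String describe(byte[] reply, Throwable error) {
    if (error != null) {
      return "failed: " + error.getMessage();
    }
    return "ok: " + (reply == null ? 0 : reply.length) + " bytes";
  }

  // Attach the callback to a future that is ALREADY completed, so the
  // callback runs synchronously on the calling thread.
  static String outcomeOf(CompletableFuture<byte[]> response) {
    StringBuilder out = new StringBuilder();
    response.whenComplete((reply, error) -> out.append(describe(reply, error)));
    return out.toString();
  }

  static String demoSuccess() {
    return outcomeOf(CompletableFuture.completedFuture(new byte[] {1, 2}));
  }

  static String demoFailure() {
    CompletableFuture<byte[]> response = new CompletableFuture<>();
    response.completeExceptionally(new RuntimeException("timeout"));
    return outcomeOf(response);
  }

  public static void main(String[] args) {
    System.out.println(demoSuccess());
    System.out.println(demoFailure());
  }
}
```

In the removed code the equivalent check would read `if (null != e)`, logging the exception on failure and the success message otherwise.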
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java+0 −258 removed@@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import com.sun.management.OperatingSystemMXBean; -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.management.ManagementFactory; -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META; -import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META; - -/** - * cluster monitoring - * 1. cluster monitoring is also used for zeppelin-Server and zeppelin Interperter, - * distinguish by member variable ClusterMetaType - * 2. Report the average of the server resource CPU and MEMORY usage in the - * last few minutes to smooth the server's instantaneous peak - * 3. 
checks the heartbeat timeout of the zeppelin-server and interperter processes - */ -public class ClusterMonitor { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMonitor.class); - - // Whether the thread has started - private static AtomicBoolean running = new AtomicBoolean(true); - - private ClusterManager clusterManager = null; - - // Save the CPU resource and MEmory usage of the server resources - // in the last few minutes through the queue, and then average them through the queue. - private Queue<UsageUtil> monitorUsageQueues = new LinkedList<>(); - private final int USAGE_QUEUE_LIMIT = 100; // queue length - private int heartbeatInterval = 3000; // Heartbeat reporting interval(milliseconds) - - // The zeppelin-server leader checks the heartbeat timeout of - // the zeppelin-server and zeppelin-interperter processes in the cluster metadata. - // If this time is exceeded, the zeppelin-server and interperter processes - // can have an exception and no heartbeat is reported. 
- private int heartbeatTimeout = 9000; - - // Type of cluster monitoring object - private ClusterMetaType clusterMetaType; - - // The key of the cluster monitoring object, - // the name of the cluster when monitoring the zeppelin-server, - // and the interperterGroupID when monitoring the interperter processes - private String metaKey; - - private final ZeppelinConfiguration zConf; - - public ClusterMonitor(ClusterManager clusterManagerServer, ZeppelinConfiguration zConf) { - this.clusterManager = clusterManagerServer; - this.zConf = zConf; - - heartbeatInterval = zConf.getClusterHeartbeatInterval(); - heartbeatTimeout = zConf.getClusterHeartbeatTimeout(); - - if (heartbeatTimeout < heartbeatInterval) { - LOGGER.error("Heartbeat timeout must be greater than heartbeat period."); - heartbeatTimeout = heartbeatInterval * 3; - LOGGER.info("Heartbeat timeout is modified to 3 times the heartbeat period."); - } - - if (heartbeatTimeout < heartbeatInterval * 3) { - LOGGER.warn("Heartbeat timeout recommended than 3 times the heartbeat period."); - } - } - - // - public void start(ClusterMetaType clusterMetaType, String metaKey) { - this.clusterMetaType = clusterMetaType; - this.metaKey = metaKey; - - new Thread(new Runnable() { - @Override - public void run() { - while (running.get()) { - switch (clusterMetaType) { - case SERVER_META: - sendMachineUsage(); - checkHealthy(); - break; - case INTP_PROCESS_META: - sendHeartbeat(); - break; - default: - LOGGER.error("unknown cluster meta type:{}", clusterMetaType); - break; - } - - try { - Thread.sleep(heartbeatInterval); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - }).start(); - } - - public void shutdown() { - running.set(false); - } - - // Check the healthy of each service and interperter instance - private void checkHealthy() { - // only leader check cluster healthy - if (!clusterManager.isClusterLeader()) { - return; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("checkHealthy()"); - } - 
- LocalDateTime now = LocalDateTime.now(); - // check machine mate - for (ClusterMetaType metaType : ClusterMetaType.values()) { - Map<String, Map<String, Object>> clusterMeta - = clusterManager.getClusterMeta(metaType, ""); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("clusterMeta : {}", clusterMeta); - } - - for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) { - String key = entry.getKey(); - Map<String, Object> meta = entry.getValue(); - - // Metadata that has been offline is not processed - String status = (String) meta.get(ClusterMeta.STATUS); - if (status.equals(ClusterMeta.OFFLINE_STATUS)) { - continue; - } - - Object heartbeat = meta.get(ClusterMeta.LATEST_HEARTBEAT); - if (heartbeat instanceof LocalDateTime) { - LocalDateTime dHeartbeat = (LocalDateTime) heartbeat; - Duration duration = Duration.between(dHeartbeat, now); - long timeInterval = duration.getSeconds() * 1000; // Convert to milliseconds - if (timeInterval > heartbeatTimeout) { - // Set the metadata for the heartbeat timeout to offline - // Cannot delete metadata - HashMap<String, Object> mapValues = new HashMap<>(); - mapValues.put(ClusterMeta.STATUS, ClusterMeta.OFFLINE_STATUS); - clusterManager.putClusterMeta(metaType, key, mapValues); - LOGGER.warn("offline heartbeat timeout[{}] meta[{}]", dHeartbeat, key); - } - } else { - LOGGER.error("wrong data type"); - } - } - } - } - - // The interpreter process sends a heartbeat to the cluster, - // indicating that the process is still active. 
- private void sendHeartbeat() { - HashMap<String, Object> mapMonitorUtil = new HashMap<>(); - mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now()); - mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS); - - clusterManager.putClusterMeta(INTP_PROCESS_META, metaKey, mapMonitorUtil); - } - - // send the usage of each service - private void sendMachineUsage() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("sendMachineUsage >>>"); - } - - // Limit queue size - while (monitorUsageQueues.size() > USAGE_QUEUE_LIMIT) { - monitorUsageQueues.poll(); - } - UsageUtil monitorUtil = getMachineUsage(); - monitorUsageQueues.add(monitorUtil); - - UsageUtil avgMonitorUtil = new UsageUtil(); - for (UsageUtil monitor : monitorUsageQueues){ - avgMonitorUtil.memoryUsed += monitor.memoryUsed; - avgMonitorUtil.memoryCapacity += monitor.memoryCapacity; - avgMonitorUtil.cpuUsed += monitor.cpuUsed; - avgMonitorUtil.cpuCapacity += monitor.cpuCapacity; - } - - // Resource consumption average - int queueSize = monitorUsageQueues.size(); - avgMonitorUtil.memoryUsed = avgMonitorUtil.memoryUsed / queueSize; - avgMonitorUtil.memoryCapacity = avgMonitorUtil.memoryCapacity / queueSize; - avgMonitorUtil.cpuUsed = avgMonitorUtil.cpuUsed / queueSize; - avgMonitorUtil.cpuCapacity = avgMonitorUtil.cpuCapacity / queueSize; - - HashMap<String, Object> mapMonitorUtil = new HashMap<>(); - mapMonitorUtil.put(ClusterMeta.MEMORY_USED, avgMonitorUtil.memoryUsed); - mapMonitorUtil.put(ClusterMeta.MEMORY_CAPACITY, avgMonitorUtil.memoryCapacity); - mapMonitorUtil.put(ClusterMeta.CPU_USED, avgMonitorUtil.cpuUsed); - mapMonitorUtil.put(ClusterMeta.CPU_CAPACITY, avgMonitorUtil.cpuCapacity); - mapMonitorUtil.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now()); - mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS); - - String clusterName = clusterManager.getClusterNodeName(); - clusterManager.putClusterMeta(SERVER_META, clusterName, mapMonitorUtil); - } - - private 
UsageUtil getMachineUsage() { - OperatingSystemMXBean operatingSystemMXBean - = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class); - - // Returns the amount of free physical memory in bytes. - long freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize(); - - // Returns the total amount of physical memory in bytes. - long totalPhysicalMemorySize = operatingSystemMXBean.getTotalPhysicalMemorySize(); - - // Returns the "recent cpu usage" for the whole system. - double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); - - int process = Runtime.getRuntime().availableProcessors(); - - UsageUtil monitorUtil = new UsageUtil(); - monitorUtil.memoryUsed = totalPhysicalMemorySize - freePhysicalMemorySize; - monitorUtil.memoryCapacity = totalPhysicalMemorySize; - monitorUtil.cpuUsed = (long) (process * systemCpuLoad * 100); - monitorUtil.cpuCapacity = process * 100; - - return monitorUtil; - } - - private class UsageUtil { - private long memoryUsed = 0; - private long memoryCapacity = 0; - private long cpuUsed = 0; - private long cpuCapacity = 0; - } -}
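Reviewer note on the removed `ClusterMonitor`: the constructor resets a heartbeat timeout that is shorter than the interval to three times the interval, and `checkHealthy()` compares `duration.getSeconds() * 1000` against the timeout, which drops the sub-second part of the elapsed time. The sketch below shows both rules in isolation, with `Duration.toMillis()` as a more precise alternative. `HeartbeatCheck` and its method names are hypothetical and exist only for this illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical illustration; not part of the Zeppelin code base.
final class HeartbeatCheck {

  // Mirrors the constructor rule: a timeout shorter than the interval
  // is replaced by three times the interval.
  static int normalizeTimeout(int intervalMs, int timeoutMs) {
    return timeoutMs < intervalMs ? intervalMs * 3 : timeoutMs;
  }

  // Millisecond-precise comparison.
  static boolean isTimedOut(LocalDateTime lastHeartbeat, LocalDateTime now, int timeoutMs) {
    return Duration.between(lastHeartbeat, now).toMillis() > timeoutMs;
  }

  // Same arithmetic as the removed code: whole seconds converted to milliseconds.
  static boolean isTimedOutTruncated(LocalDateTime lastHeartbeat, LocalDateTime now,
                                     int timeoutMs) {
    return Duration.between(lastHeartbeat, now).getSeconds() * 1000 > timeoutMs;
  }

  public static void main(String[] args) {
    LocalDateTime last = LocalDateTime.of(2024, 1, 1, 0, 0, 0);
    LocalDateTime now = last.plusSeconds(9).plusNanos(500_000_000L); // 9.5 s later
    System.out.println("precise:   " + isTimedOut(last, now, 9000));
    System.out.println("truncated: " + isTimedOutTruncated(last, now, 9000));
  }
}
```

With a 9000 ms timeout and a heartbeat that is 9.5 s old, the two variants disagree, so the truncating form can flag a node as offline up to one second late.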
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java +0 −57 removed
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster;
-
-import io.atomix.primitive.PrimitiveBuilder;
-import io.atomix.primitive.PrimitiveManagementService;
-import io.atomix.primitive.PrimitiveType;
-import io.atomix.primitive.config.PrimitiveConfig;
-import io.atomix.primitive.service.PrimitiveService;
-import io.atomix.primitive.service.ServiceConfig;
-
-/**
- * Cluster primitive type
- * Creating a custom distributed primitive is defining the primitive type.
- * To create a new type, implement the PrimitiveType interface
- */
-public class ClusterPrimitiveType implements PrimitiveType {
-  public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
-
-  public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
-
-  @Override
-  public String name() {
-    return PRIMITIVE_NAME;
-  }
-
-  @Override
-  public PrimitiveConfig newConfig() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public PrimitiveBuilder newBuilder(String primitiveName,
-                                     PrimitiveConfig config,
-                                     PrimitiveManagementService managementService) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public PrimitiveService newService(ServiceConfig config) {
-    return new ClusterStateMachine();
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java+0 −179 removed@@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import com.google.common.collect.Maps; -import io.atomix.primitive.operation.OperationId; -import io.atomix.primitive.service.AbstractPrimitiveService; -import io.atomix.primitive.service.BackupOutput; -import io.atomix.primitive.service.BackupInput; -import io.atomix.primitive.service.Commit; -import io.atomix.primitive.service.ServiceExecutor; -import io.atomix.utils.serializer.Serializer; -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.meta.ClusterMetaEntity; -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Cluster State Machine for Zeppelin - * The cluster state is implemented as a snapshot state machine. - * The state machine stores the service and process metadata information of the cluster. - * Metadata information can be manipulated by put, get, remove, index, and snapshot. 
- */ -public class ClusterStateMachine extends AbstractPrimitiveService { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterStateMachine.class); - private ClusterMeta clusterMeta = new ClusterMeta(); - - // Command to operation a variable in cluster state machine - public static final OperationId PUT = OperationId.command("put"); - public static final OperationId GET = OperationId.query("get"); - public static final OperationId REMOVE = OperationId.command("remove"); - public static final OperationId INDEX = OperationId.command("index"); - - public ClusterStateMachine() { - super(ClusterPrimitiveType.INSTANCE); - } - - @Override - public Serializer serializer() { - return ClusterManager.clientSerializer; - } - - @Override - protected void configure(ServiceExecutor executor) { - executor.register(PUT, this::put); - executor.register(GET, this::get); - executor.register(REMOVE, this::remove); - executor.register(INDEX, this::index); - } - - protected long put(Commit<ClusterMetaEntity> commit) { - clusterMeta.put(commit.value().getMetaType(), - commit.value().getKey(), commit.value().getValues()); - return commit.index(); - } - - protected Map<String, Map<String, Object>> get(Commit<ClusterMetaEntity> commit) { - return clusterMeta.get(commit.value().getMetaType(), commit.value().getKey()); - } - - protected long remove(Commit<ClusterMetaEntity> commit) { - clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey()); - return commit.index(); - } - - protected long index(Commit<Void> commit) { - return commit.index(); - } - - @Override - public void backup(BackupOutput writer) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("ClusterStateMachine.backup()"); - } - - // backup SERVER_META - // cluster meta map struct - // cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...} - Map<String, Map<String, Object>> mapServerMeta - = clusterMeta.get(ClusterMetaType.SERVER_META, ""); - // write all SERVER_META size - 
writer.writeInt(mapServerMeta.size()); - for (Map.Entry<String, Map<String, Object>> entry : mapServerMeta.entrySet()) { - // write cluster_name - writer.writeString(entry.getKey()); - - Map<String, Object> kvPairs = entry.getValue(); - // write cluster mate kv pairs size - writer.writeInt(kvPairs.size()); - for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) { - // write cluster mate kv pairs - writer.writeString(entryValue.getKey()); - writer.writeObject(entryValue.getValue()); - } - } - - // backup INTP_PROCESS_META - // Interpreter meta map struct - // IntpGroupId -> {server_tserver_host,server_tserver_port,...} - Map<String, Map<String, Object>> mapIntpProcMeta - = clusterMeta.get(ClusterMetaType.INTP_PROCESS_META, ""); - // write interpreter size - writer.writeInt(mapIntpProcMeta.size()); - for (Map.Entry<String, Map<String, Object>> entry : mapIntpProcMeta.entrySet()) { - // write IntpGroupId - writer.writeString(entry.getKey()); - - Map<String, Object> kvPairs = entry.getValue(); - // write interpreter mate kv pairs size - writer.writeInt(kvPairs.size()); - for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) { - // write interpreter mate kv pairs - writer.writeString(entryValue.getKey()); - writer.writeObject(entryValue.getValue()); - } - } - } - - @Override - public void restore(BackupInput reader) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("ClusterStateMachine.restore()"); - } - - clusterMeta = new ClusterMeta(); - // read all SERVER_META size - int nServerMeta = reader.readInt(); - for (int i = 0; i < nServerMeta; i++) { - // read cluster_name - String clusterName = reader.readString(); - - // read cluster mate kv pairs size - int nKVpairs = reader.readInt(); - for (int j = 0; j < nKVpairs; i++) { - // read cluster mate kv pairs - String key = reader.readString(); - Object value = reader.readObject(); - - clusterMeta.put(ClusterMetaType.SERVER_META, - clusterName, Maps.immutableEntry(key, value)); - } - } - - // read all 
INTP_PROCESS_META size - int nIntpMeta = reader.readInt(); - for (int i = 0; i < nIntpMeta; i++) { - // read interpreter name - String intpName = reader.readString(); - - // read interpreter mate kv pairs size - int nKVpairs = reader.readInt(); - for (int j = 0; j < nKVpairs; i++) { - // read interpreter mate kv pairs - String key = reader.readString(); - Object value = reader.readObject(); - - clusterMeta.put(ClusterMetaType.INTP_PROCESS_META, - intpName, Maps.immutableEntry(key, value)); - } - } - } -}
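Note on the snapshot layout: backup() above writes each metadata map as a length-prefixed sequence (entry count, then for each entry a name, a pair count, and the key/value pairs), and restore() reads the same layout back. As shown in the removed code, both inner loops in restore() advance i rather than j. A minimal round-trip sketch of that layout is given below. It assumes java.io data streams and string values as stand-ins for Atomix's BackupOutput/BackupInput and writeObject; the class and method names are hypothetical and this is not the Zeppelin implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the backup()/restore() pair; not Zeppelin or Atomix code.
final class MetaSnapshotSketch {

  // Writes: entry count, then per entry the name, the pair count, and each key/value pair.
  static byte[] backup(Map<String, Map<String, String>> meta) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(meta.size());
      for (Map.Entry<String, Map<String, String>> entry : meta.entrySet()) {
        out.writeUTF(entry.getKey());
        out.writeInt(entry.getValue().size());
        for (Map.Entry<String, String> kv : entry.getValue().entrySet()) {
          out.writeUTF(kv.getKey());
          out.writeUTF(kv.getValue());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reads the same layout back; the inner loop uses its own counter j.
  static Map<String, Map<String, String>> restore(byte[] snapshot) {
    Map<String, Map<String, String>> meta = new LinkedHashMap<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
      int entries = in.readInt();
      for (int i = 0; i < entries; i++) {
        String name = in.readUTF();
        int pairs = in.readInt();
        Map<String, String> kvPairs = new LinkedHashMap<>();
        for (int j = 0; j < pairs; j++) {
          String key = in.readUTF();
          kvPairs.put(key, in.readUTF());
        }
        meta.put(name, kvPairs);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return meta;
  }
}
```

Calling MetaSnapshotSketch.restore(MetaSnapshotSketch.backup(meta)) should return a map equal to meta, including entries whose inner map is empty.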
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java +0 −46 removed
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.event;
-
-/**
- * Cluster Event
- */
-public enum ClusterEvent {
-  // CLUSTER_INTP_EVENT_TOPIC
-  CREATE_INTP_PROCESS,
-  // CLUSTER_NOTE_EVENT_TOPIC
-  BROADCAST_NOTE,
-  BROADCAST_NOTE_LIST,
-  BROADCAST_PARAGRAPH,
-  BROADCAST_PARAGRAPHS,
-  BROADCAST_NEW_PARAGRAPH,
-  UPDATE_NOTE_PERMISSIONS,
-  // CLUSTER_AUTH_EVENT_TOPIC
-  SET_ROLES,
-  // (TODO) Consolidate the permission related events into one event
-  SET_READERS_PERMISSIONS,
-  SET_RUNNERS_PERMISSIONS,
-  SET_WRITERS_PERMISSIONS,
-  SET_OWNERS_PERMISSIONS,
-  CLEAR_PERMISSION,
-  // CLUSTER_NBAUTH_EVENT_TOPIC
-  SET_NEW_NOTE_PERMISSIONS,
-  // CLUSTER_INTP_SETTING_EVENT_TOPIC
-  CREATE_INTP_SETTING,
-  UPDATE_INTP_SETTING,
-  DELETE_INTP_SETTING,
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java +0 −28 removed
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.event;
-
-/**
- * Listen for the NEW_NOTE、DEL_NOTE、REMOVE_NOTE_TO_TRASH ... event
- * of the notebook in the NotebookServer#onMessage() function.
- */
-public interface ClusterEventListener {
-  public static final String CLUSTER_EVENT = "CLUSTER_EVENT";
-  public static final String CLUSTER_EVENT_MSG = "CLUSTER_EVENT_MSG";
-
-  void onClusterEvent(String msg);
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java +0 −73 removed
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.event;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.apache.zeppelin.display.Input;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ClusterMessage {
-  public ClusterEvent clusterEvent;
-  private Map<String, String> data = new HashMap<>();
-  private String msgId;
-
-  private static Gson gson = new GsonBuilder()
-      .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
-      .setPrettyPrinting()
-      .registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
-
-  public ClusterMessage(ClusterEvent event) {
-    this.clusterEvent = event;
-  }
-
-  public ClusterMessage put(String k, String v) {
-    data.put(k, v);
-    return this;
-  }
-
-  public ClusterMessage put(Map<String, String> params) {
-    data.putAll(params);
-    return this;
-  }
-
-  public String get(String k) {
-    return data.get(k);
-  }
-
-  public Map<String, String> getData() {
-    return data;
-  }
-
-  public String getMsgId() {
-    return msgId;
-  }
-
-  public void setMsgId(String msgId) {
-    this.msgId = msgId;
-  }
-
-  public static ClusterMessage deserializeMessage(String msg) {
-    return gson.fromJson(msg, ClusterMessage.class);
-  }
-
-  public static String serializeMessage(ClusterMessage m) {
-    return gson.toJson(m);
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java +0 −49 removed
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.listener;
-
-import io.atomix.cluster.ClusterMembershipEvent;
-import io.atomix.cluster.ClusterMembershipEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Entity capable of receiving device cluster-related events.
- * Listen for new zeppelin servers to join or leave the cluster,
- * Monitor whether the metadata in the cluster server changes
- */
-public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ZeppelinClusterMembershipEventListener.class);
-
-  @Override
-  public void event(ClusterMembershipEvent event) {
-    switch (event.type()) {
-      case MEMBER_ADDED:
-        LOGGER.info("{} joined the cluster.", event.subject().id());
-        break;
-      case MEMBER_REMOVED:
-        LOGGER.info("{} left the cluster.", event.subject().id());
-        break;
-      case METADATA_CHANGED:
-        LOGGER.info("{} meta data changed.", event.subject().id());
-        break;
-      case REACHABILITY_CHANGED:
-        LOGGER.info("{} reachability changed.", event.subject().id());
-        break;
-    }
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java +0 −58 removed
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.meta;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Cluster operations, cluster types, encapsulation objects for keys and values
- */
-public class ClusterMetaEntity implements Serializable {
-  private ClusterMetaOperation operation;
-  private ClusterMetaType type;
-  private String key;
-  private Map<String, Object> values = new HashMap<>();
-
-  public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
-                           String key, Map<String, Object> values) {
-    this.operation = operation;
-    this.type = type;
-    this.key = key;
-
-    if (null != values) {
-      this.values.putAll(values);
-    }
-  }
-
-  public ClusterMetaOperation getOperation() {
-    return operation;
-  }
-
-  public ClusterMetaType getMetaType() {
-    return type;
-  }
-
-  public String getKey() {
-    return key;
-  }
-
-  public Map<String, Object> getValues() {
-    return values;
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java+0 −145 removed@@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster.meta; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -/** - * Metadata stores metadata information in a KV key-value pair - */ -public class ClusterMeta implements Serializable { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMeta.class); - - // The name of each server node in the cluster - public static final String NODE_NAME = "NODE_NAME"; - - // zeppelin-server meta - public static final String SERVER_HOST = "SERVER_HOST"; - public static final String SERVER_PORT = "SERVER_PORT"; - public static final String SERVER_START_TIME = "SERVER_START_TIME"; - - // interperter-process meta - public static final String INTP_PROCESS_NAME = "INTP_PROCESS_NAME"; - public static final String INTP_TSERVER_HOST = "INTP_TSERVER_HOST"; - public static final String INTP_TSERVER_PORT = "INTP_TSERVER_PORT"; - public static final String INTP_START_TIME = "INTP_START_TIME"; 
- - // zeppelin-server resource usage - public static final String CPU_CAPACITY = "CPU_CAPACITY"; - public static final String CPU_USED = "CPU_USED"; - public static final String MEMORY_CAPACITY = "MEMORY_CAPACITY"; - public static final String MEMORY_USED = "MEMORY_USED"; - - public static final String LATEST_HEARTBEAT = "LATEST_HEARTBEAT"; - - // zeppelin-server or interperter-process status - public static final String STATUS = "STATUS"; - public static final String ONLINE_STATUS = "ONLINE"; - public static final String OFFLINE_STATUS = "OFFLINE"; - public static final String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT"; - public static final String INTP_PROCESS_LIST = "INTP_PROCESS_LIST"; - - // cluster_name = host:port - // Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...} - private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>(); - - // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...} - private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>(); - - public void put(ClusterMetaType type, String key, Object value) { - Map<String, Object> mapValue = (Map<String, Object>) value; - - switch (type) { - case SERVER_META: - // Because it may be partially updated metadata information - if (mapServerMeta.containsKey(key)) { - Map<String, Object> values = mapServerMeta.get(key); - values.putAll(mapValue); - } else { - mapServerMeta.put(key, mapValue); - } - break; - case INTP_PROCESS_META: - if (mapInterpreterMeta.containsKey(key)) { - Map<String, Object> values = mapInterpreterMeta.get(key); - values.putAll(mapValue); - } else { - mapInterpreterMeta.put(key, mapValue); - } - break; - } - } - - public Map<String, Map<String, Object>> get(ClusterMetaType type, String key) { - Map<String, Object> values = null; - - switch (type) { - case SERVER_META: - if (null == key || StringUtils.isEmpty(key)) { - return mapServerMeta; - } - if (mapServerMeta.containsKey(key)) { - values = 
mapServerMeta.get(key); - } else { - LOGGER.warn("can not find key : {}", key); - } - break; - case INTP_PROCESS_META: - if (null == key || StringUtils.isEmpty(key)) { - return mapInterpreterMeta; - } - if (mapInterpreterMeta.containsKey(key)) { - values = mapInterpreterMeta.get(key); - } else { - LOGGER.warn("can not find key : {}", key); - } - break; - } - - Map<String, Map<String, Object>> result = new HashMap<>(); - result.put(key, values); - - return result; - } - - public Map<String, Object> remove(ClusterMetaType type, String key) { - switch (type) { - case SERVER_META: - if (mapServerMeta.containsKey(key)) { - return mapServerMeta.remove(key); - } else { - LOGGER.warn("can not find key : {}", key); - } - break; - case INTP_PROCESS_META: - if (mapInterpreterMeta.containsKey(key)) { - return mapInterpreterMeta.remove(key); - } else { - LOGGER.warn("can not find key : {}", key); - } - break; - } - - return null; - } -}
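Note on ClusterMeta semantics: put() merges the supplied values into an existing entry, so a partial update keeps earlier fields, and get() returns the whole map for a null or empty key, or a single-entry map (with a null value for an unknown key) otherwise. The sketch below models only that behaviour for one metadata type. The class name is hypothetical, and unlike the removed code it copies values into a fresh map on first insert instead of storing the caller's map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the put/get behaviour; not the Zeppelin class.
final class MetaStoreSketch {
  private final Map<String, Map<String, Object>> serverMeta = new HashMap<>();

  // Merge into the existing entry so a partial update keeps earlier fields.
  void put(String key, Map<String, Object> values) {
    serverMeta.computeIfAbsent(key, k -> new HashMap<>()).putAll(values);
  }

  // Null or empty key returns everything; otherwise a single-entry map
  // whose value is null when the key is unknown.
  Map<String, Map<String, Object>> get(String key) {
    if (key == null || key.isEmpty()) {
      return serverMeta;
    }
    Map<String, Map<String, Object>> result = new HashMap<>();
    result.put(key, serverMeta.get(key));
    return result;
  }
}
```

Two successive put calls on the same key with different fields leave both fields visible through get(key), which matches the "partially updated metadata" comment in the removed code.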
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaOperation.java +0 −26 removed
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.meta;
-
-/**
- * Type of cluster metadata operation
- */
-public enum ClusterMetaOperation {
-  GET_OPERATION,
-  PUT_OPERATION,
-  DELETE_OPERATION
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java +0 −25 removed
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.meta;
-
-/**
- * Type of cluster metadata
- */
-public enum ClusterMetaType {
-  SERVER_META,
-  INTP_PROCESS_META
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java+0 −163 removed@@ -1,163 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster.protocol; - -import com.google.common.collect.Maps; -import io.atomix.cluster.MemberId; -import io.atomix.primitive.session.SessionId; - -import io.atomix.protocols.raft.protocol.HeartbeatRequest; -import io.atomix.protocols.raft.protocol.PublishRequest; -import io.atomix.protocols.raft.protocol.RaftClientProtocol; -import io.atomix.protocols.raft.protocol.HeartbeatResponse; -import io.atomix.protocols.raft.protocol.OpenSessionResponse; -import io.atomix.protocols.raft.protocol.OpenSessionRequest; -import io.atomix.protocols.raft.protocol.CloseSessionResponse; -import io.atomix.protocols.raft.protocol.CloseSessionRequest; -import io.atomix.protocols.raft.protocol.KeepAliveResponse; -import io.atomix.protocols.raft.protocol.KeepAliveRequest; -import io.atomix.protocols.raft.protocol.QueryResponse; -import io.atomix.protocols.raft.protocol.QueryRequest; -import io.atomix.protocols.raft.protocol.CommandResponse; -import io.atomix.protocols.raft.protocol.CommandRequest; -import io.atomix.protocols.raft.protocol.MetadataResponse; -import io.atomix.protocols.raft.protocol.MetadataRequest; -import io.atomix.protocols.raft.protocol.ResetRequest; -import io.atomix.utils.concurrent.Futures; 
-import io.atomix.utils.serializer.Serializer; - -import java.net.ConnectException; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.Consumer; -import java.util.function.Function; - -/** - * Protocol for intercommunication between Raft clients for each server in the cluster. - * Communication protocol for handling sessions, queries, commands, and services within the cluster. - */ -public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol { - private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler; - private final Map<Long, Consumer<PublishRequest>> publishListeners = Maps.newConcurrentMap(); - - public LocalRaftClientProtocol(MemberId memberId, - Serializer serializer, - Map<MemberId, LocalRaftServerProtocol> servers, - Map<MemberId, LocalRaftClientProtocol> clients) { - super(serializer, servers, clients); - clients.put(memberId, this); - } - - private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) { - LocalRaftServerProtocol server = server(memberId); - if (server != null) { - return Futures.completedFuture(server); - } else { - return Futures.exceptionalFuture(new ConnectException()); - } - } - - @Override - public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, - OpenSessionRequest request) { - return getServer(memberId).thenCompose(protocol -> - protocol.openSession(encode(request))).thenApply(this::decode); - } - - @Override - public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, - CloseSessionRequest request) { - return getServer(memberId).thenCompose(protocol -> - protocol.closeSession(encode(request))).thenApply(this::decode); - } - - @Override - public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, - KeepAliveRequest request) { - return getServer(memberId).thenCompose(protocol -> - 
protocol.keepAlive(encode(request))).thenApply(this::decode); - } - - @Override - public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { - return getServer(memberId).thenCompose(protocol -> - protocol.query(encode(request))).thenApply(this::decode); - } - - @Override - public CompletableFuture<CommandResponse> command(MemberId memberId, - CommandRequest request) { - return getServer(memberId).thenCompose(protocol -> - protocol.command(encode(request))).thenApply(this::decode); - } - - @Override - public CompletableFuture<MetadataResponse> metadata(MemberId memberId, - MetadataRequest request) { - return getServer(memberId).thenCompose(protocol -> - protocol.metadata(encode(request))).thenApply(this::decode); - } - - CompletableFuture<byte[]> heartbeat(byte[] request) { - if (heartbeatHandler != null) { - return heartbeatHandler.apply(decode(request)).thenApply(this::encode); - } else { - return Futures.exceptionalFuture(new ConnectException()); - } - } - - @Override - public void registerHeartbeatHandler(Function<HeartbeatRequest, - CompletableFuture<HeartbeatResponse>> handler) { - this.heartbeatHandler = handler; - } - - @Override - public void unregisterHeartbeatHandler() { - this.heartbeatHandler = null; - } - - @Override - public void reset(Set<MemberId> members, ResetRequest request) { - members.forEach(nodeId -> { - LocalRaftServerProtocol server = server(nodeId); - if (server != null) { - server.reset(request.session(), encode(request)); - } - }); - } - - void publish(long sessionId, byte[] request) { - Consumer<PublishRequest> listener = publishListeners.get(sessionId); - if (listener != null) { - listener.accept(decode(request)); - } - } - - @Override - public void registerPublishListener(SessionId sessionId, - Consumer<PublishRequest> listener, Executor executor) { - publishListeners.put(sessionId.id(), request -> - executor.execute(() -> listener.accept(request))); - } - - @Override - public void 
unregisterPublishListener(SessionId sessionId) { - publishListeners.remove(sessionId.id()); - } -}
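Note on the dispatch pattern: each client call in LocalRaftClientProtocol looks up the target server in a shared in-memory map, fails the returned future with a ConnectException when the member is unknown, and otherwise forwards the encoded request and decodes the reply. The sketch below shows only that lookup-and-forward shape with plain CompletableFuture. The class, the string member ids, and the byte-array handler type are assumptions for illustration, not Atomix or Zeppelin types.

```java
import java.net.ConnectException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical in-memory dispatcher; member ids are plain strings here.
final class LocalDispatchSketch {
  private final Map<String, Function<byte[], CompletableFuture<byte[]>>> servers =
      new ConcurrentHashMap<>();

  // Registers the handler that answers requests addressed to memberId.
  void register(String memberId, Function<byte[], CompletableFuture<byte[]>> handler) {
    servers.put(memberId, handler);
  }

  // Forwards the request to the registered handler, or returns a future
  // that has already failed with ConnectException for an unknown member.
  CompletableFuture<byte[]> send(String memberId, byte[] request) {
    Function<byte[], CompletableFuture<byte[]>> handler = servers.get(memberId);
    if (handler == null) {
      CompletableFuture<byte[]> failed = new CompletableFuture<>();
      failed.completeExceptionally(new ConnectException("unknown member: " + memberId));
      return failed;
    }
    return handler.apply(request);
  }
}
```

Nothing in this shape checks who the caller is; any party that can reach send() with a valid member id gets an answer.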
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocolFactory.java +0 −57 removed
@@ -1,57 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.protocol;
-
-import com.google.common.collect.Maps;
-import io.atomix.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftClientProtocol;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
-import io.atomix.utils.serializer.Serializer;
-
-import java.util.Map;
-
-/**
- * Cluster Raft protocol factory.
- */
-public class LocalRaftProtocolFactory {
-  private final Serializer serializer;
-  private final Map<MemberId, LocalRaftServerProtocol> servers = Maps.newConcurrentMap();
-  private final Map<MemberId, LocalRaftClientProtocol> clients = Maps.newConcurrentMap();
-
-  public LocalRaftProtocolFactory(Serializer serializer) {
-    this.serializer = serializer;
-  }
-
-  /**
-   * Returns a new test client protocol.
-   *
-   * @param memberId the client member identifier
-   * @return a new test client protocol
-   */
-  public RaftClientProtocol newClientProtocol(MemberId memberId) {
-    return new LocalRaftClientProtocol(memberId, serializer, servers, clients);
-  }
-
-  /**
-   * Returns a new test server protocol.
-   *
-   * @param memberId the server member identifier
-   * @return a new test server protocol
-   */
-  public RaftServerProtocol newServerProtocol(MemberId memberId) {
-    return new LocalRaftServerProtocol(memberId, serializer, servers, clients);
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftProtocol.java +0 −58 removed
@@ -1,58 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.protocol;
-
-import io.atomix.cluster.MemberId;
-import io.atomix.utils.serializer.Serializer;
-
-import java.util.Map;
-
-/**
- * Base class for Raft protocol.
- */
-public abstract class LocalRaftProtocol {
-  private final Serializer serializer;
-  private final Map<MemberId, LocalRaftServerProtocol> servers;
-  private final Map<MemberId, LocalRaftClientProtocol> clients;
-
-  public LocalRaftProtocol(Serializer serializer,
-                           Map<MemberId, LocalRaftServerProtocol> servers,
-                           Map<MemberId, LocalRaftClientProtocol> clients) {
-    this.serializer = serializer;
-    this.servers = servers;
-    this.clients = clients;
-  }
-
-  <T> T copy(T value) {
-    return serializer.decode(serializer.encode(value));
-  }
-
-  byte[] encode(Object value) {
-    return serializer.encode(value);
-  }
-
-  <T> T decode(byte[] bytes) {
-    return serializer.decode(bytes);
-  }
-
-  LocalRaftServerProtocol server(MemberId memberId) {
-    return servers.get(memberId);
-  }
-
-  LocalRaftClientProtocol client(MemberId memberId) {
-    return clients.get(memberId);
-  }
-}
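Note on copy(): LocalRaftProtocol.copy() produces a detached copy of a value by encoding it and decoding the resulting bytes. The sketch below shows the same encode-then-decode idea with JDK object streams standing in for the Atomix Serializer. The class name is hypothetical, and JDK serialization is used here only because it is self-contained; it is an assumption of this sketch, not what the removed code does, and it is not a safe choice for untrusted input.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the encode/decode/copy helpers; not Atomix code.
final class RoundTripCopySketch {

  // Serializes the value to bytes (the value must implement Serializable).
  static byte[] encode(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Rebuilds an object from bytes produced by encode().
  @SuppressWarnings("unchecked")
  static <T> T decode(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (T) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Copy by round trip: the result shares no state with the original.
  static <T> T copy(T value) {
    return decode(encode(value));
  }
}
```

A list copied this way is equal to the original but is a distinct instance, so later changes to the original do not show up in the copy.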
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftServerProtocol.java+0 −527 removed@@ -1,527 +0,0 @@ -/* - * Copyright 2017-present Open Networking Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster.protocol; - -import com.google.common.collect.Maps; -import io.atomix.cluster.MemberId; -import io.atomix.primitive.session.SessionId; -import io.atomix.protocols.raft.protocol.RaftServerProtocol; -import io.atomix.protocols.raft.protocol.OpenSessionRequest; -import io.atomix.protocols.raft.protocol.OpenSessionResponse; -import io.atomix.protocols.raft.protocol.CloseSessionRequest; -import io.atomix.protocols.raft.protocol.CloseSessionResponse; -import io.atomix.protocols.raft.protocol.KeepAliveRequest; -import io.atomix.protocols.raft.protocol.KeepAliveResponse; -import io.atomix.protocols.raft.protocol.QueryRequest; -import io.atomix.protocols.raft.protocol.QueryResponse; -import io.atomix.protocols.raft.protocol.CommandRequest; -import io.atomix.protocols.raft.protocol.CommandResponse; -import io.atomix.protocols.raft.protocol.MetadataRequest; -import io.atomix.protocols.raft.protocol.MetadataResponse; -import io.atomix.protocols.raft.protocol.JoinRequest; -import io.atomix.protocols.raft.protocol.JoinResponse; -import io.atomix.protocols.raft.protocol.LeaveRequest; -import io.atomix.protocols.raft.protocol.LeaveResponse; -import 
io.atomix.protocols.raft.protocol.ConfigureRequest;
-import io.atomix.protocols.raft.protocol.ConfigureResponse;
-import io.atomix.protocols.raft.protocol.ReconfigureRequest;
-import io.atomix.protocols.raft.protocol.ReconfigureResponse;
-import io.atomix.protocols.raft.protocol.InstallRequest;
-import io.atomix.protocols.raft.protocol.InstallResponse;
-import io.atomix.protocols.raft.protocol.PollRequest;
-import io.atomix.protocols.raft.protocol.PollResponse;
-import io.atomix.protocols.raft.protocol.VoteRequest;
-import io.atomix.protocols.raft.protocol.VoteResponse;
-import io.atomix.protocols.raft.protocol.TransferRequest;
-import io.atomix.protocols.raft.protocol.TransferResponse;
-import io.atomix.protocols.raft.protocol.AppendRequest;
-import io.atomix.protocols.raft.protocol.AppendResponse;
-import io.atomix.protocols.raft.protocol.ResetRequest;
-import io.atomix.protocols.raft.protocol.PublishRequest;
-import io.atomix.protocols.raft.protocol.HeartbeatResponse;
-import io.atomix.protocols.raft.protocol.HeartbeatRequest;
-
-import io.atomix.utils.concurrent.Futures;
-import io.atomix.utils.serializer.Serializer;
-
-import java.net.ConnectException;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Cluster server protocol.
- */
-public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol {
-  private Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> openSessionHandler;
-  private Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>>
-      closeSessionHandler;
-  private Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> keepAliveHandler;
-  private Function<QueryRequest, CompletableFuture<QueryResponse>> queryHandler;
-  private Function<CommandRequest, CompletableFuture<CommandResponse>> commandHandler;
-  private Function<MetadataRequest, CompletableFuture<MetadataResponse>> metadataHandler;
-  private Function<JoinRequest, CompletableFuture<JoinResponse>> joinHandler;
-  private Function<LeaveRequest, CompletableFuture<LeaveResponse>> leaveHandler;
-  private Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> configureHandler;
-  private Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> reconfigureHandler;
-  private Function<InstallRequest, CompletableFuture<InstallResponse>> installHandler;
-  private Function<PollRequest, CompletableFuture<PollResponse>> pollHandler;
-  private Function<VoteRequest, CompletableFuture<VoteResponse>> voteHandler;
-  private Function<TransferRequest, CompletableFuture<TransferResponse>> transferHandler;
-  private Function<AppendRequest, CompletableFuture<AppendResponse>> appendHandler;
-  private final Map<Long, Consumer<ResetRequest>> resetListeners = Maps.newConcurrentMap();
-
-  public LocalRaftServerProtocol(MemberId memberId, Serializer serializer,
-                                 Map<MemberId, LocalRaftServerProtocol> servers,
-                                 Map<MemberId, LocalRaftClientProtocol> clients) {
-    super(serializer, servers, clients);
-    servers.put(memberId, this);
-  }
-
-  private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
-    LocalRaftServerProtocol server = server(memberId);
-    if (server != null) {
-      return Futures.completedFuture(server);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  private CompletableFuture<LocalRaftClientProtocol> getClient(MemberId memberId) {
-    LocalRaftClientProtocol client = client(memberId);
-    if (client != null) {
-      return Futures.completedFuture(client);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
-                                                            OpenSessionRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.openSession(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
-                                                              CloseSessionRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.closeSession(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
-                                                        KeepAliveRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.keepAlive(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.query(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<CommandResponse> command(MemberId memberId,
-                                                    CommandRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.command(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
-                                                      MetadataRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.metadata(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.join(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.leave(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
-                                                        ConfigureRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.configure(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
-                                                            ReconfigureRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.reconfigure(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.install(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.install(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.poll(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.vote(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
-    return getServer(memberId).thenCompose(listener ->
-        listener.append(encode(request))).thenApply(this::decode);
-  }
-
-  @Override
-  public void publish(MemberId memberId, PublishRequest request) {
-    getClient(memberId).thenAccept(protocol ->
-        protocol.publish(request.session(), encode(request)));
-  }
-
-  @Override
-  public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
-                                                        HeartbeatRequest request) {
-    return getClient(memberId).thenCompose(protocol ->
-        protocol.heartbeat(encode(request))).thenApply(this::decode);
-  }
-
-  CompletableFuture<byte[]> openSession(byte[] request) {
-    if (openSessionHandler != null) {
-      return openSessionHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerOpenSessionHandler(Function<OpenSessionRequest,
-      CompletableFuture<OpenSessionResponse>> handler) {
-    this.openSessionHandler = handler;
-  }
-
-  @Override
-  public void unregisterOpenSessionHandler() {
-    this.openSessionHandler = null;
-  }
-
-  CompletableFuture<byte[]> closeSession(byte[] request) {
-    if (closeSessionHandler != null) {
-      return closeSessionHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerCloseSessionHandler(Function<CloseSessionRequest,
-      CompletableFuture<CloseSessionResponse>> handler) {
-    this.closeSessionHandler = handler;
-  }
-
-  @Override
-  public void unregisterCloseSessionHandler() {
-    this.closeSessionHandler = null;
-  }
-
-  CompletableFuture<byte[]> keepAlive(byte[] request) {
-    if (keepAliveHandler != null) {
-      return keepAliveHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerKeepAliveHandler(Function<KeepAliveRequest,
-      CompletableFuture<KeepAliveResponse>> handler) {
-    this.keepAliveHandler = handler;
-  }
-
-  @Override
-  public void unregisterKeepAliveHandler() {
-    this.keepAliveHandler = null;
-  }
-
-  CompletableFuture<byte[]> query(byte[] request) {
-    if (queryHandler != null) {
-      return queryHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerQueryHandler(Function<QueryRequest,
-      CompletableFuture<QueryResponse>> handler) {
-    this.queryHandler = handler;
-  }
-
-  @Override
-  public void unregisterQueryHandler() {
-    this.queryHandler = null;
-  }
-
-  CompletableFuture<byte[]> command(byte[] request) {
-    if (commandHandler != null) {
-      return commandHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerCommandHandler(Function<CommandRequest,
-      CompletableFuture<CommandResponse>> handler) {
-    this.commandHandler = handler;
-  }
-
-  @Override
-  public void unregisterCommandHandler() {
-    this.commandHandler = null;
-  }
-
-  CompletableFuture<byte[]> metadata(byte[] request) {
-    if (metadataHandler != null) {
-      return metadataHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerMetadataHandler(Function<MetadataRequest,
-      CompletableFuture<MetadataResponse>> handler) {
-    this.metadataHandler = handler;
-  }
-
-  @Override
-  public void unregisterMetadataHandler() {
-    this.metadataHandler = null;
-  }
-
-  CompletableFuture<byte[]> join(byte[] request) {
-    if (joinHandler != null) {
-      return joinHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerJoinHandler(Function<JoinRequest,
-      CompletableFuture<JoinResponse>> handler) {
-    this.joinHandler = handler;
-  }
-
-  @Override
-  public void unregisterJoinHandler() {
-    this.joinHandler = null;
-  }
-
-  CompletableFuture<byte[]> leave(byte[] request) {
-    if (leaveHandler != null) {
-      return leaveHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerLeaveHandler(Function<LeaveRequest,
-      CompletableFuture<LeaveResponse>> handler) {
-    this.leaveHandler = handler;
-  }
-
-  @Override
-  public void unregisterLeaveHandler() {
-    this.leaveHandler = null;
-  }
-
-  CompletableFuture<byte[]> configure(byte[] request) {
-    if (configureHandler != null) {
-      return configureHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerConfigureHandler(Function<ConfigureRequest,
-      CompletableFuture<ConfigureResponse>> handler) {
-    this.configureHandler = handler;
-  }
-
-  @Override
-  public void unregisterConfigureHandler() {
-    this.configureHandler = null;
-  }
-
-  CompletableFuture<byte[]> reconfigure(byte[] request) {
-    if (reconfigureHandler != null) {
-      return reconfigureHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerReconfigureHandler(Function<ReconfigureRequest,
-      CompletableFuture<ReconfigureResponse>> handler) {
-    this.reconfigureHandler = handler;
-  }
-
-  @Override
-  public void unregisterReconfigureHandler() {
-    this.reconfigureHandler = null;
-  }
-
-  CompletableFuture<byte[]> install(byte[] request) {
-    if (installHandler != null) {
-      return installHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerInstallHandler(Function<InstallRequest,
-      CompletableFuture<InstallResponse>> handler) {
-    this.installHandler = handler;
-  }
-
-  @Override
-  public void unregisterInstallHandler() {
-    this.installHandler = null;
-  }
-
-  CompletableFuture<byte[]> poll(byte[] request) {
-    if (pollHandler != null) {
-      return pollHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerPollHandler(Function<PollRequest,
-      CompletableFuture<PollResponse>> handler) {
-    this.pollHandler = handler;
-  }
-
-  @Override
-  public void unregisterPollHandler() {
-    this.pollHandler = null;
-  }
-
-  CompletableFuture<byte[]> vote(byte[] request) {
-    if (voteHandler != null) {
-      return voteHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerVoteHandler(Function<VoteRequest,
-      CompletableFuture<VoteResponse>> handler) {
-    this.voteHandler = handler;
-  }
-
-  @Override
-  public void unregisterVoteHandler() {
-    this.voteHandler = null;
-  }
-
-  @Override
-  public void registerTransferHandler(Function<TransferRequest,
-      CompletableFuture<TransferResponse>> handler) {
-    this.transferHandler = handler;
-  }
-
-  @Override
-  public void unregisterTransferHandler() {
-    this.transferHandler = null;
-  }
-
-  CompletableFuture<byte[]> transfer(byte[] request) {
-    if (transferHandler != null) {
-      return transferHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  CompletableFuture<byte[]> append(byte[] request) {
-    if (appendHandler != null) {
-      return appendHandler.apply(decode(request)).thenApply(this::encode);
-    } else {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-  }
-
-  @Override
-  public void registerAppendHandler(Function<AppendRequest,
-      CompletableFuture<AppendResponse>> handler) {
-    this.appendHandler = handler;
-  }
-
-  @Override
-  public void unregisterAppendHandler() {
-    this.appendHandler = null;
-  }
-
-  void reset(long sessionId, byte[] request) {
-    Consumer<ResetRequest> listener = resetListeners.get(sessionId);
-    if (listener != null) {
-      listener.accept(decode(request));
-    }
-  }
-
-  @Override
-  public void registerResetListener(SessionId sessionId,
-                                    Consumer<ResetRequest> listener, Executor executor) {
-    resetListeners.put(sessionId.id(), request -> executor.execute(()
-        -> listener.accept(request)));
-  }
-
-  @Override
-  public void unregisterResetListener(SessionId sessionId) {
-    resetListeners.remove(sessionId.id());
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftClientMessagingProtocol.java +0 −123 removed
@@ -1,123 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.protocol;
-
-import io.atomix.cluster.MemberId;
-import io.atomix.cluster.messaging.MessagingService;
-import io.atomix.primitive.session.SessionId;
-import io.atomix.protocols.raft.protocol.OpenSessionRequest;
-import io.atomix.protocols.raft.protocol.OpenSessionResponse;
-import io.atomix.protocols.raft.protocol.RaftClientProtocol;
-import io.atomix.protocols.raft.protocol.HeartbeatRequest;
-import io.atomix.protocols.raft.protocol.PublishRequest;
-import io.atomix.protocols.raft.protocol.HeartbeatResponse;
-import io.atomix.protocols.raft.protocol.CloseSessionResponse;
-import io.atomix.protocols.raft.protocol.CloseSessionRequest;
-import io.atomix.protocols.raft.protocol.KeepAliveResponse;
-import io.atomix.protocols.raft.protocol.KeepAliveRequest;
-import io.atomix.protocols.raft.protocol.QueryResponse;
-import io.atomix.protocols.raft.protocol.QueryRequest;
-import io.atomix.protocols.raft.protocol.CommandResponse;
-import io.atomix.protocols.raft.protocol.CommandRequest;
-import io.atomix.protocols.raft.protocol.MetadataResponse;
-import io.atomix.protocols.raft.protocol.MetadataRequest;
-import io.atomix.protocols.raft.protocol.ResetRequest;
-import io.atomix.utils.net.Address;
-import io.atomix.utils.serializer.Serializer;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Raft client messaging service protocol.
- */
-public class RaftClientMessagingProtocol extends RaftMessagingProtocol
-    implements RaftClientProtocol {
-  public RaftClientMessagingProtocol(MessagingService messagingService,
-                                     Serializer serializer,
-                                     Function<MemberId, Address> addressProvider) {
-    super(messagingService, serializer, addressProvider);
-  }
-
-  @Override
-  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
-                                                            OpenSessionRequest request) {
-    return sendAndReceive(memberId, "open-session", request);
-  }
-
-  @Override
-  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
-                                                              CloseSessionRequest request) {
-    return sendAndReceive(memberId, "close-session", request);
-  }
-
-  @Override
-  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
-                                                        KeepAliveRequest request) {
-    return sendAndReceive(memberId, "keep-alive", request);
-  }
-
-  @Override
-  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
-    return sendAndReceive(memberId, "query", request);
-  }
-
-  @Override
-  public CompletableFuture<CommandResponse> command(MemberId memberId,
-                                                    CommandRequest request) {
-    return sendAndReceive(memberId, "command", request);
-  }
-
-  @Override
-  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
-                                                      MetadataRequest request) {
-    return sendAndReceive(memberId, "metadata", request);
-  }
-
-  @Override
-  public void registerHeartbeatHandler(Function<HeartbeatRequest,
-      CompletableFuture<HeartbeatResponse>> handler) {
-    registerHandler("heartbeat", handler);
-  }
-
-  @Override
-  public void unregisterHeartbeatHandler() {
-    unregisterHandler("heartbeat");
-  }
-
-  @Override
-  public void reset(Set<MemberId> members, ResetRequest request) {
-    for (MemberId memberId : members) {
-      sendAsync(memberId, String.format("reset-%d", request.session()), request);
-    }
-  }
-
-  @Override
-  public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener,
-                                      Executor executor) {
-    messagingService.registerHandler(String.format("publish-%d", sessionId.id()), (e, p) -> {
-      listener.accept(serializer.decode(p));
-    }, executor);
-  }
-
-  @Override
-  public void unregisterPublishListener(SessionId sessionId) {
-    messagingService.unregisterHandler(String.format("publish-%d", sessionId.id()));
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftMessagingProtocol.java +0 −83 removed
@@ -1,83 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.protocol;
-
-import io.atomix.cluster.MemberId;
-import io.atomix.cluster.messaging.MessagingService;
-import io.atomix.utils.concurrent.Futures;
-import io.atomix.utils.net.Address;
-import io.atomix.utils.serializer.Serializer;
-
-import java.net.ConnectException;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-
-/**
- * Messaging service based Raft protocol.
- */
-public abstract class RaftMessagingProtocol {
-  protected final MessagingService messagingService;
-  protected final Serializer serializer;
-  private final Function<MemberId, Address> addressProvider;
-
-  public RaftMessagingProtocol(MessagingService messagingService,
-                               Serializer serializer,
-                               Function<MemberId, Address> addressProvider) {
-    this.messagingService = messagingService;
-    this.serializer = serializer;
-    this.addressProvider = addressProvider;
-  }
-
-  protected Address address(MemberId memberId) {
-    return addressProvider.apply(memberId);
-  }
-
-  protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId,
-                                                       String type, T request) {
-    Address address = address(memberId);
-    if (address == null) {
-      return Futures.exceptionalFuture(new ConnectException());
-    }
-    return messagingService.sendAndReceive(address, type, serializer.encode(request))
-        .thenApply(serializer::decode);
-  }
-
-  protected CompletableFuture<Void> sendAsync(MemberId memberId, String type, Object request) {
-    Address address = address(memberId);
-    if (address != null) {
-      return messagingService.sendAsync(address(memberId), type, serializer.encode(request));
-    }
-    return CompletableFuture.completedFuture(null);
-  }
-
-  protected <T, U> void registerHandler(String type, Function<T, CompletableFuture<U>> handler) {
-    messagingService.registerHandler(type, (e, p) -> {
-      CompletableFuture<byte[]> future = new CompletableFuture<>();
-      handler.apply(serializer.decode(p)).whenComplete((result, error) -> {
-        if (error == null) {
-          future.complete(serializer.encode(result));
-        } else {
-          future.completeExceptionally(error);
-        }
-      });
-      return future;
-    });
-  }
-
-  protected void unregisterHandler(String type) {
-    messagingService.unregisterHandler(type);
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java +0 −346 removed
@@ -1,346 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster.protocol;
-
-import io.atomix.cluster.MemberId;
-import io.atomix.cluster.messaging.MessagingService;
-import io.atomix.primitive.session.SessionId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
-import io.atomix.protocols.raft.protocol.OpenSessionRequest;
-import io.atomix.protocols.raft.protocol.OpenSessionResponse;
-import io.atomix.protocols.raft.protocol.CloseSessionRequest;
-import io.atomix.protocols.raft.protocol.CloseSessionResponse;
-import io.atomix.protocols.raft.protocol.KeepAliveRequest;
-import io.atomix.protocols.raft.protocol.KeepAliveResponse;
-import io.atomix.protocols.raft.protocol.QueryRequest;
-import io.atomix.protocols.raft.protocol.QueryResponse;
-import io.atomix.protocols.raft.protocol.CommandRequest;
-import io.atomix.protocols.raft.protocol.CommandResponse;
-import io.atomix.protocols.raft.protocol.MetadataRequest;
-import io.atomix.protocols.raft.protocol.MetadataResponse;
-import io.atomix.protocols.raft.protocol.JoinRequest;
-import io.atomix.protocols.raft.protocol.JoinResponse;
-import io.atomix.protocols.raft.protocol.LeaveRequest;
-import io.atomix.protocols.raft.protocol.LeaveResponse;
-import io.atomix.protocols.raft.protocol.ConfigureRequest;
-import io.atomix.protocols.raft.protocol.ConfigureResponse;
-import io.atomix.protocols.raft.protocol.ReconfigureRequest;
-import io.atomix.protocols.raft.protocol.ReconfigureResponse;
-import io.atomix.protocols.raft.protocol.InstallRequest;
-import io.atomix.protocols.raft.protocol.InstallResponse;
-import io.atomix.protocols.raft.protocol.PollRequest;
-import io.atomix.protocols.raft.protocol.PollResponse;
-import io.atomix.protocols.raft.protocol.VoteRequest;
-import io.atomix.protocols.raft.protocol.VoteResponse;
-import io.atomix.protocols.raft.protocol.TransferRequest;
-import io.atomix.protocols.raft.protocol.TransferResponse;
-import io.atomix.protocols.raft.protocol.AppendRequest;
-import io.atomix.protocols.raft.protocol.AppendResponse;
-import io.atomix.protocols.raft.protocol.ResetRequest;
-import io.atomix.protocols.raft.protocol.PublishRequest;
-import io.atomix.protocols.raft.protocol.HeartbeatResponse;
-import io.atomix.protocols.raft.protocol.HeartbeatRequest;
-import io.atomix.utils.net.Address;
-import io.atomix.utils.serializer.Serializer;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Raft server messaging protocol between Raft Servers for each server in the cluster.
- */
-public class RaftServerMessagingProtocol extends RaftMessagingProtocol
-    implements RaftServerProtocol {
-  public RaftServerMessagingProtocol(MessagingService messagingService,
-                                     Serializer serializer,
-                                     Function<MemberId, Address> addressProvider) {
-    super(messagingService, serializer, addressProvider);
-  }
-
-  @Override
-  public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
-                                                            OpenSessionRequest request) {
-    return sendAndReceive(memberId, "open-session", request);
-  }
-
-  @Override
-  public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
-                                                              CloseSessionRequest request) {
-    return sendAndReceive(memberId, "close-session", request);
-  }
-
-  @Override
-  public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
-                                                        KeepAliveRequest request) {
-    return sendAndReceive(memberId, "keep-alive", request);
-  }
-
-  @Override
-  public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
-    return sendAndReceive(memberId, "query", request);
-  }
-
-  @Override
-  public CompletableFuture<CommandResponse> command(MemberId memberId,
-                                                    CommandRequest request) {
-    return sendAndReceive(memberId, "command", request);
-  }
-
-  @Override
-  public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
-                                                      MetadataRequest request) {
-    return sendAndReceive(memberId, "metadata", request);
-  }
-
-  @Override
-  public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
-    return sendAndReceive(memberId, "join", request);
-  }
-
-  @Override
-  public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
-    return sendAndReceive(memberId, "leave", request);
-  }
-
-  @Override
-  public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
-                                                        ConfigureRequest request) {
-    return sendAndReceive(memberId, "configure", request);
-  }
-
-  @Override
-  public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
-                                                            ReconfigureRequest request) {
-    return sendAndReceive(memberId, "reconfigure", request);
-  }
-
-  @Override
-  public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
-    return sendAndReceive(memberId, "install", request);
-  }
-
-  @Override
-  public CompletableFuture<TransferResponse> transfer(MemberId memberId,
-                                                      TransferRequest request) {
-    return sendAndReceive(memberId, "transfer", request);
-  }
-
-  @Override
-  public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
-    return sendAndReceive(memberId, "poll", request);
-  }
-
-  @Override
-  public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
-    return sendAndReceive(memberId, "vote", request);
-  }
-
-  @Override
-  public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
-    return sendAndReceive(memberId, "append", request);
-  }
-
-  @Override
-  public void publish(MemberId memberId, PublishRequest request) {
-    sendAsync(memberId, String.format("publish-%d", request.session()), request);
-  }
-
-  @Override
-  public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
-                                                        HeartbeatRequest request) {
-    return sendAndReceive(memberId, "heartbeat", request);
-  }
-
-  @Override
-  public void registerOpenSessionHandler(Function<OpenSessionRequest,
-      CompletableFuture<OpenSessionResponse>> handler) {
-    registerHandler("open-session", handler);
-  }
-
-  @Override
-  public void unregisterOpenSessionHandler() {
-    unregisterHandler("open-session");
-  }
-
-  @Override
-  public void registerCloseSessionHandler(Function<CloseSessionRequest,
-      CompletableFuture<CloseSessionResponse>> handler) {
-    registerHandler("close-session", handler);
-  }
-
-  @Override
-  public void unregisterCloseSessionHandler() {
-    unregisterHandler("close-session");
-  }
-
-  @Override
-  public void registerKeepAliveHandler(Function<KeepAliveRequest,
-      CompletableFuture<KeepAliveResponse>> handler) {
-    registerHandler("keep-alive", handler);
-  }
-
-  @Override
-  public void unregisterKeepAliveHandler() {
-    unregisterHandler("keep-alive");
-  }
-
-  @Override
-  public void registerQueryHandler(Function<QueryRequest,
-      CompletableFuture<QueryResponse>> handler) {
-    registerHandler("query", handler);
-  }
-
-  @Override
-  public void unregisterQueryHandler() {
-    unregisterHandler("query");
-  }
-
-  @Override
-  public void registerCommandHandler(Function<CommandRequest,
-      CompletableFuture<CommandResponse>> handler) {
-    registerHandler("command", handler);
-  }
-
-  @Override
-  public void unregisterCommandHandler() {
-    unregisterHandler("command");
-  }
-
-  @Override
-  public void registerMetadataHandler(Function<MetadataRequest,
-      CompletableFuture<MetadataResponse>> handler) {
-    registerHandler("metadata", handler);
-  }
-
-  @Override
-  public void unregisterMetadataHandler() {
-    unregisterHandler("metadata");
-  }
-
-  @Override
-  public void registerJoinHandler(Function<JoinRequest,
-      CompletableFuture<JoinResponse>> handler) {
-    registerHandler("join", handler);
-  }
-
-  @Override
-  public void unregisterJoinHandler() {
-    unregisterHandler("join");
-  }
-
-  @Override
-  public void registerLeaveHandler(Function<LeaveRequest,
-      CompletableFuture<LeaveResponse>> handler) {
-    registerHandler("leave", handler);
-  }
-
-  @Override
-  public void unregisterLeaveHandler() {
-    unregisterHandler("leave");
-  }
-
-  @Override
-  public void registerConfigureHandler(Function<ConfigureRequest,
-      CompletableFuture<ConfigureResponse>> handler) {
-    registerHandler("configure", handler);
-  }
-
-  @Override
-  public void unregisterConfigureHandler() {
-    unregisterHandler("configure");
-  }
-
-  @Override
-  public void registerReconfigureHandler(Function<ReconfigureRequest,
-      CompletableFuture<ReconfigureResponse>> handler) {
-    registerHandler("reconfigure", handler);
-  }
-
-  @Override
-  public void unregisterReconfigureHandler() {
-    unregisterHandler("reconfigure");
-  }
-
-  @Override
-  public void registerInstallHandler(Function<InstallRequest,
-      CompletableFuture<InstallResponse>> handler) {
-    registerHandler("install", handler);
-  }
-
-  @Override
-  public void unregisterInstallHandler() {
-    unregisterHandler("install");
-  }
-
-  @Override
-  public void registerTransferHandler(Function<TransferRequest,
-      CompletableFuture<TransferResponse>> handler) {
-    registerHandler("transfer", handler);
-  }
-
-  @Override
-  public void unregisterTransferHandler() {
-    unregisterHandler("transfer");
-  }
-
-  @Override
-  public void registerPollHandler(Function<PollRequest,
-      CompletableFuture<PollResponse>> handler) {
-    registerHandler("poll", handler);
-  }
-
-  @Override
-  public void unregisterPollHandler() {
-    unregisterHandler("poll");
-  }
-
-  @Override
-  public void registerVoteHandler(Function<VoteRequest,
-      CompletableFuture<VoteResponse>> handler) {
-    registerHandler("vote", handler);
-  }
-
-  @Override
-  public void unregisterVoteHandler() {
-    unregisterHandler("vote");
-  }
-
-  @Override
-  public void registerAppendHandler(Function<AppendRequest,
-      CompletableFuture<AppendResponse>> handler) {
-    registerHandler("append", handler);
-  }
-
-  @Override
-  public void unregisterAppendHandler() {
-    unregisterHandler("append");
-  }
-
-  @Override
-  public void registerResetListener(SessionId sessionId,
-                                    Consumer<ResetRequest> listener, Executor executor) {
-    messagingService.registerHandler(String.format("reset-%d", sessionId.id()), (e, p) -> {
-      listener.accept(serializer.decode(p));
-    }, executor);
-  }
-
-  @Override
-  public void unregisterResetListener(SessionId sessionId) {
-    messagingService.unregisterHandler(String.format("reset-%d", sessionId.id()));
-  }
-}
zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +0 −27 modified
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.conf;
-import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -326,7 +325,6 @@ public String getServerAddress() {
     return getString(ConfVars.ZEPPELIN_ADDR);
   }
-  @VisibleForTesting
   public void setServerPort(int port) {
     properties.put(ConfVars.ZEPPELIN_PORT.getVarName(), String.valueOf(port));
   }
@@ -831,26 +829,6 @@ public boolean isOnlyYarnCluster() {
     return getBoolean(ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER);
   }
-  public String getClusterAddress() {
-    return getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
-  }
-
-  public void setClusterAddress(String clusterAddr) {
-    properties.put(ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(), clusterAddr);
-  }
-
-  public boolean isClusterMode() {
-    return !StringUtils.isEmpty(getString(ConfVars.ZEPPELIN_CLUSTER_ADDR));
-  }
-
-  public int getClusterHeartbeatInterval() {
-    return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL);
-  }
-
-  public int getClusterHeartbeatTimeout() {
-    return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
-  }
-
   public RUN_MODE getRunMode() {
     String mode = getString(ConfVars.ZEPPELIN_RUN_MODE);
     if ("auto".equalsIgnoreCase(mode)) { // auto detect
@@ -864,7 +842,6 @@ public RUN_MODE getRunMode() {
     }
   }
-  @VisibleForTesting
   public void setRunMode(RUN_MODE runMode) {
     properties.put(ConfVars.ZEPPELIN_RUN_MODE.getVarName(), runMode.name());
   }
@@ -1090,10 +1067,6 @@ public enum ConfVars {
     ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""),
-    ZEPPELIN_CLUSTER_ADDR("zeppelin.cluster.addr", ""),
-    ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
-    ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
-
     ZEPPELIN_RUN_MODE("zeppelin.run.mode", "auto"), // auto | local | k8s | Docker
     ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // kubectl port-forward incase of Zeppelin is running outside of kuberentes
zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +1 −2 modified
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.interpreter;
-import com.google.common.collect.Maps;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
@@ -124,7 +123,7 @@ public Builder setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
     public Builder setConfig(Map<String, Object> config) {
       if (config != null) {
-        context.config = Maps.newHashMap(config);
+        context.config = new HashMap<>(config);
       }
       return this;
     }
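The `setConfig` hunk swaps Guava's `Maps.newHashMap(config)` for the JDK copy constructor `new HashMap<>(config)`. Both yield a mutable shallow copy, so the builder's map stays isolated from later writes to the caller's map. A minimal sketch of that behavior follows; `ConfigCopyDemo` and `copyConfig` are illustrative names that do not exist in Zeppelin, and returning `null` for a `null` argument is a simplification of the original, which leaves the field untouched in that case.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigCopyDemo {
  // Hypothetical helper: same null check and shallow copy as the patched line.
  static Map<String, Object> copyConfig(Map<String, Object> config) {
    return config == null ? null : new HashMap<>(config);
  }

  public static void main(String[] args) {
    Map<String, Object> original = new HashMap<>();
    original.put("enabled", true);

    Map<String, Object> copy = copyConfig(original);

    // Later writes to the caller's map must not leak into the copy.
    original.put("enabled", false);

    System.out.println(copy); // prints {enabled=true}
  }
}
```

The copy is shallow: mutable values stored in the map are still shared between the two maps, exactly as with the Guava call it replaces.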
zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +0 −54 modified
@@ -25,8 +25,6 @@
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.cluster.ClusterManagerClient;
-import org.apache.zeppelin.cluster.meta.ClusterMeta;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObject;
@@ -80,7 +78,6 @@
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.nio.ByteBuffer;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -97,8 +94,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.Optional;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
-
 /**
  * Entry point for Interpreter process.
  * Accepting thrift connections from ZeppelinServer.
@@ -150,8 +145,6 @@ public class RemoteInterpreterServer extends Thread
   private boolean isForceShutdown = true;
   private ZeppelinConfiguration zConf;
-  // cluster manager client
-  private ClusterManagerClient clusterManagerClient;
   private static Thread shutdownThread;
@@ -212,17 +205,6 @@ public void init(Map<String, String> properties) throws InterpreterRPCException,
       this.zConf.setProperty(entry.getKey(), entry.getValue());
     }
-    if (zConf.isClusterMode()) {
-      clusterManagerClient = ClusterManagerClient.getInstance(zConf);
-      clusterManagerClient.start(interpreterGroupId);
-
-      // Cluster mode, discovering interpreter processes through metadata registration
-      // TODO (Xun): Unified use of cluster metadata for process discovery of all operating modes
-      // 1. Can optimize the startup logic of the process
-      // 2. Can solve the problem that running the interpreter's IP in docker may be a virtual IP
-      putClusterMeta();
-    }
-
     try {
       lifecycleManager = createLifecycleManager();
       lifecycleManager.onInterpreterProcessStarted(interpreterGroupId);
@@ -333,26 +315,6 @@ public static void main(String[] args) throws Exception {
     }
   }
-  // Submit interpreter process metadata information to cluster metadata
-  private void putClusterMeta() {
-    if (!zConf.isClusterMode()){
-      return;
-    }
-    String nodeName = clusterManagerClient.getClusterNodeName();
-
-    // commit interpreter meta
-    HashMap<String, Object> meta = new HashMap<>();
-    meta.put(ClusterMeta.NODE_NAME, nodeName);
-    meta.put(ClusterMeta.INTP_PROCESS_NAME, interpreterGroupId);
-    meta.put(ClusterMeta.INTP_TSERVER_HOST, host);
-    meta.put(ClusterMeta.INTP_TSERVER_PORT, port);
-    meta.put(ClusterMeta.INTP_START_TIME, LocalDateTime.now());
-    meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
-    meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
-
-    clusterManagerClient.putClusterMeta(INTP_PROCESS_META, interpreterGroupId, meta);
-  }
-
   @Override
   public void createInterpreter(String interpreterGroupId, String sessionId, String className, Map<String, String> properties, String userName) throws InterpreterRPCException, TException {
@@ -674,8 +636,6 @@ public ShutdownThread(String cause) {
     public void run() {
       LOGGER.info("Shutting down...");
       LOGGER.info("Shutdown initialized by {}", cause);
-      // delete interpreter cluster meta
-      deleteClusterMeta();
       if (interpreterGroup != null) {
         synchronized (interpreterGroup) {
@@ -737,20 +697,6 @@ public void run() {
       LOGGER.info("Shutting down");
     }
-    private void deleteClusterMeta() {
-      if (zConf == null || !zConf.isClusterMode()){
-        return;
-      }
-
-      try {
-        // delete interpreter cluster meta
-        clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId);
-        Thread.sleep(300);
-      } catch (InterruptedException e) {
-        LOGGER.error(e.getMessage(), e);
-        Thread.currentThread().interrupt();
-      }
-    }
   }
   class InterpretJobListener implements JobListener {
zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java+0 −159 removed@@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class ClusterMultiNodeTest { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class); - - private static List<ClusterManagerServer> clusterServers = new ArrayList<>(); - private static ClusterManagerClient clusterClient = null; - private static ZeppelinConfiguration zConf; - - static final String metaKey = 
"ClusterMultiNodeTestKey"; - - @BeforeAll - static void startCluster() throws IOException, InterruptedException { - LOGGER.info("startCluster >>>"); - - String clusterAddrList = ""; - String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); - for (int i = 0; i < 3; i ++) { - // Set the cluster IP and port - int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - clusterAddrList += zServerHost + ":" + zServerPort; - if (i != 2) { - clusterAddrList += ","; - } - } - zConf = ZeppelinConfiguration.load(); - zConf.setClusterAddress(clusterAddrList); - - // mock cluster manager server - String cluster[] = clusterAddrList.split(","); - try { - for (int i = 0; i < 3; i ++) { - String[] parts = cluster[i].split(":"); - String clusterHost = parts[0]; - int clusterPort = Integer.valueOf(parts[1]); - - Class<ClusterManagerServer> clazz = ClusterManagerServer.class; - Constructor<ClusterManagerServer> constructor = clazz.getDeclaredConstructor(); - constructor.setAccessible(true); - ClusterManagerServer clusterServer = constructor.newInstance(); - clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort); - - clusterServers.add(clusterServer); - } - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - - for (ClusterManagerServer clusterServer : clusterServers) { - clusterServer.start(); - } - - // mock cluster manager client - clusterClient = ClusterManagerClient.getInstance(zConf); - clusterClient.start(metaKey); - - // Waiting for cluster startup - int wait = 0; - while(wait++ < 100) { - if (clusterIsStartup() && clusterClient.raftInitialized()) { - LOGGER.info("wait {}(ms) found cluster leader", wait*3000); - break; - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - } - } - - Thread.sleep(3000); - assertEquals(true, clusterIsStartup()); - LOGGER.info("startCluster <<<"); - - getClusterServerMeta(); - } - - @AfterAll - static void stopCluster() { 
- LOGGER.info("stopCluster >>>"); - if (null != clusterClient) { - clusterClient.shutdown(); - } - for (ClusterManagerServer clusterServer : clusterServers) { - clusterServer.shutdown(); - } - LOGGER.info("stopCluster <<<"); - } - - static boolean clusterIsStartup() { - boolean foundLeader = false; - for (ClusterManagerServer clusterServer : clusterServers) { - if (!clusterServer.raftInitialized()) { - LOGGER.warn("clusterServer not Initialized!"); - return false; - } - if (clusterServer.isClusterLeader()) { - foundLeader = true; - } - } - - if (!foundLeader) { - LOGGER.warn("Can not found leader!"); - return false; - } - - return true; - } - - public static void getClusterServerMeta() { - LOGGER.info("getClusterServerMeta >>>"); - // Get metadata for all services - Map<String, Map<String, Object>> srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); - LOGGER.info(srvMeta.toString()); - - Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); - LOGGER.info(intpMeta.toString()); - - assertNotNull(srvMeta); - assertTrue(srvMeta instanceof Map); - - assertEquals(3, srvMeta.size()); - LOGGER.info("getClusterServerMeta <<<"); - } -}
zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java+0 −146 removed@@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.cluster; - -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - - -class ClusterSingleNodeTest { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class); - private static ZeppelinConfiguration zConf; - - private static ClusterManagerServer clusterServer = null; - private static ClusterManagerClient clusterClient = null; - - static String zServerHost; - static int 
zServerPort; - static final String metaKey = "ClusterSingleNodeTestKey"; - - @BeforeAll - static void startCluster() throws IOException, InterruptedException { - LOGGER.info("startCluster >>>"); - - zConf = ZeppelinConfiguration.load("zeppelin-site-test.xml"); - - // Set the cluster IP and port - zServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); - zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - zConf.setClusterAddress(zServerHost + ":" + zServerPort); - - clusterServer = ClusterManagerServer.getInstance(zConf); - clusterServer.start(); - - // mock cluster manager client - clusterClient = ClusterManagerClient.getInstance(zConf); - clusterClient.start(metaKey); - - // Waiting for cluster startup - int wait = 0; - while(wait++ < 100) { - if (clusterServer.isClusterLeader() - && clusterServer.raftInitialized() - && clusterClient.raftInitialized()) { - LOGGER.info("wait {}(ms) found cluster leader", wait*3000); - break; - } - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - Thread.sleep(3000); - assertEquals(true, clusterServer.isClusterLeader()); - LOGGER.info("startCluster <<<"); - } - - @AfterAll - static void stopCluster() { - if (null != clusterClient) { - clusterClient.shutdown(); - } - if (null != clusterClient) { - clusterServer.shutdown(); - } - LOGGER.info("stopCluster"); - } - - @Test - void getServerMeta() { - LOGGER.info("getServerMeta >>>"); - - // Get metadata for all services - Map<String, Map<String, Object>> meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); - LOGGER.info(meta.toString()); - - Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); - LOGGER.info(intpMeta.toString()); - - assertNotNull(meta); - assertTrue(meta instanceof Map); - - // Get metadata for the current service - Map<String, Object> values = meta.get(clusterClient.getClusterNodeName()); - 
assertTrue(values instanceof Map); - assertTrue(values.size() > 0); - - LOGGER.info("getServerMeta <<<"); - } - - @Test - void putIntpProcessMeta() { - // mock IntpProcess Meta - Map<String, Object> meta = new HashMap<>(); - meta.put(ClusterMeta.SERVER_HOST, zServerHost); - meta.put(ClusterMeta.SERVER_PORT, zServerPort); - meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST"); - meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT"); - meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY"); - meta.put(ClusterMeta.CPU_USED, "CPU_USED"); - meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY"); - meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED"); - - // put IntpProcess Meta - clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta); - - // get IntpProcess Meta - Map<String, Map<String, Object>> check - = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey); - - LOGGER.info(check.toString()); - - assertNotNull(check); - assertNotNull(check.get(metaKey)); - assertEquals(true, check.get(metaKey).size()>0); - } -}
zeppelin-interpreter/src/test/resources/zeppelin-site-test.xml +0 −5 modified
@@ -18,11 +18,6 @@
 -->
 <configuration>
-<property>
-  <name>zeppelin.cluster.addr</name>
-  <value></value>
-  <description>Server cluster address</description>
-</property>
 <property>
   <name>zeppelin.server.addr</name>
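The removed `zeppelin.cluster.addr` property is what the pre-0.12.0 `isClusterMode()` tested: a non-empty value switched cluster mode on. As a rough triage aid for older deployments, the sketch below checks the text of a `zeppelin-site.xml` for a non-empty value. `ClusterAddrCheck` and its methods are hypothetical names, and the sketch looks only at the XML passed in; values supplied some other way (for example through the environment) are outside its scope.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class ClusterAddrCheck {
  // Property name taken from the removed ConfVars.ZEPPELIN_CLUSTER_ADDR entry.
  static final String CLUSTER_ADDR = "zeppelin.cluster.addr";

  // Hypothetical helper: true when the XML sets a non-empty cluster address.
  static boolean isClusterModeConfigured(String siteXml) {
    try {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      // Reject DOCTYPE declarations so the check itself is not exposed to XXE.
      factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
      Document doc = factory.newDocumentBuilder()
          .parse(new ByteArrayInputStream(siteXml.getBytes(StandardCharsets.UTF_8)));

      NodeList properties = doc.getElementsByTagName("property");
      for (int i = 0; i < properties.getLength(); i++) {
        Element property = (Element) properties.item(i);
        if (CLUSTER_ADDR.equals(childText(property, "name"))) {
          return !childText(property, "value").isEmpty();
        }
      }
      return false;
    } catch (Exception e) {
      throw new IllegalArgumentException("Could not parse site XML", e);
    }
  }

  // Trimmed text of the first matching child element, or "" when absent.
  private static String childText(Element parent, String tag) {
    NodeList nodes = parent.getElementsByTagName(tag);
    return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent().trim();
  }
}
```

A `true` result on a version before 0.12.0 suggests the cluster feature is enabled and the instance deserves a closer look; it does not by itself prove the raft listener is reachable.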
zeppelin-plugins/launcher/cluster/pom.xml+0 −85 removed@@ -1,85 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>zengine-plugins-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.12.0-SNAPSHOT</version> - <relativePath>../../../zeppelin-plugins</relativePath> - </parent> - - <artifactId>launcher-cluster</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Plugin Cluster Launcher</name> - <description>Launcher implementation to run interpreters on cluster</description> - - <properties> - <plugin.name>Launcher/ClusterInterpreterLauncher</plugin.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>launcher-docker</artifactId> - <version>0.12.0-SNAPSHOT</version> - </dependency> - </dependencies> - - <build> - <testResources> - <testResource> - <directory>${project.basedir}/src/test/resources</directory> - 
</testResource> - <testResource> - <directory>${project.basedir}/src/main/resources</directory> - </testResource> - </testResources> - <plugins> - <plugin> - <artifactId>maven-enforcer-plugin</artifactId> - <executions> - <execution> - <id>enforce-dependency-convergence</id> - <phase>none</phase> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - </plugin> - </plugins> - <resources> - <resource> - <directory>src/main/resources</directory> - <includes> - <include>**/*.*</include> - </includes> - </resource> - </resources> - </build> -</project>
zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheck.java +0 −84 removed
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.cluster.ClusterCallback;
-import org.apache.zeppelin.cluster.ClusterManagerServer;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-
-// Metadata registered in the cluster by the interpreter process,
-// Keep the interpreter process started
-public class ClusterInterpreterCheck implements Runnable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterCheck.class);
-
-  private final InterpreterClient intpProcess;
-  private final String intpGroupId;
-  private final int connectTimeout;
-  private final ZeppelinConfiguration zConf;
-
-  ClusterInterpreterCheck(InterpreterClient intpProcess,
-                          String intpGroupId,
-                          int connectTimeout,
-                          ZeppelinConfiguration zConf) {
-    this.intpProcess = intpProcess;
-    this.intpGroupId = intpGroupId;
-    this.connectTimeout = connectTimeout;
-    this.zConf = zConf;
-  }
-
-  @Override
-  public void run() {
-    LOGGER.info("ClusterInterpreterCheckThread run() >>>");
-
-    ClusterManagerServer clusterServer = ClusterManagerServer.getInstance(zConf);
-
-    clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
-        new ClusterCallback<Map<String, Object>>() {
-          @Override
-          public InterpreterClient online(Map<String, Object> result) {
-            String intpTSrvHost = (String) result.get(INTP_TSERVER_HOST);
-            int intpTSrvPort = (int) result.get(INTP_TSERVER_PORT);
-            LOGGER.info("Found cluster interpreter {}:{}", intpTSrvHost, intpTSrvPort);
-
-            if (intpProcess instanceof DockerInterpreterProcess) {
-              ((DockerInterpreterProcess) intpProcess).processStarted(intpTSrvPort, intpTSrvHost);
-            } else if (intpProcess instanceof ClusterInterpreterProcess) {
-              ((ClusterInterpreterProcess) intpProcess).processStarted(intpTSrvPort, intpTSrvHost);
-            } else {
-              LOGGER.error("Unknown type !");
-            }
-
-            return null;
-          }
-
-          @Override
-          public void offline() {
-            LOGGER.error("Can not found cluster interpreter!");
-          }
-        });
-
-    LOGGER.info("ClusterInterpreterCheckThread run() <<<");
-  }
-}
zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java+0 −275 removed@@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.launcher; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.apache.zeppelin.cluster.ClusterCallback; -import org.apache.zeppelin.cluster.ClusterManagerServer; -import org.apache.zeppelin.cluster.event.ClusterEvent; -import org.apache.zeppelin.cluster.event.ClusterEventListener; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.InterpreterRunner; -import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; - -import static 
org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT; - -/** - * Interpreter Launcher which use cluster to launch the interpreter process. - */ -public class ClusterInterpreterLauncher extends StandardInterpreterLauncher - implements ClusterEventListener { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncher.class); - - private InterpreterLaunchContext context; - private ClusterManagerServer clusterServer; - public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) - throws IOException { - super(zConf, recoveryStorage); - this.clusterServer = ClusterManagerServer.getInstance(zConf); - clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, this); - } - - @Override - public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException { - LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); - - this.context = context; - int connectTimeout = getConnectTimeout(context); - String intpGroupId = context.getInterpreterGroupId(); - - // connect exist Interpreter Process - InterpreterClient intpClient = clusterServer.getIntpProcessStatus( - intpGroupId, 3000, new ClusterCallback<Map<String, Object>>() { - @Override - public InterpreterClient online(Map<String, Object> result) { - String intpTserverHost = (String) result.get(INTP_TSERVER_HOST); - int intpTserverPort = (int) result.get(INTP_TSERVER_PORT); - - return new RemoteInterpreterRunningProcess( - context.getInterpreterSettingName(), - context.getInterpreterGroupId(), - connectTimeout, - getConnectPoolSize(context), - context.getIntpEventServerHost(), - 
context.getIntpEventServerPort(), - intpTserverHost, - intpTserverPort, - false); - } - - @Override - public void offline() { - LOGGER.info("interpreter {} is not exist!", intpGroupId); - } - }); - if (null != intpClient) { - return intpClient; - } - - // No process was found for the InterpreterGroup ID - String srvHost = null; - int srvPort = 0; - Map<String, Object> meta = clusterServer.getIdleNodeMeta(); - if (null == meta) { - LOGGER.error("Don't get idle node meta, launch interpreter on local."); - InterpreterClient clusterIntpProcess = createInterpreterProcess(context); - try { - clusterIntpProcess.start(context.getUserName()); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - return clusterIntpProcess; - } - } else { - srvHost = (String) meta.get(SERVER_HOST); - String localhost = RemoteInterpreterUtils.findAvailableHostAddress(); - - if (localhost.equalsIgnoreCase(srvHost)) { - LOGGER.info("launch interpreter on local"); - InterpreterClient clusterIntpProcess = createInterpreterProcess(context); - try { - clusterIntpProcess.start(context.getUserName()); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - return clusterIntpProcess; - } - } else { - // launch interpreter on cluster - srvPort = (int) meta.get(SERVER_PORT); - - Gson gson = new Gson(); - String sContext = gson.toJson(context); - - Map<String, Object> mapEvent = new HashMap<>(); - mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS); - mapEvent.put(CLUSTER_EVENT_MSG, sContext); - String strEvent = gson.toJson(mapEvent); - // Notify other server in the cluster that the resource is idle to create an interpreter - clusterServer.unicastClusterEvent( - srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, strEvent); - } - } - - // Find the ip and port of thrift registered by the remote interpreter process - // through the cluster metadata - String finalSrvHost = srvHost; - int finalSrvPort = srvPort; - intpClient = clusterServer.getIntpProcessStatus(intpGroupId, 
connectTimeout, - new ClusterCallback<Map<String, Object>>() { - @Override - public InterpreterClient online(Map<String, Object> result) { - // connect exist Interpreter Process - String intpTserverHost = (String) result.get(INTP_TSERVER_HOST); - int intpTserverPort = (int) result.get(INTP_TSERVER_PORT); - - return new RemoteInterpreterRunningProcess( - context.getInterpreterSettingName(), - context.getInterpreterGroupId(), - connectTimeout, - getConnectPoolSize(context), - context.getIntpEventServerHost(), - context.getIntpEventServerPort(), - intpTserverHost, - intpTserverPort, - false); - } - - @Override - public void offline() { - String errorInfo = String.format("Creating process %s failed on remote server %s:%d", - intpGroupId, finalSrvHost, finalSrvPort); - LOGGER.error(errorInfo); - } - }); - if (null == intpClient) { - String errorInfo = String.format("Creating process %s failed on remote server %s:%d", - intpGroupId, srvHost, srvPort); - throw new IOException(errorInfo); - } else { - return intpClient; - } - } - - @Override - public void onClusterEvent(String msg) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(msg); - } - - try { - Gson gson = new Gson(); - Map<String, Object> mapEvent = gson.fromJson(msg, - new TypeToken<Map<String, Object>>() { - }.getType()); - String sEvent = (String) mapEvent.get(CLUSTER_EVENT); - ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent); - - switch (clusterEvent) { - case CREATE_INTP_PROCESS: - // 1)Other zeppelin servers in the cluster send requests to create an interpreter process - // 2)After the interpreter process is created, and the interpreter is started, - // the interpreter registers the thrift ip and port into the cluster metadata. 
- // 3)Other servers connect through the IP and port of thrift in the cluster metadata, - // using this remote interpreter process - String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG); - InterpreterLaunchContext context = gson.fromJson( - eventMsg, new TypeToken<InterpreterLaunchContext>() { - }.getType()); - InterpreterClient intpProcess = createInterpreterProcess(context); - intpProcess.start(context.getUserName()); - break; - default: - LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg); - break; - } - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } - } - - private InterpreterClient createInterpreterProcess(InterpreterLaunchContext context) - throws IOException { - this.context = context; - - InterpreterClient intpProcess = null; - if (isRunningOnDocker(zConf)) { - DockerInterpreterLauncher dockerIntpLauncher = new DockerInterpreterLauncher(zConf, null); - intpProcess = dockerIntpLauncher.launch(context); - } else { - intpProcess = createClusterIntpProcess(); - } - - // must first step start check interpreter thread - ClusterInterpreterCheck intpCheck = new ClusterInterpreterCheck( - intpProcess, context.getInterpreterGroupId(), getConnectTimeout(context), zConf); - Executors.newSingleThreadExecutor().execute(intpCheck); - - return intpProcess; - } - - private RemoteInterpreterProcess createClusterIntpProcess() { - ClusterInterpreterProcess clusterIntpProcess = null; - try { - InterpreterOption option = context.getOption(); - InterpreterRunner runner = context.getRunner(); - String intpSetGroupName = context.getInterpreterSettingGroup(); - String intpSetName = context.getInterpreterSettingName(); - int connectTimeout = getConnectTimeout(context); - int connectionPoolSize = getConnectPoolSize(context); - String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" - + context.getInterpreterSettingId(); - - clusterIntpProcess = new ClusterInterpreterProcess( - runner != null ? 
runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), - context.getIntpEventServerPort(), - context.getIntpEventServerHost(), - zConf.getInterpreterPortRange(), - zConf.getInterpreterDir() + "/" + intpSetGroupName, - localRepoPath, - buildEnvFromProperties(context), - connectTimeout, - connectionPoolSize, - intpSetName, - context.getInterpreterGroupId(), - option.isUserImpersonate()); - } catch (IOException e) { - LOGGER.error(e.getMessage(), e); - } - - return clusterIntpProcess; - } - - private boolean isRunningOnDocker(ZeppelinConfiguration zConf) { - return zConf.getRunMode() == ZeppelinConfiguration.RUN_MODE.DOCKER; - } -}
zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java +0 −74 removed
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-
-public class ClusterInterpreterProcess extends ExecRemoteInterpreterProcess {
-
-  public ClusterInterpreterProcess(
-      String intpRunner,
-      int intpEventServerPort,
-      String intpEventServerHost,
-      String interpreterPortRange,
-      String intpDir,
-      String localRepoDir,
-      Map<String, String> env,
-      int connectTimeout,
-      int connectionPoolSize,
-      String interpreterSettingName,
-      String interpreterGroupId,
-      boolean isUserImpersonated) {
-
-    super(intpEventServerPort,
-      intpEventServerHost,
-      interpreterPortRange,
-      intpDir,
-      localRepoDir,
-      env,
-      connectTimeout,
-      connectionPoolSize,
-      interpreterSettingName,
-      interpreterGroupId,
-      isUserImpersonated,
-      intpRunner);
-  }
-
-  @Override
-  public void start(String userName) throws IOException {
-    super.start(userName);
-  }
-
-  @Override
-  public boolean isRunning() {
-    if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public String getErrorMessage() {
-    return null;
-  }
-}
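The removed `isRunning()` override in the file above delegates to `RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())`. A minimal sketch of what a reachability probe of this kind usually amounts to, assuming a plain TCP connect with a timeout. The class `EndpointProbe`, the method `isReachable`, and the 1000 ms timeout are hypothetical names chosen for illustration, not Zeppelin's implementation:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class EndpointProbe {

  // Hypothetical helper: true if a TCP connection to host:port succeeds within timeoutMs.
  static boolean isReachable(String host, int port, int timeoutMs) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMs);
      return true;
    } catch (IOException e) {
      // Refused, timed out, or unresolvable: treat all of these as "not reachable".
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    // Open a listener on an ephemeral loopback port so the probe has something to connect to.
    try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
      System.out.println(isReachable("127.0.0.1", server.getLocalPort(), 1000));
    }
  }
}
```

Returning the result of the check directly would also avoid the `if (...) return true; return false;` shape of the removed override.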
zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java+0 −142 removed@@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.launcher; - -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.util.Properties; - -class ClusterInterpreterLauncherTest extends ClusterMockTest { - - @BeforeAll - static void startTest() throws IOException, InterruptedException { - ClusterMockTest.startCluster(); - } - - @AfterAll - static void stopTest() throws IOException, InterruptedException { - ClusterMockTest.stopCluster(); - } - - @BeforeEach - void setUp() { - for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) { - 
System.clearProperty(confVar.getVarName()); - } - } - - // TODO(zjffdu) disable this test because this is not a correct unit test, - // Actually the interpreter process here never start before ZEPPELIN-5300. - // @Test - void testConnectExistOnlineIntpProcess() throws IOException { - mockIntpProcessMeta("intpGroupId", true); - - ClusterInterpreterLauncher launcher = - new ClusterInterpreterLauncher(ClusterMockTest.zConf, null); - Properties properties = new Properties(); - properties.setProperty( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); - InterpreterOption option = new InterpreterOption(); - option.setUserImpersonate(true); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, - "user1", "intpGroupId", "groupId", - "groupName", "name", 0, "host"); - - InterpreterClient client = launcher.launch(context); - - assertTrue(client instanceof RemoteInterpreterRunningProcess); - RemoteInterpreterRunningProcess interpreterProcess = (RemoteInterpreterRunningProcess) client; - assertEquals("127.0.0.1", interpreterProcess.getHost()); - assertEquals("name", interpreterProcess.getInterpreterSettingName()); - assertEquals(5000, interpreterProcess.getConnectTimeout()); - interpreterProcess.close(); - } - - @Test - void testConnectExistOfflineIntpProcess() throws IOException { - mockIntpProcessMeta("intpGroupId2", false); - - ClusterInterpreterLauncher launcher = - new ClusterInterpreterLauncher(ClusterMockTest.zConf, null); - Properties properties = new Properties(); - properties.setProperty( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); - InterpreterOption option = new InterpreterOption(); - option.setUserImpersonate(true); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, - "user1", "intpGroupId2", "groupId", - "groupName", "name", 0, "host"); - InterpreterClient client = launcher.launch(context); - - 
assertTrue(client instanceof ClusterInterpreterProcess); - ClusterInterpreterProcess interpreterProcess = (ClusterInterpreterProcess) client; - assertEquals("name", interpreterProcess.getInterpreterSettingName()); - assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); - assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); - assertEquals(5000, interpreterProcess.getConnectTimeout()); - assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); - assertTrue(interpreterProcess.getEnv().size() >= 1); - assertEquals(true, interpreterProcess.isUserImpersonated()); - interpreterProcess.close(); - } - - @Test - void testCreateIntpProcessDockerMode() throws IOException { - zConf.setRunMode(ZeppelinConfiguration.RUN_MODE.DOCKER); - - ClusterInterpreterLauncher launcher = new ClusterInterpreterLauncher(zConf, null); - Properties properties = new Properties(); - properties.setProperty( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "1000"); - InterpreterOption option = new InterpreterOption(); - option.setUserImpersonate(true); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, - "user1", "intpGroupId3", "groupId3", - "groupName", "name", 0, "host"); - InterpreterClient client = launcher.launch(context); - - assertTrue(client instanceof DockerInterpreterProcess); - } - - @Test - void testCreateIntpProcessLocalMode() throws IOException { - zConf.setRunMode(ZeppelinConfiguration.RUN_MODE.LOCAL); - - ClusterInterpreterLauncher launcher = new ClusterInterpreterLauncher(zConf, null); - Properties properties = new Properties(); - properties.setProperty( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "1000"); - InterpreterOption option = new InterpreterOption(); - option.setUserImpersonate(true); - InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, 
null, - "user1", "intpGroupId4", "groupId4", - "groupName", "name", 0, "host"); - InterpreterClient client = launcher.launch(context); - - assertTrue(client instanceof ClusterInterpreterProcess); - } -}
zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java+0 −163 removed@@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.launcher; - -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.cluster.ClusterManagerClient; -import org.apache.zeppelin.cluster.ClusterManagerServer; -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS; -import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -public class ClusterMockTest { - private static Logger LOGGER = 
LoggerFactory.getLogger(ClusterMockTest.class); - - private static ClusterManagerServer clusterServer = null; - private static ClusterManagerClient clusterClient = null; - - protected static ZeppelinConfiguration zConf = null; - - static String zServerHost; - static int zServerPort; - static final String metaKey = "ClusterMockKey"; - - static TServerSocket tSocket = null; - - public static void startCluster() throws IOException, InterruptedException { - LOGGER.info("startCluster >>>"); - - zConf = ZeppelinConfiguration.load(); - - // Set the cluster IP and port - zServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); - zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - zConf.setClusterAddress(zServerHost + ":" + zServerPort); - - // mock cluster manager server - clusterServer = ClusterManagerServer.getInstance(zConf); - clusterServer.start(); - - // mock cluster manager client - clusterClient = ClusterManagerClient.getInstance(zConf); - clusterClient.start(metaKey); - - // Waiting for cluster startup - int wait = 0; - while (wait++ < 100) { - if (clusterServer.isClusterLeader() - && clusterServer.raftInitialized() - && clusterClient.raftInitialized()) { - LOGGER.info("wait {}(ms) found cluster leader", wait * 3000); - break; - } - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - assertEquals(true, clusterServer.isClusterLeader()); - - try { - tSocket = new TServerSocket(0); - } catch (TTransportException e) { - throw new IOException("Fail to create TServerSocket", e); - } - - LOGGER.info("startCluster <<<"); - } - - public static void stopCluster() { - LOGGER.info("stopCluster >>>"); - if (null != clusterClient) { - clusterClient.shutdown(); - } - if (null != clusterClient) { - clusterServer.shutdown(); - } - - tSocket.close(); - LOGGER.info("stopCluster <<<"); - } - - public void getServerMeta() { - LOGGER.info("serverMeta >>>"); - - // Get metadata for all services - 
Map<String, Map<String, Object>> meta = - clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, ""); - - LOGGER.info(meta.toString()); - - assertNotNull(meta); - assertEquals(true, (meta instanceof Map)); - - // Get metadata for the current service - Map<String, Object> values = meta.get(zServerHost + ":" + zServerPort); - assertEquals(true, (values instanceof Map)); - - assertEquals(true, values.size() > 0); - - LOGGER.info("serverMeta <<<"); - } - - public void mockIntpProcessMeta(String metaKey, boolean online) { - // mock IntpProcess Meta - Map<String, Object> meta = new HashMap<>(); - meta.put(ClusterMeta.SERVER_HOST, "127.0.0.1"); - meta.put(ClusterMeta.SERVER_PORT, 6000); - meta.put(ClusterMeta.INTP_TSERVER_HOST, "127.0.0.1"); - meta.put(ClusterMeta.INTP_TSERVER_PORT, tSocket.getServerSocket().getLocalPort()); - meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY"); - meta.put(ClusterMeta.CPU_USED, "CPU_USED"); - meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY"); - meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED"); - meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now()); - - if (online) { - meta.put(ClusterMeta.STATUS, ONLINE_STATUS); - } else { - meta.put(ClusterMeta.STATUS, OFFLINE_STATUS); - } - // put IntpProcess Meta - clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta); - - // get IntpProcess Meta - Map<String, Map<String, Object>> check = - clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey); - - LOGGER.info(check.toString()); - - assertNotNull(check); - assertNotNull(check.get(metaKey)); - assertEquals(true, check.get(metaKey).size() == 10); - } -}
zeppelin-plugins/launcher/cluster/src/test/resources/log4j.properties +0 −24 removed
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Root logger option
-log4j.rootLogger=INFO, stdout
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
zeppelin-plugins/pom.xml +0 −8 modified
@@ -46,7 +46,6 @@
     <module>notebookrepo/oss</module>
     <module>launcher/k8s-standard</module>
-    <module>launcher/cluster</module>
     <module>launcher/docker</module>
     <module>launcher/yarn</module>
     <module>launcher/flink</module>
@@ -82,13 +81,6 @@
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
-      <exclusions>
-        <!-- used from zeppelin-zenigne -->
-        <exclusion>
-          <groupId>org.objenesis</groupId>
-          <artifactId>objenesis</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
   </dependencies>
zeppelin-server/pom.xml +0 −7 modified
@@ -332,13 +332,6 @@
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
-      <exclusions>
-        <!-- used from zeppelin-zenigne -->
-        <exclusion>
-          <groupId>org.objenesis</groupId>
-          <artifactId>objenesis</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java+0 −243 removed@@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.rest; - -import com.google.gson.Gson; -import org.apache.zeppelin.annotation.ZeppelinApi; -import org.apache.zeppelin.cluster.ClusterManagerServer; -import org.apache.zeppelin.cluster.meta.ClusterMeta; -import org.apache.zeppelin.cluster.meta.ClusterMetaType; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.server.JsonResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Response; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -/** - * clusters Rest api. 
- */ -@Path("/cluster") -@Produces("application/json") -public class ClusterRestApi { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterRestApi.class); - Gson gson = new Gson(); - - private ClusterManagerServer clusterManagerServer; - - - // Do not modify, Use by `zeppelin-web/src/app/cluster/cluster.html` - private static final String PROPERTIES = "properties"; - private final ZeppelinConfiguration zConf; - - @Inject - public ClusterRestApi(ZeppelinConfiguration zConf) { - this.zConf = zConf; - if (zConf.isClusterMode()) { - clusterManagerServer = ClusterManagerServer.getInstance(zConf); - } else { - LOGGER.warn("Cluster mode is disabled, ClusterRestApi won't work"); - } - } - - @GET - @Path("/address") - @ZeppelinApi - public Response getClusterAddress() { - String clusterAddr = zConf.getClusterAddress(); - Map<String, String> data = new HashMap<>(); - data.put("clusterAddr", clusterAddr); - - return new JsonResponse<>(Response.Status.OK, "Cluster Address", data).build(); - } - - /** - * get all nodes of clusters - */ - @GET - @Path("/nodes") - @ZeppelinApi - public Response getClusterNodes(){ - List<Map<String, Object>> nodes = new ArrayList<>(); - - Map<String, Map<String, Object>> clusterMeta; - Map<String, Map<String, Object>> intpMeta; - clusterMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.SERVER_META, ""); - intpMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, ""); - - // Number of calculation processes - for (Entry<String, Map<String, Object>> serverMetaEntity : clusterMeta.entrySet()) { - if (!serverMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)) { - continue; - } - String serverNodeName = (String) serverMetaEntity.getValue().get(ClusterMeta.NODE_NAME); - - List<String> arrIntpProcess = new ArrayList<>(); - int intpProcCount = 0; - for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) { - if (!intpMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME) - 
&& !intpMetaEntity.getValue().containsKey(ClusterMeta.INTP_PROCESS_NAME)) { - continue; - } - String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME); - - if (serverNodeName.equals(intpNodeName)) { - intpProcCount ++; - String intpName = (String) intpMetaEntity.getValue().get(ClusterMeta.INTP_PROCESS_NAME); - arrIntpProcess.add(intpName); - } - } - serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_COUNT, intpProcCount); - serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_LIST, arrIntpProcess); - } - - for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) { - String nodeName = entry.getKey(); - Map<String, Object> properties = entry.getValue(); - - Map<String, Object> sortProperties = new HashMap<>(); - - if (properties.containsKey(ClusterMeta.CPU_USED) - && properties.containsKey(ClusterMeta.CPU_CAPACITY)) { - float cpuUsed = (long) properties.get(ClusterMeta.CPU_USED) / (float) 100.0; - float cpuCapacity = (long) properties.get(ClusterMeta.CPU_CAPACITY) / (float) 100.0; - float cpuRate = cpuUsed / cpuCapacity * 100; - - String cpuInfo = String.format("%.2f / %.2f = %.2f", cpuUsed, cpuCapacity, cpuRate); - sortProperties.put(ClusterMeta.CPU_USED + " / " + ClusterMeta.CPU_CAPACITY, cpuInfo + "%"); - } - - if (properties.containsKey(ClusterMeta.MEMORY_USED) - && properties.containsKey(ClusterMeta.MEMORY_CAPACITY)) { - float memoryUsed = (long) properties.get(ClusterMeta.MEMORY_USED) / (float) (1024*1024*1024); - float memoryCapacity = (long) properties.get(ClusterMeta.MEMORY_CAPACITY) / (float) (1024*1024*1024); - float memoryRate = memoryUsed / memoryCapacity * 100; - - String memoryInfo = String.format("%.2fGB / %.2fGB = %.2f", - memoryUsed, memoryCapacity, memoryRate); - sortProperties.put(ClusterMeta.MEMORY_USED + " / " + ClusterMeta.MEMORY_CAPACITY, memoryInfo + "%"); - } - - if (properties.containsKey(ClusterMeta.SERVER_START_TIME)) { - // format LocalDateTime - Object serverStartTime = 
properties.get(ClusterMeta.SERVER_START_TIME); - if (serverStartTime instanceof LocalDateTime) { - LocalDateTime localDateTime = (LocalDateTime) serverStartTime; - String dateTime = formatLocalDateTime(localDateTime); - sortProperties.put(ClusterMeta.SERVER_START_TIME, dateTime); - } else { - sortProperties.put(ClusterMeta.SERVER_START_TIME, "Wrong time type!"); - } - } - if (properties.containsKey(ClusterMeta.STATUS)) { - sortProperties.put(ClusterMeta.STATUS, properties.get(ClusterMeta.STATUS)); - } - if (properties.containsKey(ClusterMeta.LATEST_HEARTBEAT)) { - // format LocalDateTime - Object latestHeartbeat = properties.get(ClusterMeta.LATEST_HEARTBEAT); - if (latestHeartbeat instanceof LocalDateTime) { - LocalDateTime localDateTime = (LocalDateTime) latestHeartbeat; - String dateTime = formatLocalDateTime(localDateTime); - sortProperties.put(ClusterMeta.LATEST_HEARTBEAT, dateTime); - } else { - sortProperties.put(ClusterMeta.LATEST_HEARTBEAT, "Wrong time type!"); - } - } - if (properties.containsKey(ClusterMeta.INTP_PROCESS_LIST)) { - sortProperties.put(ClusterMeta.INTP_PROCESS_LIST, properties.get(ClusterMeta.INTP_PROCESS_LIST)); - } - - Map<String, Object> node = new HashMap<>(); - node.put(ClusterMeta.NODE_NAME, nodeName); - node.put(PROPERTIES, sortProperties); - - nodes.add(node); - } - - return new JsonResponse<>(Response.Status.OK, "", nodes).build(); - } - - private String formatLocalDateTime(LocalDateTime localDateTime) { - DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME; - return localDateTime.format(dtf); - } - - /** - * get node info by id - */ - @GET - @Path("/node/{nodeName}/{intpName}") - @ZeppelinApi - public Response getClusterNode(@PathParam("nodeName") String nodeName, - @PathParam("intpName") String intpName){ - List<Map<String, Object>> intpProcesses = new ArrayList<>(); - - Map<String, Map<String, Object>> intpMeta = clusterManagerServer.getClusterMeta( - ClusterMetaType.INTP_PROCESS_META, ""); - - // Number of calculation 
processes - for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) { - String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME); - - if (null != intpNodeName && intpNodeName.equals(nodeName)) { - Map<String, Object> node = new HashMap<>(); - node.put(ClusterMeta.NODE_NAME, intpNodeName); - node.put(PROPERTIES, intpMetaEntity.getValue()); - - // format LocalDateTime - Map<String, Object> properties = intpMetaEntity.getValue(); - if (properties.containsKey(ClusterMeta.INTP_START_TIME)) { - Object intpStartTime = properties.get(ClusterMeta.INTP_START_TIME); - if (intpStartTime instanceof LocalDateTime) { - LocalDateTime localDateTime = (LocalDateTime) intpStartTime; - String dateTime = formatLocalDateTime(localDateTime); - properties.put(ClusterMeta.INTP_START_TIME, dateTime); - } else { - properties.put(ClusterMeta.INTP_START_TIME, "Wrong time type!"); - } - } - if (properties.containsKey(ClusterMeta.LATEST_HEARTBEAT)) { - Object latestHeartbeat = properties.get(ClusterMeta.LATEST_HEARTBEAT); - if (latestHeartbeat instanceof LocalDateTime) { - LocalDateTime localDateTime = (LocalDateTime) latestHeartbeat; - String dateTime = formatLocalDateTime(localDateTime); - properties.put(ClusterMeta.LATEST_HEARTBEAT, dateTime); - } else { - properties.put(ClusterMeta.LATEST_HEARTBEAT, "Wrong time type!"); - } - } - - intpProcesses.add(node); - } - } - - return new JsonResponse<>(Response.Status.OK, "", intpProcesses).build(); - } -}
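The removed `getClusterNodes()` handler above turns raw `long` metrics into display strings: byte counts are divided by `1024*1024*1024` to express them in GB, and the usage rate is computed as `used / capacity * 100`. A small sketch of the memory calculation follows. The class `UsageFormat` and method `memoryInfo` are hypothetical names, and `Locale.ROOT` is an addition here (the removed code formats with the default locale) so that the decimal separator is predictable:

```java
import java.util.Locale;

public class UsageFormat {

  // Hypothetical helper mirroring the arithmetic of the removed handler.
  static String memoryInfo(long usedBytes, long capacityBytes) {
    float bytesPerGb = (float) (1024 * 1024 * 1024);
    float memoryUsed = usedBytes / bytesPerGb;
    float memoryCapacity = capacityBytes / bytesPerGb;
    float memoryRate = memoryUsed / memoryCapacity * 100;
    return String.format(Locale.ROOT, "%.2fGB / %.2fGB = %.2f",
        memoryUsed, memoryCapacity, memoryRate) + "%";
  }

  public static void main(String[] args) {
    // 2 GB used out of 8 GB capacity.
    System.out.println(memoryInfo(2L << 30, 8L << 30));
  }
}
```

Because the division is done in `float`, a zero capacity produces `NaN` or `Infinity` in the output string rather than throwing an exception.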
zeppelin-server/src/main/java/org/apache/zeppelin/server/RestApiApplication.java +0 −2 modified
@@ -22,7 +22,6 @@
 import javax.ws.rs.core.Application;
 import org.apache.zeppelin.rest.AdminRestApi;
-import org.apache.zeppelin.rest.ClusterRestApi;
 import org.apache.zeppelin.rest.ConfigurationsRestApi;
 import org.apache.zeppelin.rest.CredentialRestApi;
 import org.apache.zeppelin.rest.HeliumRestApi;
@@ -40,7 +39,6 @@ public class RestApiApplication extends Application {
   public Set<Class<?>> getClasses() {
     Set<Class<?>> s = new HashSet<>();
     s.add(AdminRestApi.class);
-    s.add(ClusterRestApi.class);
     s.add(ConfigurationsRestApi.class);
     s.add(CredentialRestApi.class);
     s.add(HeliumRestApi.class);
zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +0 −46 modified
@@ -60,7 +60,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.ShiroFilter;
-import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
@@ -70,9 +69,7 @@
 import org.apache.zeppelin.helium.HeliumApplicationFactory;
 import org.apache.zeppelin.helium.HeliumBundleFactory;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.metric.JVMInfoBinder;
 import org.apache.zeppelin.metric.PrometheusServlet;
@@ -89,7 +86,6 @@
 import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
 import org.apache.zeppelin.notebook.scheduler.SchedulerService;
 import org.apache.zeppelin.plugin.PluginManager;
-import org.apache.zeppelin.rest.exception.WebApplicationExceptionMapper;
 import org.apache.zeppelin.search.LuceneSearch;
 import org.apache.zeppelin.search.NoSearchService;
 import org.apache.zeppelin.search.SearchService;
@@ -101,7 +97,6 @@
 import org.apache.zeppelin.storage.ConfigStorage;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
-import org.apache.zeppelin.util.ReflectionUtils;
 import org.apache.zeppelin.utils.PEMImporter;
 import org.eclipse.jetty.http.HttpScheme;
 import org.eclipse.jetty.http.HttpVersion;
@@ -241,8 +236,6 @@ protected void configure() {
     } catch (IOException e) {
       LOGGER.error("Failed to init NotebookRepo", e);
     }
-    // Cluster Manager Server
-    setupClusterManagerServer();
     initJMX();
@@ -477,45 +470,6 @@ private void setupNotebookServer(WebAppContext webapp) {
     });
   }
-  private void setupClusterManagerServer() {
-    if (zConf.isClusterMode()) {
-      LOGGER.info("Cluster mode is enabled, starting ClusterManagerServer");
-      ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance(zConf);
-
-      NotebookServer notebookServer = sharedServiceLocator.getService(NotebookServer.class);
-      clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer);
-
-      AuthorizationService authorizationService =
-        sharedServiceLocator.getService(AuthorizationService.class);
-      clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService);
-
-      InterpreterSettingManager interpreterSettingManager =
-        sharedServiceLocator.getService(InterpreterSettingManager.class);
-      clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC, interpreterSettingManager);
-
-      // Since the ClusterInterpreterLauncher is lazy, dynamically generated, So in cluster mode,
-      // when the zeppelin service starts, Create a ClusterInterpreterLauncher object,
-      // This allows the ClusterInterpreterLauncher to listen for cluster events.
-      try {
-        InterpreterSettingManager intpSettingManager =
-          sharedServiceLocator.getService(InterpreterSettingManager.class);
-        RecoveryStorage recoveryStorage = ReflectionUtils.createClazzInstance(
-          zConf.getRecoveryStorageClass(),
-          new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
-          new Object[] {zConf, intpSettingManager});
-        recoveryStorage.init();
-        sharedServiceLocator.getService(PluginManager.class).loadInterpreterLauncher(
-          InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME, recoveryStorage);
-      } catch (IOException e) {
-        LOGGER.error(e.getMessage(), e);
-      }
-
-      clusterManagerServer.start();
-    } else {
-      LOGGER.info("Cluster mode is disabled");
-    }
-  }
-
   private static SslContextFactory getSslContextFactory(ZeppelinConfiguration zConf) {
     SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
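The removed `setupClusterManagerServer()` above registers one listener per topic through `addClusterEventListeners(topic, listener)` before starting the cluster server. The general pattern can be sketched as a topic-to-listeners registry. Everything in this sketch (the class `TopicRegistrySketch`, the methods `addListener` and `publish`, and the topic strings) is hypothetical and only illustrates the pattern; it is not how `ClusterManagerServer` is implemented:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicRegistrySketch {

  // Same single-method shape as the ClusterEventListener interface used in this diff.
  interface Listener {
    void onClusterEvent(String msg);
  }

  private final Map<String, List<Listener>> listenersByTopic = new HashMap<>();

  // Register a listener under a topic name (hypothetical counterpart of addClusterEventListeners).
  void addListener(String topic, Listener listener) {
    listenersByTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(listener);
  }

  // Deliver msg to every listener of the topic and return how many were notified.
  int publish(String topic, String msg) {
    List<Listener> targets = listenersByTopic.getOrDefault(topic, Collections.emptyList());
    for (Listener listener : targets) {
      listener.onClusterEvent(msg);
    }
    return targets.size();
  }

  public static void main(String[] args) {
    TopicRegistrySketch registry = new TopicRegistrySketch();
    // Topic names are made up for this example.
    registry.addListener("note-events", msg -> System.out.println("notebook listener got: " + msg));
    registry.addListener("auth-events", msg -> System.out.println("auth listener got: " + msg));
    System.out.println(registry.publish("note-events", "BROADCAST_NOTE"));
  }
}
```

A registry of this shape delivers whatever message reaches `publish`; it does nothing to establish who sent it, so any authentication has to happen before that point.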
zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +1 −116 modified
@@ -43,7 +43,6 @@
 import javax.inject.Inject;
 import javax.inject.Provider;
 import javax.websocket.CloseReason;
-import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
 import javax.websocket.OnClose;
 import javax.websocket.OnError;
@@ -54,10 +53,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
-import org.apache.zeppelin.cluster.ClusterManagerServer;
-import org.apache.zeppelin.cluster.event.ClusterEvent;
-import org.apache.zeppelin.cluster.event.ClusterEventListener;
-import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.common.Message;
 import org.apache.zeppelin.common.Message.OP;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -86,7 +81,6 @@
 import org.apache.zeppelin.notebook.NotebookImportDeserializer;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.ParagraphJobListener;
-import org.apache.zeppelin.notebook.exception.CorruptedNoteException;
 import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision;
 import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.scheduler.Job;
@@ -122,8 +116,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
     RemoteInterpreterProcessListener,
     ApplicationEventListener,
     ParagraphJobListener,
-    NoteEventListener,
-    ClusterEventListener {
+    NoteEventListener {
   /**
    * Job manager service type.
@@ -651,7 +644,6 @@ public void saveInterpreterBindings(NotebookSocket conn, ServiceContext context,
   public void broadcastNote(Note note) {
     inlineBroadcastNote(note);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, MSG_ID_NOT_DEFINED, note);
   }
   private void inlineBroadcastNote(Note note) {
@@ -672,7 +664,6 @@ private void inlineBroadcastParagraph(Note note, Paragraph p, String msgId) {
   public void broadcastParagraph(Note note, Paragraph p, String msgId) {
     inlineBroadcastParagraph(note, p, msgId);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, msgId, note, p);
   }
   private void inlineBroadcastParagraphs(Map<String, Paragraph> userParagraphMap, String msgId) {
@@ -686,7 +677,6 @@ private void inlineBroadcastParagraphs(Map<String, Paragraph> userParagraphMap,
   private void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
       Paragraph defaultParagraph, String msgId) {
     inlineBroadcastParagraphs(userParagraphMap, msgId);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, msgId, userParagraphMap, defaultParagraph);
   }
   private void inlineBroadcastNewParagraph(Note note, Paragraph para) {
@@ -699,7 +689,6 @@ private void inlineBroadcastNewParagraph(Note note, Paragraph para) {
   private void broadcastNewParagraph(Note note, Paragraph para) {
     inlineBroadcastNewParagraph(note, para);
-    broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, MSG_ID_NOT_DEFINED, note, para);
   }
   private void inlineBroadcastNoteList() {
@@ -718,110 +707,6 @@ public void broadcastNoteListUpdate() {
   public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
     inlineBroadcastNoteList();
-    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, MSG_ID_NOT_DEFINED, subject, userAndRoles);
-  }
-
-  // broadcast ClusterEvent
-  private void broadcastClusterEvent(ClusterEvent event, String msgId, Object... objects) {
-    if (!zConf.isClusterMode()) {
-      return;
-    }
-
-    ClusterMessage clusterMessage = new ClusterMessage(event);
-    clusterMessage.setMsgId(msgId);
-
-    for (Object object : objects) {
-      String json;
-      if (object instanceof AuthenticationInfo) {
-        json = ((AuthenticationInfo) object).toJson();
-        clusterMessage.put("AuthenticationInfo", json);
-      } else if (object instanceof Note) {
-        json = ((Note) object).toJson();
-        clusterMessage.put("Note", json);
-      } else if (object instanceof Paragraph) {
-        json = ((Paragraph) object).toJson();
-        clusterMessage.put("Paragraph", json);
-      } else if (object instanceof Set) {
-        Gson gson = new Gson();
-        json = gson.toJson(object);
-        clusterMessage.put("Set<String>", json);
-      } else if (object instanceof Map) {
-        Gson gson = new Gson();
-        json = gson.toJson(object);
-        clusterMessage.put("Map<String, Paragraph>", json);
-      } else {
-        LOGGER.error("Unknown object type!");
-      }
-    }
-
-    String msg = ClusterMessage.serializeMessage(clusterMessage);
-    ClusterManagerServer.getInstance(zConf).broadcastClusterEvent(
-        ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, msg);
-  }
-
-  @Override
-  public void onClusterEvent(String msg) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("onClusterEvent : {}", msg);
-    }
-    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-
-    Note note = null;
-    Paragraph paragraph = null;
-    Set<String> userAndRoles = null;
-    Map<String, Paragraph> userParagraphMap = null;
-    AuthenticationInfo authenticationInfo = null;
-    for (Map.Entry<String, String> entry : message.getData().entrySet()) {
-      String key = entry.getKey();
-      String json = entry.getValue();
-      if (StringUtils.equals(key, "AuthenticationInfo")) {
-        authenticationInfo = AuthenticationInfo.fromJson(json);
-      } else if (StringUtils.equals(key, "Note")) {
-        try {
-          note = noteParser.get().fromJson(null, json);
-        } catch (CorruptedNoteException e) {
-          LOGGER.warn("Fail to parse note json", e);
-        }
-      } else if (StringUtils.equals(key, "Paragraph")) {
-        paragraph = noteParser.get().fromJson(json);
-      } else if (StringUtils.equals(key, "Set<String>")) {
-        Gson gson = new Gson();
-        userAndRoles = gson.fromJson(json, new TypeToken<Set<String>>() {
-        }.getType());
-      } else if (StringUtils.equals(key, "Map<String, Paragraph>")) {
-        Gson gson = new Gson();
-        userParagraphMap = gson.fromJson(json, new TypeToken<Map<String, Paragraph>>() {
-        }.getType());
-      } else {
-        LOGGER.error("Unknown key:{}, json:{}!", key, json);
-      }
-    }
-
-    switch (message.clusterEvent) {
-      case BROADCAST_NOTE:
-        inlineBroadcastNote(note);
-        break;
-      case BROADCAST_NOTE_LIST:
-        try {
-          getNotebook().reloadAllNotes(authenticationInfo);
-          inlineBroadcastNoteList();
-        } catch (IOException e) {
-          LOGGER.error(e.getMessage(), e);
-        }
-        break;
-      case BROADCAST_PARAGRAPH:
-        inlineBroadcastParagraph(note, paragraph, message.getMsgId());
-        break;
-      case BROADCAST_PARAGRAPHS:
-        inlineBroadcastParagraphs(userParagraphMap, message.getMsgId());
-        break;
-      case BROADCAST_NEW_PARAGRAPH:
-        inlineBroadcastNewParagraph(note, paragraph);
-        break;
-      default:
-        LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg);
-        break;
-    }
   }
   public void listNotesInfo(NotebookSocket conn, ServiceContext context) throws IOException {
zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java +0 −68 removed
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.zeppelin.cluster.event.ClusterEventListener;
-import org.apache.zeppelin.cluster.event.ClusterMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.util.Set;
-
-public class ClusterAuthEventListenerTest implements ClusterEventListener {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterAuthEventListenerTest.class);
-
-  public String receiveMsg = null;
-
-  @Override
-  public void onClusterEvent(String msg) {
-    receiveMsg = msg;
-    LOGGER.info("ClusterAuthEventListenerTest#onClusterEvent : {}", msg);
-    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-
-    String noteId = message.get("noteId");
-    String user = message.get("user");
-    String jsonSet = message.get("set");
-    Gson gson = new Gson();
-    Set<String> set = gson.fromJson(jsonSet, new TypeToken<Set<String>>() {}.getType());
-
-    switch (message.clusterEvent) {
-      case SET_READERS_PERMISSIONS:
-      case SET_WRITERS_PERMISSIONS:
-      case SET_OWNERS_PERMISSIONS:
-      case SET_RUNNERS_PERMISSIONS:
-        assertNotNull(set);
-        assertNotNull(noteId);
-        break;
-      case CLEAR_PERMISSION:
-        assertNotNull(noteId);
-        break;
-      case SET_ROLES:
-        assertNotNull(user);
-        break;
-      default:
-        receiveMsg = null;
-        fail("Unknown clusterEvent : " + message.clusterEvent);
-        break;
-    }
-  }
-}
zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java +0 −572 removed
@@ -1,572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.reflect.TypeToken;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.util.EntityUtils;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.MiniZeppelinServer;
-import org.apache.zeppelin.cluster.meta.ClusterMetaType;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
-import org.apache.zeppelin.interpreter.thrift.ServiceException;
-import org.apache.zeppelin.notebook.AuthorizationService;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.Paragraph;
-import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
-import org.apache.zeppelin.rest.AbstractTestRestApi;
-import org.apache.zeppelin.rest.message.NewParagraphRequest;
-import org.apache.zeppelin.service.ConfigurationService;
-import org.apache.zeppelin.service.NotebookService;
-import org.apache.zeppelin.socket.NotebookServer;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-public class ClusterEventTest extends AbstractTestRestApi {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterEventTest.class);
-
-  private static List<ClusterAuthEventListenerTest> clusterAuthEventListenerTests = new ArrayList<>();
-  private static List<ClusterNoteEventListenerTest> clusterNoteEventListenerTests = new ArrayList<>();
-  private static List<ClusterIntpSettingEventListenerTest> clusterIntpSettingEventListenerTests = new ArrayList<>();
-
-  private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
-  private static ClusterManagerClient clusterClient = null;
-  static final String metaKey = "ClusterEventTestKey";
-
-  private static Notebook notebook;
-  private static NotebookServer notebookServer;
-  private static QuartzSchedulerService schedulerService;
-  private static NotebookService notebookService;
-  private static AuthorizationService authorizationService;
-  private AuthenticationInfo anonymous;
-
-  Gson gson = new Gson();
-
-  private static MiniZeppelinServer zepServer;
-
-  @BeforeEach
-  void setup() {
-    zConf = zepServer.getZeppelinConfiguration();
-  }
-
-  @BeforeAll
-  static void init() throws Exception {
-    zepServer = new MiniZeppelinServer(ClusterEventTest.class.getSimpleName());
-    zepServer.addInterpreter("md");
-    zepServer.addInterpreter("sh");
-    genClusterAddressConf(zepServer.getZeppelinConfiguration());
-    zepServer.start();
-    notebook = zepServer.getService(Notebook.class);
-    authorizationService = zepServer.getService(AuthorizationService.class);
-    ZeppelinConfiguration zConf = zepServer.getZeppelinConfiguration();
-    schedulerService = new QuartzSchedulerService(zConf, notebook);
-    notebook.initNotebook();
-    notebook.waitForFinishInit(1, TimeUnit.MINUTES);
-    notebookServer = spy(zepServer.getService(NotebookServer.class));
-    notebookService = new NotebookService(notebook, authorizationService, zConf, schedulerService);
-
-    ConfigurationService configurationService = new ConfigurationService(notebook.getConf());
-    when(notebookServer.getNotebookService()).thenReturn(notebookService);
-    when(notebookServer.getConfigurationService()).thenReturn(configurationService);
-
-    startOtherZeppelinClusterNode(zConf);
-
-    // wait zeppelin cluster startup
-    Thread.sleep(10000);
-    // mock cluster manager client
-    clusterClient = ClusterManagerClient.getInstance(zConf);
-    clusterClient.start(metaKey);
-
-    // Waiting for cluster startup
-    int wait = 0;
-    while(wait++ < 100) {
-      if (clusterIsStartup() && clusterClient.raftInitialized()) {
-        LOGGER.info("wait {}(ms) found cluster leader", wait*500);
-        break;
-      }
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        LOGGER.error(e.getMessage(), e);
-      }
-    }
-
-    Thread.sleep(3000);
-    assertEquals(true, clusterIsStartup());
-
-    getClusterServerMeta();
-  }
-
-  @AfterAll
-  static void destroy() throws Exception {
-    if (null != clusterClient) {
-      clusterClient.shutdown();
-    }
-    for (ClusterManagerServer clusterServer : clusterServers) {
-      clusterServer.shutdown();
-    }
-
-    zepServer.destroy();
-    LOGGER.info("stopCluster <<<");
-  }
-
-  @BeforeEach
-  void setUp() {
-    anonymous = new AuthenticationInfo("anonymous");
-    zConf = zepServer.getZeppelinConfiguration();
-  }
-
-  private static void genClusterAddressConf(ZeppelinConfiguration zConf)
-      throws IOException, InterruptedException {
-    String clusterAddrList = "";
-    String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
-    for (int i = 0; i < 3; i ++) {
-      // Set the cluster IP and port
-      int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      clusterAddrList += zServerHost + ":" + zServerPort;
-      if (i != 2) {
-        clusterAddrList += ",";
-      }
-    }
-    zConf.setClusterAddress(clusterAddrList);
-    LOGGER.info("clusterAddrList = {}", clusterAddrList);
-  }
-
-  public static ClusterManagerServer startClusterSingleNode(String clusterAddrList,
-      String clusterHost,
-      int clusterPort,
-      ZeppelinConfiguration zConf)
-      throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
-    Class<ClusterManagerServer> clazz = ClusterManagerServer.class;
-    Constructor<ClusterManagerServer> constructor = clazz.getDeclaredConstructor(ZeppelinConfiguration.class);
-    constructor.setAccessible(true);
-    ClusterManagerServer clusterServer = constructor.newInstance(zConf);
-    clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
-
-    clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer);
-    clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService);
-    return clusterServer;
-  }
-
-  //
-  public static void startOtherZeppelinClusterNode(ZeppelinConfiguration zConf)
-      throws IOException, InterruptedException {
-    LOGGER.info("startCluster >>>");
-    String clusterAddrList = zConf.getClusterAddress();
-
-    // mock cluster manager server
-    String cluster[] = clusterAddrList.split(",");
-    try {
-      // NOTE: cluster[2] is ZeppelinServerMock
-      for (int i = 0; i < 2; i ++) {
-        String[] parts = cluster[i].split(":");
-        String clusterHost = parts[0];
-        int clusterPort = Integer.valueOf(parts[1]);
-
-        ClusterManagerServer clusterServer
-            = startClusterSingleNode(clusterAddrList, clusterHost, clusterPort, zConf);
-        clusterServers.add(clusterServer);
-      }
-    } catch (Exception e) {
-      LOGGER.error(e.getMessage(), e);
-    }
-
-    for (ClusterManagerServer clusterServer : clusterServers) {
-      ClusterAuthEventListenerTest clusterAuthEventListenerTest = new ClusterAuthEventListenerTest();
-      clusterAuthEventListenerTests.add(clusterAuthEventListenerTest);
-      clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, clusterAuthEventListenerTest);
-
-      ClusterNoteEventListenerTest clusterNoteEventListenerTest = new ClusterNoteEventListenerTest();
-      clusterNoteEventListenerTests.add(clusterNoteEventListenerTest);
-      clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, clusterNoteEventListenerTest);
-
-      ClusterIntpSettingEventListenerTest clusterIntpSettingEventListenerTest = new ClusterIntpSettingEventListenerTest();
-      clusterIntpSettingEventListenerTests.add(clusterIntpSettingEventListenerTest);
-      clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC, clusterIntpSettingEventListenerTest);
-
-      clusterServer.start();
-    }
-
-    LOGGER.info("startCluster <<<");
-  }
-
-  private void checkClusterNoteEventListener() {
-    for (ClusterNoteEventListenerTest clusterNoteEventListenerTest : clusterNoteEventListenerTests) {
-      assertNotNull(clusterNoteEventListenerTest.receiveMsg);
-    }
-  }
-
-  private void checkClusterAuthEventListener() {
-    for (ClusterAuthEventListenerTest clusterAuthEventListenerTest : clusterAuthEventListenerTests) {
-      assertNotNull(clusterAuthEventListenerTest.receiveMsg);
-    }
-  }
-
-  private void checkClusterIntpSettingEventListener() {
-    for (ClusterIntpSettingEventListenerTest clusterIntpSettingEventListenerTest : clusterIntpSettingEventListenerTests) {
-      assertNotNull(clusterIntpSettingEventListenerTest.receiveMsg);
-    }
-  }
-
-  static boolean clusterIsStartup() {
-    for (ClusterManagerServer clusterServer : clusterServers) {
-      if (!clusterServer.raftInitialized()) {
-        LOGGER.warn("clusterServer not Initialized!");
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  public static void getClusterServerMeta() {
-    LOGGER.info("getClusterServerMeta >>>");
-    // Get metadata for all services
-    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
-    LOGGER.info(srvMeta.toString());
-
-    Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
-    LOGGER.info(intpMeta.toString());
-
-    assertNotNull(srvMeta);
-    assertEquals(true, (srvMeta instanceof HashMap));
-    HashMap hashMap = (HashMap) srvMeta;
-
-    assertEquals(3, hashMap.size());
-
-    LOGGER.info("getClusterServerMeta <<< ");
-  }
-
-  @Test
-  void testRenameNoteEvent() throws IOException {
-    String noteId = null;
-    try {
-      String oldName = "old_name";
-      noteId = notebook.createNote(oldName, anonymous);
-      notebook.processNote(noteId,
-        note -> {
-          assertEquals(note.getName(), oldName);
-          return null;
-        });
-
-      final String newName = "testName";
-      String jsonRequest = "{\"name\": " + newName + "}";
-
-      CloseableHttpResponse put = httpPut("/notebook/" + noteId + "/rename/", jsonRequest);
-      assertThat("test testRenameNote:", put, AbstractTestRestApi.isAllowed());
-      put.close();
-
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterNoteEventListener();
-
-      notebook.processNote(noteId,
-        note -> {
-          assertEquals(note.getName(), newName);
-          return null;
-        });
-
-
-    } catch (InterruptedException e) {
-      LOGGER.error(e.getMessage(), e);
-    } finally {
-      // cleanup
-      if (null != noteId) {
-        notebook.removeNote(noteId, anonymous);
-      }
-    }
-  }
-
-  @Test
-  void testCloneNoteEvent() throws IOException {
-    String note1Id = null;
-    String clonedNoteId = null;
-    try {
-      note1Id = notebook.createNote("note1", anonymous);
-      Thread.sleep(1000);
-
-      CloseableHttpResponse post = httpPost("/notebook/" + note1Id, "");
-      LOGGER.info("testCloneNote response\n" + post.getStatusLine().getReasonPhrase());
-      assertThat(post, AbstractTestRestApi.isAllowed());
-
-      Map<String, Object> resp = gson.fromJson(EntityUtils.toString(post.getEntity(), StandardCharsets.UTF_8),
-          new TypeToken<Map<String, Object>>() {}.getType());
-      clonedNoteId = (String) resp.get("body");
-      post.close();
-      Thread.sleep(1000);
-
-      CloseableHttpResponse get = httpGet("/notebook/" + clonedNoteId);
-      assertThat(get, AbstractTestRestApi.isAllowed());
-      Map<String, Object> resp2 = gson.fromJson(EntityUtils.toString(get.getEntity(), StandardCharsets.UTF_8),
-          new TypeToken<Map<String, Object>>() {}.getType());
-      Map<String, Object> resp2Body = (Map<String, Object>) resp2.get("body");
-
-      get.close();
-
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterNoteEventListener();
-    } catch (InterruptedException e) {
-      LOGGER.error(e.getMessage(), e);
-    } finally {
-      // cleanup
-      if (null != note1Id) {
-        notebook.removeNote(note1Id, anonymous);
-      }
-      if (null != clonedNoteId) {
-        notebook.removeNote(clonedNoteId, anonymous);
-      }
-    }
-  }
-
-  @Test
-  void insertParagraphEvent() throws IOException {
-    String noteId = null;
-    try {
-      // Create note and set result explicitly
-      noteId = notebook.createNote("note1", anonymous);
-      notebook.processNote(noteId,
-        note -> {
-          Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-          InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS,
-              InterpreterResult.Type.TEXT, "result");
-          p1.setResult(result);
-          return null;
-        });
-
-      // insert new paragraph
-      NewParagraphRequest newParagraphRequest = new NewParagraphRequest("Test", null, null, null);
-
-      CloseableHttpResponse post =
-          httpPost("/notebook/" + noteId + "/paragraph", gson.toJson(newParagraphRequest));
-      LOGGER.info("test clear paragraph output response\n"
-          + EntityUtils.toString(post.getEntity(), StandardCharsets.UTF_8));
-      assertThat(post, AbstractTestRestApi.isAllowed());
-      post.close();
-
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterNoteEventListener();
-    } catch (InterruptedException e) {
-      LOGGER.error(e.getMessage(), e);
-    } finally {
-      // cleanup
-      if (null != noteId) {
-        notebook.removeNote(noteId, anonymous);
-      }
-    }
-  }
-
-  @Test
-  void testClusterAuthEvent() throws IOException {
-    String noteId = null;
-
-    try {
-      noteId = notebook.createNote("note1", anonymous);
-      notebook.processNote(noteId,
-        note -> {
-          Paragraph p1 = note.addNewParagraph(anonymous);
-          p1.setText("%md start remote interpreter process");
-          p1.setAuthenticationInfo(anonymous);
-          notebookServer.getNotebook().saveNote(note, anonymous);
-          return null;
-        });
-
-
-      String user1Id = "user1", user2Id = "user2";
-
-      // test user1 can get anonymous's note
-      List<ParagraphInfo> paragraphList0 = null;
-      try {
-        paragraphList0 = notebookServer.getParagraphList(user1Id, noteId);
-      } catch (ServiceException e) {
-        LOGGER.error(e.getMessage(), e);
-      } catch (TException e) {
-        LOGGER.error(e.getMessage(), e);
-      }
-      assertNotNull(paragraphList0, user1Id + " can get anonymous's note");
-
-      // test user1 cannot get user2's note
-      authorizationService.setOwners(noteId, new HashSet<>(Arrays.asList(user2Id)));
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterAuthEventListener();
-
-      authorizationService.setReaders(noteId, new HashSet<>(Arrays.asList(user2Id)));
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterAuthEventListener();
-
-      authorizationService.setRunners(noteId, new HashSet<>(Arrays.asList(user2Id)));
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterAuthEventListener();
-
-      authorizationService.setWriters(noteId, new HashSet<>(Arrays.asList(user2Id)));
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterAuthEventListener();
-
-      Set<String> roles = new HashSet<>(Arrays.asList("admin"));
-      // set admin roles for both user1 and user2
-      authorizationService.setRoles(user2Id, roles);
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterAuthEventListener();
-
-      authorizationService.clearPermission(noteId);
-      // wait cluster sync event
-      Thread.sleep(1000);
-      checkClusterAuthEventListener();
-    } catch (InterruptedException e) {
-      LOGGER.error(e.getMessage(), e);
-    } finally {
-      if (null != noteId) {
-        notebook.removeNote(noteId, anonymous);
-      }
-    }
-  }
-
-  @Test
-  void testInterpreterEvent() throws IOException, InterruptedException {
-    // when: Create 1 interpreter settings `sh1`
-    String md1Name = "sh1";
-
-    String md1Dep = "org.apache.drill.exec:drill-jdbc:jar:1.7.0";
-
-    String reqBody1 = "{\"name\":\"" + md1Name + "\",\"group\":\"sh\"," +
-        "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", " +
-        "\"type\": \"textarea\"}}," +
-        "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.shell.ShellInterpreter\"," +
-        "\"name\":\"md\"}]," +
-        "\"dependencies\":[ {\n" +
-        " \"groupArtifactVersion\": \"" + md1Dep + "\",\n" +
-        " \"exclusions\":[]\n" +
-        " }]," +
-        "\"option\": { \"remote\": true, \"session\": false }}";
-    CloseableHttpResponse post = httpPost("/interpreter/setting", reqBody1);
-    String postResponse = EntityUtils.toString(post.getEntity(), StandardCharsets.UTF_8);
-    LOGGER.info("testCreatedInterpreterDependencies create response\n" + postResponse);
-    InterpreterSetting created = convertResponseToInterpreterSetting(postResponse);
-    assertThat("test create method:", post, AbstractTestRestApi.isAllowed());
-    post.close();
-
-    // 1. Call settings API
-    CloseableHttpResponse get = httpGet("/interpreter/setting");
-    String rawResponse = EntityUtils.toString(get.getEntity(), StandardCharsets.UTF_8);
-    get.close();
-
-    // 2. Parsing to List<InterpreterSettings>
-    JsonObject responseJson = gson.fromJson(rawResponse, JsonElement.class).getAsJsonObject();
-    JsonArray bodyArr = responseJson.getAsJsonArray("body");
-    List<InterpreterSetting> settings = new Gson().fromJson(bodyArr,
-        new TypeToken<ArrayList<InterpreterSetting>>() {
-        }.getType());
-
-    // 3. Filter interpreters out we have just created
-    InterpreterSetting md1 = null;
-    for (InterpreterSetting setting : settings) {
-      if (md1Name.equals(setting.getName())) {
-        md1 = setting;
-      }
-    }
-
-    // then: should get created interpreters which have different dependencies
-
-    // 4. Validate each md interpreter has its own dependencies
-    assertEquals(1, md1.getDependencies().size());
-    assertEquals(md1Dep, md1.getDependencies().get(0).getGroupArtifactVersion());
-    Thread.sleep(1000);
-    checkClusterIntpSettingEventListener();
-
-    // 2. test update Interpreter
-    String rawRequest = "{\"name\":\"sh1\",\"group\":\"sh\"," +
-        "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", " +
-        "\"type\": \"textarea\"}}," +
-        "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\"," +
-        "\"name\":\"md\"}],\"dependencies\":[]," +
-        "\"option\": { \"remote\": true, \"session\": false }}";
-    JsonObject jsonRequest = gson.fromJson(rawRequest, JsonElement.class).getAsJsonObject();
-
-    // when: call update setting API
-    JsonObject jsonObject = new JsonObject();
-    jsonObject.addProperty("name", "propname2");
-    jsonObject.addProperty("value", "this is new prop");
-    jsonObject.addProperty("type", "textarea");
-    jsonRequest.getAsJsonObject("properties").add("propname2", jsonObject);
-    CloseableHttpResponse put =
-        httpPut("/interpreter/setting/" + created.getId(), jsonRequest.toString());
-    LOGGER.info("testSettingCRUD update response\n"
-        + EntityUtils.toString(put.getEntity(), StandardCharsets.UTF_8));
-    // then: call update setting API
-    assertThat("test update method:", put, AbstractTestRestApi.isAllowed());
-    put.close();
-    Thread.sleep(1000);
-    checkClusterIntpSettingEventListener();
-
-    // 3: call delete setting API
-    CloseableHttpResponse delete = httpDelete("/interpreter/setting/" + created.getId());
-    LOGGER.info("testSettingCRUD delete response\n"
-        + EntityUtils.toString(delete.getEntity(), StandardCharsets.UTF_8));
-    // then: call delete setting API
-    assertThat("Test delete method:", delete, AbstractTestRestApi.isAllowed());
-    delete.close();
-    Thread.sleep(1000);
-    checkClusterIntpSettingEventListener();
-  }
-
-  private JsonObject getBodyFieldFromResponse(String rawResponse) {
-    JsonObject response = gson.fromJson(rawResponse, JsonElement.class).getAsJsonObject();
-    return response.getAsJsonObject("body");
-  }
-
-  private InterpreterSetting convertResponseToInterpreterSetting(String rawResponse) {
-    return gson.fromJson(getBodyFieldFromResponse(rawResponse), InterpreterSetting.class);
-  }
-}
zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java +0 −38 removed
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-import org.apache.zeppelin.cluster.event.ClusterEventListener;
-import org.apache.zeppelin.cluster.event.ClusterMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClusterIntpSettingEventListenerTest implements ClusterEventListener {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterIntpSettingEventListenerTest.class);
-
-  public String receiveMsg = null;
-
-  @Override
-  public void onClusterEvent(String msg) {
-    receiveMsg = msg;
-    LOGGER.info("ClusterIntpSettingEventListenerTest#onClusterEvent : {}", msg);
-    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-    assertNotNull(message);
-  }
-}
zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java +0 −96 removed
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.cluster;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.zeppelin.cluster.event.ClusterEventListener;
-import org.apache.zeppelin.cluster.event.ClusterMessage;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.notebook.GsonNoteParser;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.NoteParser;
-import org.apache.zeppelin.notebook.Paragraph;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-public class ClusterNoteEventListenerTest implements ClusterEventListener {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterNoteEventListenerTest.class);
-
-  public String receiveMsg = null;
-
-  private ZeppelinConfiguration conf;
-  private NoteParser noteParser;
-
-  @BeforeEach
-  void setup() {
-    conf = ZeppelinConfiguration.load();
-    noteParser = new GsonNoteParser(conf);
-  }
-
-  @Override
-  public void onClusterEvent(String msg) {
-    receiveMsg = msg;
-    LOGGER.debug("ClusterNoteEventListenerTest#onClusterEvent : {}", msg);
-    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-
-    Note note = null;
-    Paragraph paragraph = null;
-    Set<String> userAndRoles = null;
-    Map<String, Paragraph> userParagraphMap = null;
-    AuthenticationInfo authenticationInfo = null;
-    for (Map.Entry<String, String> entry : message.getData().entrySet()) {
-      String key = entry.getKey();
-      String json = entry.getValue();
-      if (key.equals("AuthenticationInfo")) {
-        authenticationInfo = AuthenticationInfo.fromJson(json);
-        LOGGER.debug(authenticationInfo.toJson());
-      } else if (key.equals("Note")) {
-        try {
-          note = noteParser.fromJson(null, json);
-          LOGGER.debug(note.toJson());
-        } catch (IOException e) {
-          LOGGER.warn("Fail to parse note json", e);
-        }
-      } else if (key.equals("Paragraph")) {
-        paragraph = noteParser.fromJson(json);
-        LOGGER.debug(paragraph.toJson());
-      } else if (key.equals("Set<String>")) {
-        Gson gson = new Gson();
-        userAndRoles = gson.fromJson(json, new TypeToken<Set<String>>() {
-        }.getType());
-        LOGGER.debug(userAndRoles.toString());
-      } else if (key.equals("Map<String, Paragraph>")) {
-        Gson gson = new Gson();
-        userParagraphMap = gson.fromJson(json, new TypeToken<Map<String, Paragraph>>() {
-        }.getType());
-        LOGGER.debug(userParagraphMap.toString());
-      } else {
-        receiveMsg = null;
-        fail("Unknown clusterEvent : " + message.clusterEvent);
-      }
-    }
-  }
-}
zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-common.interface.ts +0 −3 modified
@@ -63,7 +63,6 @@ export interface ConfigurationsInfo {
   'zeppelin.notebook.autoInterpreterBinding': string;
   'zeppelin.config.storage.class': string;
   'zeppelin.helium.node.installer.url': string;
-  'zeppelin.cluster.heartbeat.interval': string;
   'zeppelin.notebook.storage': string;
   'zeppelin.notebook.new_format.convert': string;
   'zeppelin.interpreter.dir': string;
@@ -76,7 +75,6 @@ export interface ConfigurationsInfo {
   'zeppelin.encoding': string;
   'zeppelin.server.jetty.request.header.size': string;
   'zeppelin.search.temp.path': string;
-  'zeppelin.cluster.heartbeat.timeout': string;
   'zeppelin.notebook.s3.endpoint': string;
   'zeppelin.notebook.homescreen.hide': string;
   'zeppelin.scheduler.threadpool.size': string;
@@ -102,7 +100,6 @@ export interface ConfigurationsInfo {
   'zeppelin.notebook.default.owner.username': string;
   'zeppelin.home': string;
   'zeppelin.interpreter.lifecyclemanager.timeout.threshold': string;
-  'zeppelin.cluster.addr': string;
   'zeppelin.notebook.git.remote.url': string;
   'zeppelin.notebook.mongo.autoimport': string;
   'zeppelin.notebook.one.way.sync': string;
zeppelin-zengine/pom.xml +7 −0 modified
@@ -34,6 +34,7 @@

   <properties>
     <!--library versions-->
+    <guava.version>32.0.0-jre</guava.version>
     <lucene.version>8.7.0</lucene.version>
     <org.reflections.version>0.9.8</org.reflections.version>
     <xml.apis.version>1.4.01</xml.apis.version>
@@ -156,6 +157,12 @@
       <version>${dropwizard.version}</version>
     </dependency>

+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +0 −9 modified
@@ -138,8 +138,6 @@ public class InterpreterSetting {
   private transient RecoveryStorage recoveryStorage;
   private transient RemoteInterpreterEventServer interpreterEventServer;

-  public static final String CLUSTER_INTERPRETER_LAUNCHER_NAME = "ClusterInterpreterLauncher";
-
   ///////////////////////////////////////////////////////////////////////////////////////////

   /**
@@ -784,8 +782,6 @@ public String getLauncherPlugin(Properties properties) {
         return "FlinkInterpreterLauncher";
       }
       return "K8sStandardInterpreterLauncher";
-    } else if (isRunningOnCluster()) {
-      return InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME;
     } else if (isRunningOnDocker()) {
       return "DockerInterpreterLauncher";
     } else {
@@ -811,11 +807,6 @@ private boolean isRunningOnKubernetes() {
     return zConf.getRunMode() == ZeppelinConfiguration.RUN_MODE.K8S;
   }

-
-  private boolean isRunningOnCluster() {
-    return zConf.isClusterMode();
-  }
-
   private boolean isRunningOnDocker() {
     return zConf.getRunMode() == ZeppelinConfiguration.RUN_MODE.DOCKER;
   }
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +1 −64 modified
@@ -39,10 +39,6 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.cluster.ClusterManagerServer;
-import org.apache.zeppelin.cluster.event.ClusterEvent;
-import org.apache.zeppelin.cluster.event.ClusterEventListener;
-import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.dep.Dependency;
@@ -99,16 +95,14 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.zeppelin.cluster.ClusterManagerServer.CLUSTER_INTP_SETTING_EVENT_TOPIC;
-
 /**
  * InterpreterSettingManager is the component which manage all the interpreter settings.
  * (load/create/update/remove/get)
  * TODO(zjffdu) We could move it into another separated component.
  */
 @ManagedObject("interpreterSettingManager")
-public class InterpreterSettingManager implements NoteEventListener, ClusterEventListener {
+public class InterpreterSettingManager implements NoteEventListener {
 
   private static final Pattern VALID_INTERPRETER_NAME = Pattern.compile("^[-_a-zA-Z0-9]+$");
   private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSettingManager.class);
@@ -837,8 +831,6 @@ public InterpreterSetting createNewSetting(String name, String group,
     InterpreterSetting interpreterSetting = null;
     try {
       interpreterSetting = inlineCreateNewSetting(name, group, dependencies, option, properties);
-
-      broadcastClusterEvent(ClusterEvent.CREATE_INTP_SETTING, interpreterSetting);
     } catch (IOException e) {
       LOGGER.error(e.getMessage(), e);
       throw e;
@@ -958,8 +950,6 @@ public void setPropertyAndRestart(
       throws InterpreterException, IOException {
     try {
       InterpreterSetting intpSetting = inlineSetPropertyAndRestart(id, option, properties, dependencies, true);
-      // broadcast cluster event
-      broadcastClusterEvent(ClusterEvent.UPDATE_INTP_SETTING, intpSetting);
     } catch (Exception e) {
       throw e;
     }
@@ -1024,7 +1014,6 @@ public void remove(String id) throws IOException {
       // broadcast cluster event
       InterpreterSetting intpSetting = new InterpreterSetting();
       intpSetting.setId(id);
-      broadcastClusterEvent(ClusterEvent.DELETE_INTP_SETTING, intpSetting);
     }
   }
 
@@ -1221,56 +1210,4 @@ public void onParagraphStatusChange(Paragraph p, Job.Status status) {
     // do nothing
   }
 
-  @Override
-  public void onClusterEvent(String msg) {
-    LOGGER.debug("onClusterEvent : {}", msg);
-
-    try {
-      ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-      String jsonIntpSetting = message.get("intpSetting");
-      InterpreterSetting intpSetting = InterpreterSetting.fromJson(jsonIntpSetting);
-      String id = intpSetting.getId();
-      String name = intpSetting.getName();
-      String group = intpSetting.getGroup();
-      InterpreterOption option = intpSetting.getOption();
-      HashMap<String, InterpreterProperty> properties
-          = (HashMap<String, InterpreterProperty>) InterpreterSetting
-          .convertInterpreterProperties(intpSetting.getProperties());
-      List<Dependency> dependencies = intpSetting.getDependencies();
-
-      switch (message.clusterEvent) {
-        case CREATE_INTP_SETTING:
-          inlineCreateNewSetting(name, group, dependencies, option, properties);
-          break;
-        case UPDATE_INTP_SETTING:
-          inlineSetPropertyAndRestart(id, option, properties, dependencies, false);
-          break;
-        case DELETE_INTP_SETTING:
-          inlineRemove(id, false);
-          break;
-        default:
-          LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg);
-          break;
-      }
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage(), e);
-    } catch (InterpreterException e) {
-      LOGGER.error(e.getMessage(), e);
-    }
-  }
-
-  // broadcast cluster event
-  private void broadcastClusterEvent(ClusterEvent event, InterpreterSetting intpSetting) {
-    if (!zConf.isClusterMode()) {
-      return;
-    }
-
-    String jsonIntpSetting = InterpreterSetting.toJson(intpSetting);
-
-    ClusterMessage message = new ClusterMessage(event);
-    message.put("intpSetting", jsonIntpSetting);
-    String msg = ClusterMessage.serializeMessage(message);
-    ClusterManagerServer.getInstance(zConf).broadcastClusterEvent(
-        CLUSTER_INTP_SETTING_EVENT_TOPIC, msg);
-  }
 }
zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java +1 −87 modified
@@ -17,13 +17,7 @@
 
 package org.apache.zeppelin.notebook;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.cluster.ClusterManagerServer;
-import org.apache.zeppelin.cluster.event.ClusterEvent;
-import org.apache.zeppelin.cluster.event.ClusterEventListener;
-import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.storage.ConfigStorage;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -41,7 +35,7 @@
  * This class is responsible for maintain notes authorization info. And provide api for
  * setting and querying note authorization info.
  */
-public class AuthorizationService implements ClusterEventListener {
+public class AuthorizationService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizationService.class);
   private static final Set<String> EMPTY_SET = new HashSet<>();
@@ -150,9 +144,6 @@ public void setOwners(String noteId, Set<String> entities, boolean broadcast) th
       throw new IOException("No noteAuth found for noteId: " + noteId);
     }
     noteAuth.setOwners(entities);
-    if (broadcast) {
-      broadcastClusterEvent(ClusterEvent.SET_OWNERS_PERMISSIONS, noteId, null, entities);
-    }
   }
 
   public void setReaders(String noteId, Set<String> entities, boolean broadcast) throws IOException {
@@ -162,9 +153,6 @@ public void setReaders(String noteId, Set<String> entities, boolean broadcast) t
       throw new IOException("No noteAuth found for noteId: " + noteId);
     }
     noteAuth.setReaders(entities);
-    if (broadcast) {
-      broadcastClusterEvent(ClusterEvent.SET_READERS_PERMISSIONS, noteId, null, entities);
-    }
   }
 
   public void setRunners(String noteId, Set<String> entities, boolean broadcast) throws IOException {
@@ -174,9 +162,6 @@ public void setRunners(String noteId, Set<String> entities, boolean broadcast) t
       throw new IOException("No noteAuth found for noteId: " + noteId);
     }
     noteAuth.setRunners(entities);
-    if (broadcast) {
-      broadcastClusterEvent(ClusterEvent.SET_RUNNERS_PERMISSIONS, noteId, null, entities);
-    }
   }
 
   public void setWriters(String noteId, Set<String> entities, boolean broadcast) throws IOException {
@@ -186,9 +171,6 @@ public void setWriters(String noteId, Set<String> entities, boolean broadcast) t
       throw new IOException("No noteAuth found for noteId: " + noteId);
     }
     noteAuth.setWriters(entities);
-    if (broadcast) {
-      broadcastClusterEvent(ClusterEvent.SET_WRITERS_PERMISSIONS, noteId, null, entities);
-    }
   }
 
   public void setRoles(String user, Set<String> roles, boolean broadcast) {
@@ -198,9 +180,6 @@ public void setRoles(String user, Set<String> roles, boolean broadcast) {
     }
     roles = normalizeUsers(roles);
     userRoles.put(user, roles);
-    if (broadcast) {
-      broadcastClusterEvent(ClusterEvent.SET_ROLES, null, user, roles);
-    }
   }
 
   public void clearPermission(String noteId, boolean broadcast) throws IOException {
@@ -213,9 +192,6 @@ public void clearPermission(String noteId, boolean broadcast) throws IOException
     noteAuth.setWriters(new HashSet<>());
     noteAuth.setOwners(new HashSet<>());
 
-    if (broadcast) {
-      broadcastClusterEvent(ClusterEvent.CLEAR_PERMISSION, noteId, null, null);
-    }
   }
 
   public Set<String> getOwners(String noteId) {
@@ -347,66 +323,4 @@ public boolean isPublic() {
     return zConf.isNotebookPublic();
   }
 
-  @Override
-  public void onClusterEvent(String msg) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("onClusterEvent : {}", msg);
-    }
-
-    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
-
-    String noteId = message.get("noteId");
-    String user = message.get("user");
-    String jsonSet = message.get("set");
-    Gson gson = new Gson();
-    Set<String> set = gson.fromJson(jsonSet, new TypeToken<Set<String>>() {
-    }.getType());
-
-    try {
-      switch (message.clusterEvent) {
-        case SET_READERS_PERMISSIONS:
-          setReaders(noteId, set, false);
-          break;
-        case SET_WRITERS_PERMISSIONS:
-          setWriters(noteId, set, false);
-          break;
-        case SET_OWNERS_PERMISSIONS:
-          setOwners(noteId, set, false);
-          break;
-        case SET_RUNNERS_PERMISSIONS:
-          setRunners(noteId, set, false);
-          break;
-        case SET_ROLES:
-          setRoles(user, set, false);
-          break;
-        case CLEAR_PERMISSION:
-          clearPermission(noteId, false);
-          break;
-        default:
-          LOGGER.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg);
-          break;
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Fail to broadcast msg", e);
-    }
-  }
-
-  // broadcast cluster event
-  private void broadcastClusterEvent(ClusterEvent event, String noteId,
-                                     String user, Set<String> set) {
-    if (!zConf.isClusterMode()) {
-      return;
-    }
-    ClusterMessage message = new ClusterMessage(event);
-    message.put("noteId", noteId);
-    message.put("user", user);
-
-    Gson gson = new Gson();
-    String json = gson.toJson(set, new TypeToken<Set<String>>() {
-    }.getType());
-    message.put("set", json);
-    String msg = ClusterMessage.serializeMessage(message);
-    ClusterManagerServer.getInstance(zConf).broadcastClusterEvent(
-        ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, msg);
-  }
 }
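The removed broadcastClusterEvent / onClusterEvent pair in the diff above follows a common replication pattern: a local change is applied and then sent to peers, while a change received from a peer is applied with broadcast = false so that it is not sent back out. The Java sketch below models only that pattern, under stated assumptions: BroadcastSketch, Node, peers, and onPeerEvent are hypothetical stand-ins rather than Zeppelin classes, and an in-memory peer list takes the place of the cluster transport.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BroadcastSketch {
    // Hypothetical stand-in for one server holding note ownership data.
    public static class Node {
        public final Map<String, Set<String>> owners = new HashMap<>();
        public final List<Node> peers = new ArrayList<>();
        public int sent = 0; // number of events this node has sent to peers

        // Same parameter shape as setOwners(noteId, entities, broadcast) in the diff.
        public void setOwners(String noteId, Set<String> entities, boolean broadcast) {
            owners.put(noteId, new HashSet<>(entities));
            if (broadcast) {
                for (Node peer : peers) {
                    sent++;
                    peer.onPeerEvent(noteId, entities);
                }
            }
        }

        // A received event is applied with broadcast=false so it is not echoed back.
        public void onPeerEvent(String noteId, Set<String> entities) {
            setOwners(noteId, entities, false);
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();
        a.peers.add(b);
        b.peers.add(a);
        a.setOwners("note1", Set.of("alice"), true);
        System.out.println(b.owners.get("note1")); // [alice]
        System.out.println(a.sent + " " + b.sent); // 1 0
    }
}
```

In the diff, the fix removes both halves of this pair; the broadcast parameter stays in the setter signatures shown, with the send it used to guard deleted.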
Vulnerability mechanics
Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References (6)
- github.com/apache/zeppelin/pull/4841 [ghsa, patch, WEB]
- github.com/advisories/GHSA-7pgf-ppxw-8624 [ghsa, ADVISORY]
- lists.apache.org/thread/moyym04993c8owh4h0qj98r43tbo8qdd [ghsa, vendor-advisory, WEB]
- nvd.nist.gov/vuln/detail/CVE-2024-41169 [ghsa, ADVISORY]
- www.openwall.com/lists/oss-security/2025/07/13/1 [ghsa, WEB]
- issues.apache.org/jira/browse/ZEPPELIN-6101 [ghsa, issue-tracking, WEB]