Critical severity · 9.8 · NVD Advisory · Published Jun 30, 2016 · Updated May 6, 2026
CVE-2016-2141
CVE-2016-2141
Description
It was found that JGroups did not require necessary headers for encrypt and auth protocols from new nodes joining the cluster. An attacker could use this flaw to bypass security restrictions, and use this vulnerability to send and receive messages within the cluster, leading to information disclosure, message spoofing, or further possible attacks.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| org.jgroups:jgroups (Maven) | >= 3.3.0.Alpha1, < 3.6.10.Final | 3.6.10.Final |
| org.jgroups:jgroups (Maven) | < 3.2.16.Final | 3.2.16.Final |
Patches
1eeaf5241cce4 - Backport of JGRP-2055 to 3.2 branch (SYM/ASYM_ENCRYPT) JGRP-2074
40 files changed · +3802 −327
bin/jgroups.sh+1 −1 modified@@ -26,7 +26,7 @@ JG_FLAGS="-Dresolve.dns=false -Djgroups.bind_addr=$IP_ADDR -Djboss.tcpping.initi JG_FLAGS="$JG_FLAGS -Djava.net.preferIPv4Stack=true -Djgroups.timer.num_threads=4" FLAGS="-server -Xmx600M -Xms600M" FLAGS="$FLAGS -XX:CompileThreshold=10000 -XX:+AggressiveHeap -XX:ThreadStackSize=64K -XX:SurvivorRatio=8" -FLAGS="$FLAGS -XX:TargetSurvivorRatio=90 -XX:MaxTenuringThreshold=31" +FLAGS="$FLAGS -XX:TargetSurvivorRatio=90" FLAGS="$FLAGS -Xshare:off" # JMX="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=7777 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" JMX="-Dcom.sun.management.jmxremote"
build.xml+37 −0 modified@@ -385,6 +385,43 @@ threadcount="5" /> </target> + <!--listeners="org.jgroups.util.JUnitXMLReporter"--> + + <target name="encrypt" description="Runs SYM_ENCRYPT_Test only" depends="define-testng-task"> + <mkdir dir="${tmp.dir}/test-results/xml/functional"/> + + <testng classpathref="jg.classpath" + suitename="encrypt-test" + groups="encr" + parallel="classes" + threadcount="5" + usedefaultlisteners="false" + outputdir="${tmp.dir}/test-results/xml/functional" + timeout="${unittest.timeout}" + timeOut="${unittest.timeout}" + verbose="1" + listeners="org.jgroups.util.JUnitXMLReporter" + configfailurepolicy="continue"> + + <classfileset dir="${compile.dir}"/> + + <jvmarg value="-Djgroups.bind_addr=${jgroups.bind_addr}"/> + <jvmarg value="-Djgroups.udp.ip_ttl=0"/> + <jvmarg value="-Djgroups.tcpping.initial_hosts=${jgroups.tcpping.initial_hosts}"/> + <jvmarg value="-Djgroups.tunnel.gossip_router_hosts=${jgroups.tunnel.gossip_router_hosts}"/> + <jvmarg value="-Dtests.tmp.dir=${tmp.dir}"/> + <jvmarg value="-Dlog4j.configuration=file:${conf.dir}/log4j.properties"/> + <jvmarg value="-Dlog4j.configurationFile=${conf.dir}/log4j2.xml"/> + <jvmarg value="-Djava.net.preferIPv4Stack=${java.net.preferIPv4Stack}"/> + <jvmarg value="-Djava.net.preferIPv6Addresses=${java.net.preferIPv6Addresses}"/> + <jvmarg value="-Xms400M"/> + <jvmarg value="-Xmx800M"/> + <!--<additional-args/>--> + </testng> + + + </target> + <target name="byteman" depends="postcompile,define-testng-task" description="Runs the byteman tests"> <mkdir dir="${tmp.dir}/test-results/xml/byteman"/>
conf/asym-encrypt.xml+29 −0 added@@ -0,0 +1,29 @@ + +<config xmlns="urn:org:jgroups" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd"> + <UDP /> + <PING/> + <MERGE3/> + <FD_ALL timeout="5000"/> + <FD_SOCK/> + <VERIFY_SUSPECT/> + + <!-- Asymmetric encryption using public/private encryption to fetch the shared secret key --> + <ASYM_ENCRYPT + encrypt_entire_message="true" + sym_keylength="128" + sym_algorithm="AES/ECB/PKCS5Padding" + asym_keylength="512" + asym_algorithm="RSA"/> + + <pbcast.NAKACK2/> + <UNICAST2/> + <pbcast.STABLE/> + <FRAG2/> + <!-- AUTH below is required by ASYM_ENCRYPT --> + <AUTH auth_class="org.jgroups.auth.MD5Token" + auth_value="chris" + token_hash="MD5"/> + <pbcast.GMS join_timeout="2000" /> +</config>
conf/jg-magic-map.xml+1 −1 modified@@ -24,7 +24,7 @@ <class id="53" name="org.jgroups.protocols.COMPRESS$CompressHeader"/> <class id="54" name="org.jgroups.protocols.FcHeader"/> <class id="56" name="org.jgroups.protocols.TpHeader"/> - <class id="57" name="org.jgroups.protocols.ENCRYPT$EncryptHeader"/> + <class id="57" name="org.jgroups.protocols.EncryptHeader"/> <class id="58" name="org.jgroups.protocols.SEQUENCER$SequencerHeader"/> <class id="59" name="org.jgroups.protocols.FD_SIMPLE$FdHeader"/> <class id="61" name="org.jgroups.protocols.FD_ALL$HeartbeatHeader"/>
conf/jg-protocol-ids.xml+4 −0 modified@@ -58,6 +58,10 @@ <class id="59" name="org.jgroups.protocols.SWIFT_PING"/> <class id="60" name="org.jgroups.protocols.relay.RELAY2"/> <class id="61" name="org.jgroups.protocols.FORWARD_TO_COORD"/> + <class id="71" name="org.jgroups.protocols.MERGE2"/> + <class id="75" name="org.jgroups.protocols.TP"/> + <class id="76" name="org.jgroups.protocols.SYM_ENCRYPT"/> + <class id="77" name="org.jgroups.protocols.ASYM_ENCRYPT"/> <!-- IDs reserved for building blocks --> <class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
conf/sym-encrypt.xml+30 −0 added@@ -0,0 +1,30 @@ + + + +<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="urn:org:jgroups" + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"> + <UDP /> + <PING/> + <MERGE3/> + <FD_ALL timeout="5000"/> + <FD_SOCK/> + <VERIFY_SUSPECT/> + + <!-- Symmetric encryption with a keystore --> + <SYM_ENCRYPT + provider="SunJCE" + sym_algorithm="AES" + encrypt_entire_message="true" + keystore_name="/home/bela/JGroups/keystore/defaultStore.keystore" + store_password="changeit" alias="myKey"/> + <pbcast.NAKACK2/> + <UNICAST2/> + <pbcast.STABLE/> + <FRAG2/> + <!-- AUTH below is optional --> + <AUTH auth_class="org.jgroups.auth.MD5Token" + auth_value="chris" + token_hash="MD5"/> + <pbcast.GMS join_timeout="2000" /> +</config>
doc/design/MaliciousAttacks.txt+91 −0 added@@ -0,0 +1,91 @@ + +Use of encryption and authentication protocols to fend off malicious attacks +============================================================================ +Author: Bela Ban, April 2016 +JIRA: https://issues.jboss.org/browse/JGRP-2021 + +The following discussion refers to the changes made in JGroups 4.0. These have been backported to the 3.6 branch, but +the syntax looks different. However, the concepts are the same. + + + +Types of attacks handled +------------------------ +Malicious attacks essentially include (1) non-authorized nodes being able to join a cluster and (2) non-members being +able to communicate with cluster members. + +(1) is handled by AUTH which allows only authenticated nodes to join a cluster. + +(2) is handled by the encryption protocol (SYM_ENCRYPT or ASYM_ENCRYPT) which encrypts messages between cluster +members such that a non-member cannot understand them. + + + +Authentication +-------------- +Authentication is performed by AUTH. Its main use is to make sure only authenticated members can join a cluster. +Other scenarios where a check is performed are: +* Merging: make sure only authenticated members can merge into a new cluster +* View installation (if enabled): views and merge views can only be installed by authenticated members + +So authentication makes sure that rogue nodes will never be able to be members of a cluster, be it via joining or +merging. Note that while AUTH is optional with SYM_ENCRYPT, it is required by ASYM_ENCRYPT: there's a sanity check that +will prevent a member to start if ASYM_ENCRYPT is present but AUTH is absent. + + + +Authorization +------------- +There is currently no authorization in JGroups. Once a member is admitted to the cluster (via authentication), +it can send and receive messages to anyone. + + + +Encryption +---------- +This is based on a shared secret key that all members of a cluster have. 
The key is either acquired from a shared +keystore (symmetric encryption, below) or a new joiner fetches it from the coordinator via public/private key exchange +(asymmetric encryption, below). + +A sent message is encrypted with the shared secret key by the sender and decrypted with the same secret key by the +receiver(s). + +By default, the entire message (including the headers) is encrypted, but it is also possible to only encrypt the payload +(this is configurable). If the headers are not encrypted, it is possible to use replay attacks, because the +sequence numbers (seqnos) of a message are seen. For example, if a seqno is 50, then an attacker might copy the message, +and increment the seqno. This is prevented by copying and _signing_ the message. + +A message can be signed, which is a hash over the encrypted message, encrypted with the secret key. If the hash shipped +with a message doesn't match the hash computed over the received message, the message will be discarded by a receiver, +and no attempt is made to decrypt it. + +The cost of encrypting the entire message includes serializing the entire message (including headers, flags, destination +address etc) and encrypting it into the buffer of a new message (to the same destination). If message signing is enabled, +the cost of computing a hashcode and encrypting it is added to the above cost. + +Attributes present in both symmetric and asymmetric encryption include sign_msgs and encrypt_entire_message. + + +Symmetric encryption +-------------------- +This is done by SYM_ENCRYPT. The configuration includes mainly attributes that define the keystore, e.g. keystore_name +(name of the keystore, needs to be found on the classpath), store_password, key_password and alias. + + +Asymmetric encryption +--------------------- +Contrary to SYM_ENCRYPT, the secret key is not fetched from a shared keystore, but from the current coordinator C. 
After +new member P joined the cluster (passing the join check done by AUTH), P sends a request to get the secret key +(including P's public key) to C. + +C then sends the secret key back to P, encrypted with P's public key, and P decrypts it with its private key +and installs it. From then on, P encrypts and decrypts messages using the secret key. + +When a member leaves, C can optionally (based on change_key_on_leave) create a new secret key, and every cluster member +needs to fetch it again, using the public/private key exchange described above. + + + + + +
.gitignore+1 −0 modified@@ -2,6 +2,7 @@ *.iws *.ipr *.iml +.ant-targets-build.xml .project .classpath .settings/
src/org/jgroups/auth/AuthToken.java+10 −1 modified@@ -10,6 +10,7 @@ * Abstract AuthToken class used by implementations of AUTH, e.g. SimpleToken, X509Token * * @author Chris Mills + * @author Bela Ban */ public abstract class AuthToken implements Streamable { protected final Log log = LogFactory.getLog(this.getClass()); @@ -21,7 +22,10 @@ public void setAuth(AUTH auth) { this.auth = auth; } - public void init() {} + public void init() throws Exception {} + public void start() throws Exception {} + public void stop() {} + public void destroy() {} /** * Used to return the full package and class name of the implementation. This is used by the @@ -31,6 +35,11 @@ public void init() {} */ public abstract String getName(); + + /** The size of the marshalled AuthToken */ + public abstract int size(); + + /** * This method should be implemented to perform the actual authentication of joining members. *
src/org/jgroups/auth/FixedMembershipToken.java+52 −77 modified@@ -1,42 +1,25 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2005, JBoss Inc., and individual contributors as indicated - * by the @authors tag. See the copyright.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ package org.jgroups.auth; import org.jgroups.Event; import org.jgroups.Message; import org.jgroups.PhysicalAddress; import org.jgroups.annotations.Property; +import org.jgroups.stack.IpAddress; +import org.jgroups.util.Bits; import org.jgroups.util.Util; import java.io.DataInput; import java.io.DataOutput; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.StringTokenizer; /** * <p> - * The FixedMemberShipToken object predefines a list of IP addresses and ports that can join the - * group. + * The FixedMemberShipToken object predefines a list of IP addresses and ports that can join the group. 
* </p> * <p> * Configuration parameters for this example are shown below: @@ -50,15 +33,19 @@ * @author Chris Mills (millsy@jboss.com) */ public class FixedMembershipToken extends AuthToken { - private List<String> memberList = null; - private String token = "emptyToken"; + private final List<InetSocketAddress> memberList = new ArrayList<InetSocketAddress>(); + private String token = "emptyToken"; @Property private String fixed_members_seperator = ","; public FixedMembershipToken() { } + public FixedMembershipToken(String token) { + this.token=token; + } + public String getName() { return "org.jgroups.auth.FixedMembershipToken"; } @@ -68,82 +55,70 @@ public void setFixedMembersSeparator(String value) { fixed_members_seperator = value; } + /** Check if I'm in memberList, too */ + /*public void start() throws Exception { + super.start(); + IpAddress self=(IpAddress)auth.getPhysicalAddress(); + if(!isInMembersList(self)) + throw new IllegalStateException("own physical address " + self + " is not in members (" + memberList + ")"); + } */ + public boolean authenticate(AuthToken token, Message msg) { if ((token != null) && (token instanceof FixedMembershipToken) && (this.memberList != null)) { - PhysicalAddress src = (PhysicalAddress) auth.down(new Event(Event.GET_PHYSICAL_ADDRESS, - msg.getSrc())); + PhysicalAddress src = (PhysicalAddress) auth.down(new Event(Event.GET_PHYSICAL_ADDRESS, msg.getSrc())); if (src == null) { - if (log.isErrorEnabled()) - log.error("didn't find physical address for " + msg.getSrc()); + log.error(Util.getMessage("DidnTFindPhysicalAddressFor") + msg.getSrc()); return false; } + return isInMembersList((IpAddress)src); + } - String sourceAddressWithPort = src.toString(); - String sourceAddressWithoutPort = sourceAddressWithPort.substring(0, - sourceAddressWithPort.indexOf(":")); - - if (log.isDebugEnabled()) { - log.debug("AUTHToken received from " + sourceAddressWithPort); - } + if (log.isWarnEnabled()) + log.warn("Invalid AuthToken instance - 
wrong type or null"); + return false; + } - for (String member : memberList) { - if (hasPort(member)) { - if (member.equals(sourceAddressWithPort)) - return true; - } else { - if (member.equals(sourceAddressWithoutPort)) - return true; - } - } + public boolean isInMembersList(IpAddress sender) { + if(memberList == null || sender == null) return false; - } - if (log.isWarnEnabled()) { - log.warn("Invalid AuthToken instance - wrong type or null"); + for(InetSocketAddress addr: memberList) { + if(match(sender, addr)) + return true; } return false; } - private static boolean hasPort(String member) { - return member.contains(":"); + public static boolean match(IpAddress sender, InetSocketAddress addr) { + return !(sender == null || addr == null) + && addr.getAddress().equals(sender.getIpAddress()) + && (addr.getPort() == 0 || addr.getPort() == sender.getPort()); } + @Property(name = "fixed_members_value") - public void setMemberList(String list) { - memberList = new ArrayList<String>(); + public void setMemberList(String list) throws UnknownHostException { + memberList.clear(); StringTokenizer memberListTokenizer = new StringTokenizer(list, fixed_members_seperator); while (memberListTokenizer.hasMoreTokens()) { - memberList.add(memberListTokenizer.nextToken().replace('/', ':')); + String tmp=memberListTokenizer.nextToken().trim(); + int index=tmp.lastIndexOf('/'); + int port=index != -1? Integer.parseInt(tmp.substring(index+1)) : 0; + String addr_str=index != -1? 
tmp.substring(0, index) : tmp; + InetAddress addr=InetAddress.getByName(addr_str); + memberList.add(new InetSocketAddress(addr, port)); } } - /** - * Required to serialize the object to pass across the wire - * - * - * - * @param out - * @throws java.io.IOException - */ public void writeTo(DataOutput out) throws Exception { - if (log.isDebugEnabled()) { - log.debug("SimpleToken writeTo()"); - } - Util.writeString(this.token, out); + Bits.writeString(this.token,out); } - /** - * Required to deserialize the object when read in from the wire - * - * - * - * @param in - * @throws Exception - */ public void readFrom(DataInput in) throws Exception { - if (log.isDebugEnabled()) { - log.debug("SimpleToken readFrom()"); - } - this.token = Util.readString(in); + this.token = Bits.readString(in); + } + + public int size() { + return Util.size(token); } }
src/org/jgroups/auth/MD5Token.java+15 −31 modified@@ -1,12 +1,13 @@ package org.jgroups.auth; -import java.io.DataInput; -import java.io.DataOutput; - import org.jgroups.Message; import org.jgroups.annotations.Property; +import org.jgroups.util.Bits; import org.jgroups.util.Util; +import java.io.DataInput; +import java.io.DataOutput; + /** * <p> * This is an example of using a preshared token that is encrypted using an MD5/SHA hash for @@ -26,7 +27,7 @@ */ public class MD5Token extends AuthToken { - @Property + @Property(exposeAsManagedAttribute=false) private String auth_value = null; @Property(name = "token_hash") @@ -84,9 +85,7 @@ private String hash(String token) { if (hashedToken == null) { // failed to encrypt - if (log.isWarnEnabled()) { - log.warn("Failed to hash token - sending in clear text"); - } + log.warn("Failed to hash token - sending in clear text"); return token; } return hashedToken; @@ -98,38 +97,23 @@ public boolean authenticate(AuthToken token, Message msg) { // Found a valid Token to authenticate against MD5Token serverToken = (MD5Token) token; - if ((this.auth_value != null) && (serverToken.auth_value != null) - && (this.auth_value.equalsIgnoreCase(serverToken.auth_value))) { - // validated - if (log.isDebugEnabled()) { - log.debug("MD5Token match"); - } - return true; - } else { - // if(log.isWarnEnabled()){ - // log.warn("Authentication failed on MD5Token"); - // } - return false; - } + return (this.auth_value != null) && (serverToken.auth_value != null) + && (this.auth_value.equalsIgnoreCase(serverToken.auth_value)); } - if (log.isWarnEnabled()) { - log.warn("Invalid AuthToken instance - wrong type or null"); - } + log.warn("Invalid AuthToken instance - wrong type or null"); return false; } public void writeTo(DataOutput out) throws Exception { - if (log.isDebugEnabled()) { - log.debug("MD5Token writeTo()"); - } - Util.writeString(this.auth_value, out); + Bits.writeString(this.auth_value,out); } public void readFrom(DataInput in) throws 
Exception { - if (log.isDebugEnabled()) { - log.debug("MD5Token readFrom()"); - } - this.auth_value = Util.readString(in); + this.auth_value = Bits.readString(in); + } + + public int size() { + return Util.size(this.auth_value); } }
src/org/jgroups/auth/RegexMembership.java+6 −22 modified@@ -1,24 +1,4 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2005, JBoss Inc., and individual contributors as indicated - * by the @authors tag. See the copyright.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ + package org.jgroups.auth; import org.jgroups.Address; @@ -64,7 +44,7 @@ public String getName() { } - public void init() { + public void init() throws Exception { super.init(); if(!match_ip_address && !match_logical_name) throw new IllegalArgumentException("either match_ip_address or match_logical_address has to be true"); @@ -118,4 +98,8 @@ public void writeTo(DataOutput out) throws Exception { */ public void readFrom(DataInput in) throws Exception { } + + public int size() { + return 0; + } }
src/org/jgroups/auth/SimpleToken.java+15 −7 modified@@ -1,13 +1,13 @@ package org.jgroups.auth; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - import org.jgroups.Message; import org.jgroups.annotations.Property; +import org.jgroups.util.Bits; import org.jgroups.util.Util; +import java.io.DataInput; +import java.io.DataOutput; + /** * <p> * This is an example of using a preshared token for authentication purposes. All members of the @@ -25,7 +25,7 @@ */ public class SimpleToken extends AuthToken { - @Property + @Property(exposeAsManagedAttribute=false) private String auth_value = null; public SimpleToken() { // need an empty constructor @@ -85,7 +85,7 @@ public void writeTo(DataOutput out) throws Exception { if (log.isDebugEnabled()) { log.debug("SimpleToken writeTo()"); } - Util.writeString(this.auth_value, out); + Bits.writeString(this.auth_value,out); } /** @@ -100,6 +100,14 @@ public void readFrom(DataInput in) throws Exception { if (log.isDebugEnabled()) { log.debug("SimpleToken readFrom()"); } - this.auth_value = Util.readString(in); + this.auth_value = Bits.readString(in); + } + + public int size() { + return Util.size(auth_value); + } + + public String toString() { + return "auth_value=" + auth_value; } } \ No newline at end of file
src/org/jgroups/auth/X509Token.java+38 −30 modified@@ -1,25 +1,21 @@ package org.jgroups.auth; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.security.InvalidKeyException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.PrivateKey; -import java.security.UnrecoverableEntryException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; +import org.jgroups.Message; +import org.jgroups.annotations.Property; +import org.jgroups.util.Util; import javax.crypto.BadPaddingException; import javax.crypto.Cipher; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; - -import org.jgroups.Message; -import org.jgroups.annotations.Property; -import org.jgroups.util.Util; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.IOException; +import java.security.*; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; /** * <p> @@ -60,19 +56,19 @@ public class X509Token extends AuthToken { private boolean valueSet = false; @Property - private String keystore_type = "JKS"; + protected String keystore_type = "JKS"; @Property - private String cert_alias = null; + protected String cert_alias = null; @Property - private String keystore_path = null; + protected String keystore_path = null; - @Property - private String auth_value = null; + @Property(exposeAsManagedAttribute=false) + protected String auth_value = null; @Property - private String cipher_type = "RSA"; + protected String cipher_type = "RSA"; private byte[] encryptedToken = null; @@ -87,26 +83,32 @@ public X509Token() { // need an empty constructor } - @Property(name = "cert_password") + @Property(name = "cert_password",exposeAsManagedAttribute=false) public void setCertPassword(String pwd) { 
this.cert_password = pwd.toCharArray(); } - @Property(name = "keystore_password") + @Property(name = "keystore_password",exposeAsManagedAttribute=false) public void setKeyStorePassword(String pwd) { this.keystore_password = pwd.toCharArray(); if (cert_password == null) cert_password = keystore_password; } + /** To be used for testing only */ + public X509Token encryptedToken(byte[] buf) { + encryptedToken=buf; + return this; + } + public String getName() { return "org.jgroups.auth.X509Token"; } public boolean authenticate(AuthToken token, Message msg) { if (!this.valueSet) { if (log.isErrorEnabled()) { - log.error("X509Token not setup correctly - check token attrs"); + log.error(Util.getMessage("X509TokenNotSetupCorrectlyCheckTokenAttrs")); } return false; } @@ -116,7 +118,7 @@ public boolean authenticate(AuthToken token, Message msg) { X509Token serverToken = (X509Token) token; if (!serverToken.valueSet) { if (log.isErrorEnabled()) { - log.error("X509Token - recieved token not valid"); + log.error(Util.getMessage("X509TokenReceivedTokenNotValid")); } return false; } @@ -160,19 +162,25 @@ public void readFrom(DataInput in) throws Exception { this.valueSet = true; } + public int size() { + return Util.size(encryptedToken); + } + /** * Used during setup to get the certification from the keystore and encrypt the auth_value with * the private key - * - * @return true if the certificate was found and the string encypted correctly otherwise returns - * false */ public void setCertificate() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, UnrecoverableEntryException { KeyStore store = KeyStore.getInstance(this.keystore_type); - java.io.FileInputStream fis = new java.io.FileInputStream(this.keystore_path); - store.load(fis, this.keystore_password); + InputStream inputStream=null; + inputStream=Thread.currentThread() + .getContextClassLoader() + 
.getResourceAsStream(this.keystore_path); + if(inputStream == null) + inputStream=new FileInputStream(this.keystore_path); + store.load(inputStream, this.keystore_password); this.cipher = Cipher.getInstance(this.cipher_type); this.certificate = (X509Certificate) store.getCertificate(this.cert_alias);
src/org/jgroups/Channel.java+2 −0 modified@@ -397,6 +397,8 @@ public synchronized void clearChannelListeners() { * * */ public void setReceiver(Receiver r) { + if(receiver != null && r != null) + getLog().warn(String.format("%s: receiver already set", getAddress())); receiver=r; }
src/org/jgroups/demos/KeyStoreGenerator.java+14 −3 modified@@ -3,12 +3,12 @@ import org.jgroups.util.Util; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; import java.io.FileOutputStream; import java.io.OutputStream; import java.security.KeyStore; - -import javax.crypto.KeyGenerator; -import javax.crypto.SecretKey; +import java.security.NoSuchAlgorithmException; /** * Generates a keystore file that has a SecretKey in it. It is not possible to @@ -126,6 +126,17 @@ public static SecretKey initSymKey() throws Exception { } + public static SecretKey createSecretKey() throws Exception { + return createSecretKey(symAlg, keySize); + } + + public static SecretKey createSecretKey(String sym_alg, int key_size) throws NoSuchAlgorithmException { + // KeyGenerator keyGen=KeyGenerator.getInstance(getAlgorithm(sym_alg)); + KeyGenerator keyGen=KeyGenerator.getInstance(sym_alg); + keyGen.init(key_size); + return keyGen.generateKey(); + } + private static String getAlgorithm(String s) { int index=s.indexOf("/"); if(index == -1)
src/org/jgroups/Event.java+4 −6 modified@@ -77,14 +77,12 @@ public Event(int type, Object arg) { this.arg=arg; } - public final int getType() { + public final int getType() { return type; } - - - public Object getArg() { - return arg; - } + public int type() {return type;} + public Object getArg() {return arg;} + public <T extends Object> T arg() {return (T)arg;}
src/org/jgroups/protocols/ASYM_ENCRYPT.java+396 −0 added@@ -0,0 +1,396 @@ +package org.jgroups.protocols; + +import org.jgroups.*; +import org.jgroups.annotations.MBean; +import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.ManagedOperation; +import org.jgroups.annotations.Property; +import org.jgroups.conf.ClassConfigurator; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.util.AsciiString; +import org.jgroups.util.Util; + +import javax.crypto.Cipher; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import java.security.*; +import java.security.spec.X509EncodedKeySpec; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * Encrypts and decrypts communication in JGroups by using a secret key distributed to all cluster members by the + * key server (coordinator) using asymmetric (public/private key) encryption.<br> + * + * The secret key is identical for all cluster members and is used to encrypt messages when sending and decrypt them + * when receiving messages. + * + * This protocol is typically placed under {@link org.jgroups.protocols.pbcast.NAKACK2}, so that most important + * headers are encrypted as well, to prevent replay attacks.<br> + * + * The current keyserver (always the coordinator) generates a secret key. When a new member joins, it asks the keyserver + * for the secret key. The keyserver encrypts the secret key with the joiner's public key and the joiner decrypts it with + * its private key and then installs it and starts encrypting and decrypting messages with the secret key.<br> + * + * View changes that identify a new keyserver will result in a new secret key being generated and then distributed to + * all cluster members. 
This overhead can be substantial in an application with a reasonable member churn.<br>
 *
 * This protocol is suited to an application that does not ship with a known key but instead it is generated and
 * distributed by the keyserver.
 *
 * Since messages can only get encrypted and decrypted when the secret key was received from the keyserver, messages
 * other than join and merge requests/responses are dropped when the secret key isn't yet available. Join and merge
 * requests / responses are handled by {@link AUTH}.
 *
 * @author Bela Ban
 * @author Steve Woodcock
 */
@MBean(description="Asymmetric encryption protocol. The secret key for encryption and decryption of messages is fetched " +
  "from a key server (the coordinator) via asymmetric encryption")
public class ASYM_ENCRYPT extends EncryptBase {
    protected static final short                   GMS_ID=ClassConfigurator.getProtocolId(GMS.class);

    @Property(description="When a member leaves the view, change the secret key, preventing old members from eavesdropping",
      writable=false)
    protected boolean                              change_key_on_leave=true;
    protected volatile Address                     key_server_addr;
    @ManagedAttribute(description="True if this member is the current key server, false otherwise")
    protected volatile boolean                     is_key_server;
    protected KeyPair                              key_pair;     // to store own's public/private Key
    protected Cipher                               asym_cipher;  // decrypting cipher for secret key responses
    // queue all up msgs until the secret key has been received/created
    @ManagedAttribute(description="whether or not to queue received messages (until the secret key was received)")
    protected volatile boolean                     queue_up_msgs=true;
    // queues a bounded number of messages received during a null secret key (or fetching the key from a new coord)
    protected final BlockingQueue<Message>         up_queue=new ArrayBlockingQueue<Message>(100);

    // timestamp (ms) of the last key request triggered by process(); used to throttle requests
    protected volatile long                        last_key_request;


    public KeyPair      keyPair()                         {return key_pair;}
    public Cipher       asymCipher()                      {return asym_cipher;}
    public Address      keyServerAddr()                   {return key_server_addr;}
    public ASYM_ENCRYPT keyServerAddr(Address key_srv)    {this.key_server_addr=key_srv; return this;}

    @ManagedAttribute(description="Number of received messages currently queued")
    public int numQueuedMessages() {return up_queue.size();}

    @ManagedOperation(description="Triggers a request for the secret key to the current keyserver")
    public void sendKeyRequest() {
        if(key_server_addr == null) {
            log.error(String.format("%s: key server is currently not set", local_addr));
            return;
        }
        sendKeyRequest(key_server_addr);
    }

    /** Creates the public/private key pair and the asymmetric cipher before the base class initializes */
    public void init() throws Exception {
        initKeyPair();
        super.init();
    }

    /** Flushes the queue of received messages before stopping the base protocol */
    public void stop() {
        drainUpQueue();
        super.stop();
    }

    /** Passes join/merge traffic (see {@link #skip(Message)}) down unencrypted; all else goes to the base class */
    public Object down(Event evt) {
        if(evt.type() == Event.MSG) {
            Message msg=evt.arg();
            if(skip(msg))
                return down_prot.down(evt);
        }
        return super.down(evt);
    }

    /** Passes join/merge traffic (see {@link #skip(Message)}) up undecrypted; all else goes to the base class */
    public Object up(Event evt) {
        if(evt.type() == Event.MSG) {
            Message msg=evt.arg();
            if(skip(msg))
                return up_prot.up(evt);
        }
        return super.up(evt);
    }



    /** Checks if a message needs to be encrypted/decrypted. Join and merge requests/responses don't need to be
     * encrypted as they're authenticated by {@link AUTH}
     * @param msg the message to inspect
     * @return true if the message carries one of the GMS header types listed below and bypasses encryption
     */
    protected static boolean skip(Message msg) {
        GMS.GmsHeader hdr=(GMS.GmsHeader)msg.getHeader(GMS_ID);
        if(hdr == null) return false;
        switch(hdr.getType()) {
            case GMS.GmsHeader.JOIN_REQ:
            case GMS.GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER:
            case GMS.GmsHeader.JOIN_RSP:
            case GMS.GmsHeader.MERGE_REQ:
            case GMS.GmsHeader.MERGE_RSP:
            case GMS.GmsHeader.VIEW_ACK:
            case GMS.GmsHeader.INSTALL_MERGE_VIEW:
                return true;
        }
        return false;
    }


    /** Dispatches key requests and key responses; any other header type is logged. Always returns null */
    @Override protected Object handleUpEvent(Message msg, EncryptHeader hdr) {
        switch(hdr.type()) {
            case EncryptHeader.SECRET_KEY_REQ:
                handleSecretKeyRequest(msg);
                break;
            case EncryptHeader.SECRET_KEY_RSP:
                handleSecretKeyResponse(msg, hdr.version());
                break;
            default:
                log.warn(String.format("%s: received unknown encrypt header of type %d", local_addr, hdr.type()));
                break;
        }
        return null;
    }

    /**
     * Decides whether an encrypted message can be decrypted right away. While no secret key is available (or
     * queueing is enabled) the message is put into the bounded queue - offer() silently drops it when the queue is
     * full - and a key request is sent at most once every 2 seconds.
     * @return true if the message should be decrypted now, false if it was queued (or dropped)
     */
    @Override protected boolean process(Message msg) {
        if(queue_up_msgs || secret_key == null) {
            up_queue.offer(msg);
            log.trace(String.format("%s: queuing %s message from %s as secret key hasn't been retrieved from keyserver %s yet, hdrs: %s",
                                    local_addr, msg.dest() == null? "mcast" : "unicast", msg.src(), key_server_addr, msg.printHeaders()));
            if(last_key_request == 0 || System.currentTimeMillis() - last_key_request > 2000) {
                last_key_request=System.currentTimeMillis();
                sendKeyRequest();
            }
            return false;
        }
        return true;
    }

    /**
     * Handles a request for the secret key: the payload is the requester's encoded public key, which is used to
     * encrypt the secret key sent back. Requests from senders outside the current view are ignored.
     */
    protected void handleSecretKeyRequest(final Message msg) {
        if(!inView(msg.src(), "%s: key requester %s is not in current view %s; ignoring key request"))
            return;
        log.debug(String.format("%s: received key request from %s", local_addr, msg.getSrc()));
        SecretKey current_key=secret_key;
        if(current_key == null) { // nothing to hand out yet; the requester will retry
            log.warn(String.format("%s: secret key is not available; ignoring key request from %s", local_addr, msg.getSrc()));
            return;
        }
        PublicKey tmpKey=generatePubKey(msg.getBuffer());
        if(tmpKey == null) {
            log.warn(String.format("%s: unable to reconstitute peer's public key", local_addr));
            return;
        }
        try {
            sendSecretKey(current_key, tmpKey, msg.getSrc());
        }
        catch(Exception e) {
            log.warn(String.format("%s: failed sending secret key to %s", local_addr, msg.getSrc()), e);
        }
    }


    /**
     * Handles a response carrying the (asymmetrically encrypted) secret key. Responses from senders outside the
     * current view are ignored. If the key cannot be decoded, the key is requested again.
     */
    protected void handleSecretKeyResponse(final Message msg, final byte[] key_version) {
        if(!inView(msg.src(), "%s: ignoring secret key sent by %s which is not in current view %s"))
            return;
        try {
            SecretKey tmp=decodeKey(msg.getBuffer());
            if(tmp == null)
                sendKeyRequest(); // unable to understand response, let's try again (no-op if key server is unknown)
            else {
                // otherwise set the returned key as the shared key
                log.debug(String.format("%s: received secret key from keyserver %s", local_addr, msg.getSrc()));
                setKeys(tmp, key_version);
            }
        }
        catch(Exception e) {
            log.warn(local_addr + ": unable to process received secret key", e);
        }
    }


    /** Initialise the symmetric key if none is supplied in a keystore */
    protected SecretKey createSecretKey() throws Exception {
        KeyGenerator keyGen=null;
        // see if we have a provider specified
        if(provider != null && !provider.trim().isEmpty())
            keyGen=KeyGenerator.getInstance(getAlgorithm(sym_algorithm), provider);
        else
            keyGen=KeyGenerator.getInstance(getAlgorithm(sym_algorithm));
        // generate the key using the defined init properties
        keyGen.init(sym_keylength);
        return keyGen.generateKey();
    }



    /** Generates the public/private key pair from the init params */
    protected void initKeyPair() throws Exception {
        // generate keys according to the specified algorithms
        // generate publicKey and Private Key
        KeyPairGenerator KpairGen=null;
        if(provider != null && !provider.trim().isEmpty())
            KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asym_algorithm), provider);
        else
            KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asym_algorithm));
        KpairGen.initialize(asym_keylength,new SecureRandom());
        key_pair=KpairGen.generateKeyPair();

        // set up the Cipher to decrypt secret key responses encrypted with our key
        if(provider != null && !provider.trim().isEmpty())
            asym_cipher=Cipher.getInstance(asym_algorithm, provider);
        else
            asym_cipher=Cipher.getInstance(asym_algorithm);
        asym_cipher.init(Cipher.DECRYPT_MODE, key_pair.getPrivate());
    }


    /**
     * Installs the new view and determines the key server (the view's coordinator). If we are the coordinator, we
     * become key server (creating a new key if members left and change_key_on_leave is set), otherwise we may need
     * to fetch the key from the (new) key server.
     */
    @Override protected synchronized void handleView(View v) {
        boolean left_mbrs=change_key_on_leave && this.view != null && !v.containsMembers(this.view.getMembersRaw());
        super.handleView(v);

        Address tmpKeyServer=v.getCoord(); // the coordinator is the keyserver
        if(tmpKeyServer.equals(local_addr)) {
            if(!is_key_server || left_mbrs)
                becomeKeyServer(tmpKeyServer, left_mbrs);
        }
        else
            handleNewKeyServer(tmpKeyServer, v instanceof MergeView, left_mbrs);
    }


    /** Marks this member as key server, creates a fresh secret key and ciphers, then delivers queued messages */
    protected void becomeKeyServer(Address tmpKeyServer, boolean left_mbrs) {
        if(log.isDebugEnabled()) {
            if(!is_key_server)
                log.debug(String.format("%s: I'm the new key server", local_addr));
            else if(left_mbrs)
                log.debug(String.format("%s: creating new secret key because members left", local_addr));
        }
        key_server_addr=tmpKeyServer;
        is_key_server=true;
        try {
            this.secret_key=createSecretKey();
            initSymCiphers(sym_algorithm, secret_key);
            drainUpQueue();
        }
        catch(Exception ex) {
            log.error(local_addr + ": failed creating secret key and initializing ciphers", ex);
        }
    }

    /** If the keyserver changed, send a request for the secret key to the keyserver */
    protected void handleNewKeyServer(Address newKeyServer, boolean merge_view, boolean left_mbrs) {
        if(keyServerChanged(newKeyServer) || merge_view || left_mbrs) {
            // discard the current key and queue incoming messages until the new key has been received
            secret_key=null;
            sym_version=null;
            queue_up_msgs=true;
            key_server_addr=newKeyServer;
            is_key_server=false;
            log.debug(String.format("%s: sending request for secret key to the new keyserver %s", local_addr, key_server_addr));
            sendKeyRequest(key_server_addr);
        }
    }

    protected boolean keyServerChanged(Address newKeyServer) {
        return !equals(key_server_addr, newKeyServer);
    }



    /**
     * Installs a new secret key received from the key server. One decoding cipher of the previous key is retained
     * in key_map under the <em>previous</em> version, so that messages still encrypted with the previous key can be
     * decrypted (decryptMessage() looks ciphers up by the version found in the message header).
     * @param key the new secret key
     * @param version the version of the new key, as sent by the key server
     */
    protected void setKeys(SecretKey key, byte[] version) throws Exception {
        if(Arrays.equals(this.sym_version, version))
            return;
        byte[] prev_version=this.sym_version;
        Cipher decoding_cipher=secret_key != null? decoding_ciphers.take() : null;
        // put the previous key into the map, keep the cipher: no leak, as we'll clear decoding_ciphers in initSymCiphers()
        // The cipher must be registered under the version of the key it belongs to (the old one), not the new version
        // NOTE(review): key_map is a WeakHashMap and this key object is not referenced elsewhere, so the entry
        // may be reclaimed by the garbage collector at any time - confirm this is intended
        if(decoding_cipher != null && prev_version != null)
            key_map.put(new AsciiString(prev_version), decoding_cipher);
        secret_key=key;
        initSymCiphers(key.getAlgorithm(), key);
        sym_version=version; // use the key server's version rather than the one computed by initSymCiphers()
        drainUpQueue();
    }


    /** Encrypts the secret key with the requester's public key and unicasts it to the requester */
    protected void sendSecretKey(SecretKey secret_key, PublicKey public_key, Address source) throws Exception {
        byte[] encryptedKey=encryptSecretKey(secret_key, public_key);
        Message newMsg=new Message(source, local_addr, encryptedKey)
          .putHeader(this.id, new EncryptHeader(EncryptHeader.SECRET_KEY_RSP, symVersion()));
        log.debug(String.format("%s: sending secret key to %s", local_addr, source));
        down_prot.down(new Event(Event.MSG,newMsg));
    }

    /** Encrypts the current secret key with the requester's public key (the requester will decrypt it with its private key) */
    protected byte[] encryptSecretKey(SecretKey secret_key, PublicKey public_key) throws Exception {
        Cipher tmp;
        if (provider != null && !provider.trim().isEmpty())
            tmp=Cipher.getInstance(asym_algorithm, provider);
        else
            tmp=Cipher.getInstance(asym_algorithm);
        tmp.init(Cipher.ENCRYPT_MODE, public_key);

        // encrypt current secret key
        return tmp.doFinal(secret_key.getEncoded());
    }


    /** Sends our public key to the key server, requesting the secret key (encrypted with that public key) */
    protected void sendKeyRequest(Address key_server) {
        Message newMsg=new Message(key_server, local_addr, key_pair.getPublic().getEncoded())
          .putHeader(this.id,new EncryptHeader(EncryptHeader.SECRET_KEY_REQ, sym_version));
        down_prot.down(new Event(Event.MSG,newMsg));
    }


    /**
     * Decrypts the received secret key with our private key and validates it by initializing a symmetric cipher.
     * @param encodedKey the secret key, encrypted with our public key
     * @return the decoded key, or null if the key could not be used with the symmetric algorithm
     * @throws Exception if the asymmetric decryption fails
     */
    protected SecretKeySpec decodeKey(byte[] encodedKey) throws Exception {
        byte[] keyBytes;

        synchronized(this) { // asym_cipher is a single shared instance
            keyBytes=asym_cipher.doFinal(encodedKey);
        }

        try {
            SecretKeySpec keySpec=new SecretKeySpec(keyBytes, getAlgorithm(sym_algorithm));
            Cipher temp;
            if (provider != null && !provider.trim().isEmpty())
                temp=Cipher.getInstance(sym_algorithm, provider);
            else
                temp=Cipher.getInstance(sym_algorithm);
            // Validate the key by initializing a cipher with it. Cipher.SECRET_KEY is a key-type constant meant
            // for Cipher.unwrap(), not an operation mode, so an actual operation mode is used here
            temp.init(Cipher.ENCRYPT_MODE, keySpec);
            return keySpec;
        }
        catch(Exception e) {
            log.error(Util.getMessage("FailedDecodingKey"), e);
            return null;
        }
    }

    // doesn't have to be 100% correct: leftover messages will be delivered later and will be discarded as dupes, as
    // retransmission is likely to have kicked in before anyway
    protected void drainUpQueue() {
        queue_up_msgs=false;
        Message queued_msg;
        while((queued_msg=up_queue.poll()) != null) {
            try {
                Message decrypted_msg=decryptMessage(null, queued_msg.copy());
                if(decrypted_msg != null)
                    up_prot.up(new Event(Event.MSG, decrypted_msg));
            }
            catch(Exception ex) {
                log.error(String.format("failed decrypting message from %s: %s", queued_msg.src(), ex));
            }
        }
    }


    /** Called when a message with an unknown key version was received: re-fetch the key unless we're the key server */
    @Override protected void handleUnknownVersion() {
        if(!is_key_server)
            sendKeyRequest(); // null-checks the key server address; a null destination would be a multicast
    }

    /** Used to reconstitute public key sent in byte form from peer
     * @return the public key, or null if the encoded key could not be parsed
     */
    protected PublicKey generatePubKey(byte[] encodedKey) {
        PublicKey pubKey=null;
        try {
            KeyFactory KeyFac=KeyFactory.getInstance(getAlgorithm(asym_algorithm));
            X509EncodedKeySpec x509KeySpec=new X509EncodedKeySpec(encodedKey);
            pubKey=KeyFac.generatePublic(x509KeySpec);
        }
        catch(Exception e) {
            // use the protocol's logger rather than dumping the stack trace to stderr
            log.error(local_addr + ": failed generating public key from received bytes", e);
        }
        return pubKey;
    }

    /** Null-safe equality check */
    protected static boolean equals(Object a, Object b) {
        return (a == b) || (a != null && a.equals(b));
    }

}
src/org/jgroups/protocols/AuthHeader.java+70 −22 modified@@ -1,46 +1,94 @@ package org.jgroups.protocols; +import org.jgroups.Global; import org.jgroups.Header; import org.jgroups.auth.AuthToken; -import org.jgroups.util.Util; +import org.jgroups.conf.ClassConfigurator; -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; /** * AuthHeader is a holder object for the token that is passed from the joiner to the coordinator * @author Chris Mills + * @author Bela Ban */ public class AuthHeader extends Header { - private AuthToken token=null; + protected AuthToken token=null; - public AuthHeader(){ - } - /** - * Sets the token value to that of the passed in token object - * @param token the new authentication token - */ - public void setToken(AuthToken token){ - this.token = token; + + public AuthHeader() { } - /** - * Used to get the token from the AuthHeader - * @return the token found inside the AuthHeader - */ - public AuthToken getToken(){ - return this.token; + public AuthHeader(AuthToken token) { + this.token=token; } + public void setToken(AuthToken token) {this.token = token;} + public AuthToken getToken() {return this.token;} + public AuthHeader token(AuthToken token) {this.token=token; return this;} + public AuthToken token() {return this.token;} + + public void writeTo(DataOutput out) throws Exception { - Util.writeAuthToken(this.token, out); + writeAuthToken(out, token); } public void readFrom(DataInput in) throws Exception { - this.token = Util.readAuthToken(in); + this.token=readAuthToken(in); } - public int size(){ - //need to fix this - return Util.sizeOf(this); + + public int size() { + return sizeOf(token); } + + public String toString() { + return "token=" + token; + } + + + protected static void writeAuthToken(DataOutput out, AuthToken tok) throws Exception { + out.writeByte(tok == null? 
0 : 1); + if(tok == null) return; + short id=ClassConfigurator.getMagicNumber(tok.getClass()); + out.writeShort(id); + if(id < 0) { + String classname=tok.getClass().getName(); + out.writeUTF(classname); + } + tok.writeTo(out); + } + + protected static AuthToken readAuthToken(DataInput in) throws Exception { + if(in.readByte() == 0) return null; + short id=in.readShort(); + Class<?> clazz; + if(id >= 0) { + clazz=ClassConfigurator.get(id); + } + else { + String classname=in.readUTF(); + clazz=Class.forName(classname); + } + AuthToken retval=(AuthToken)clazz.newInstance(); + retval.readFrom(in); + return retval; + } + + protected static int sizeOf(AuthToken tok) { + int retval=Global.BYTE_SIZE; // null token ? + if(tok == null) return retval; + + retval+=Global.SHORT_SIZE; + short id=ClassConfigurator.getMagicNumber(tok.getClass()); + if(id < 0) { + String classname=tok.getClass().getName(); + retval+=classname.length() +2; + } + retval+=tok.size(); + return retval; + } + + }
src/org/jgroups/protocols/AUTH.java+160 −99 modified@@ -1,56 +1,76 @@ package org.jgroups.protocols; -import org.jgroups.Address; -import org.jgroups.Event; -import org.jgroups.Message; +import org.jgroups.*; import org.jgroups.annotations.MBean; -import org.jgroups.conf.ClassConfigurator; import org.jgroups.annotations.Property; import org.jgroups.auth.AuthToken; import org.jgroups.auth.X509Token; +import org.jgroups.conf.ClassConfigurator; import org.jgroups.protocols.pbcast.GMS; import org.jgroups.protocols.pbcast.JoinRsp; import org.jgroups.stack.Protocol; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; /** - * The AUTH protocol adds a layer of authentication to JGroups + * The AUTH protocol adds a layer of authentication to JGroups. It intercepts join and merge requests and rejects them + * if the joiner or merger is not permitted to join a or merge into a cluster. AUTH should be placed right below + * {@link GMS} in the configuration. * @author Chris Mills - * @autho Bela Ban + * @author Bela Ban */ @MBean(description="Provides authentication of joiners, to prevent un-authorized joining of a cluster") public class AUTH extends Protocol { + /** Interface to provide callbacks for handling up events */ + public interface UpHandler { + /** + * Called when an up event has been received + * @param evt the event + * @return true if the event should be passed up, else false + */ + boolean handleUpEvent(Event evt); + } + + + /** Used on the coordinator to authentication joining member requests against */ + protected AuthToken auth_token; + + protected static final short GMS_ID=ClassConfigurator.getProtocolId(GMS.class); + + /** List of UpHandler which are called when an up event has been received. 
Usually used by AuthToken impls */ + protected final List<UpHandler> up_handlers=new ArrayList<UpHandler>(); + + protected Address local_addr; - /** - * used on the coordinator to authentication joining member requests against - */ - protected AuthToken auth_token=null; - protected static final short gms_id=ClassConfigurator.getProtocolId(GMS.class); + public AUTH() {name="AUTH";} + protected volatile boolean authenticate_coord=true; - public AUTH() { - name="AUTH"; + @Property(description="Do join or merge responses from the coordinator also need to be authenticated") + public AUTH setAuthCoord( boolean authenticateCoord) { + this.authenticate_coord= authenticateCoord; return this; } - - - @Property(name="auth_class") + @Property(name="auth_class",description="The fully qualified name of the class implementing the AuthToken interface") public void setAuthClass(String class_name) throws Exception { Object obj=Class.forName(class_name).newInstance(); auth_token=(AuthToken)obj; auth_token.setAuth(this); } - public String getAuthClass() {return auth_token != null? auth_token.getClass().getName() : null;} - - public AuthToken getAuthToken() {return auth_token;} - public void setAuthToken(AuthToken token) {this.auth_token=token;} + public String getAuthClass() {return auth_token != null? 
auth_token.getClass().getName() : null;} + public AuthToken getAuthToken() {return auth_token;} + public AUTH setAuthToken(AuthToken token) {this.auth_token=token; return this;} + public AUTH register(UpHandler handler) {up_handlers.add(handler); return this;} + public AUTH unregister(UpHandler handler) {up_handlers.remove(handler);return this;} + public Address getAddress() {return local_addr;} + public PhysicalAddress getPhysicalAddress() {return getTransport().getPhysicalAddress();} protected List<Object> getConfigurableObjects() { @@ -62,65 +82,125 @@ protected List<Object> getConfigurableObjects() { public void init() throws Exception { super.init(); + if(auth_token == null) + throw new IllegalStateException("no authentication mechanism configured"); if(auth_token instanceof X509Token) { X509Token tmp=(X509Token)auth_token; tmp.setCertificate(); } auth_token.init(); } + public void start() throws Exception { + super.start(); + if(auth_token != null) + auth_token.start(); + } + public void stop() { + if(auth_token != null) + auth_token.stop(); + super.stop(); + } + + public void destroy() { + if(auth_token != null) + auth_token.destroy(); + super.destroy(); + } /** - * An event was received from the layer below. Usually the current layer will want to examine - * the event type and - depending on its type - perform some computation - * (e.g. removing headers from a MSG event type, or updating the internal membership list - * when receiving a VIEW_CHANGE event). - * Finally the event is either a) discarded, or b) an event is sent down - * the stack using <code>down_prot.down()</code> or c) the event (or another event) is sent up - * the stack using <code>up_prot.up()</code>. + * An event was received from the layer below. Usually the current layer will want to examine the event type and + * - depending on its type - perform some computation (e.g. 
removing headers from a MSG event type, or updating + * the internal membership list when receiving a VIEW_CHANGE event). + * Finally the event is either a) discarded, or b) an event is sent down the stack using {@code down_prot.down()} + * or c) the event (or another event) is sent up the stack using {@code up_prot.up()}. */ public Object up(Event evt) { - GMS.GmsHeader hdr=getGMSHeader(evt); - if(hdr == null) - return up_prot.up(evt); + switch(evt.getType()) { + case Event.MSG: + Message msg=(Message)evt.getArg(); - if(hdr.getType() == GMS.GmsHeader.JOIN_REQ || hdr.getType() == GMS.GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER - || hdr.getType() == GMS.GmsHeader.MERGE_REQ) { // we found a join or merge message - now try and get the AUTH Header - Message msg=(Message)evt.getArg(); - if((msg.getHeader(this.id) != null) && (msg.getHeader(this.id) instanceof AuthHeader)) { - AuthHeader authHeader=(AuthHeader)msg.getHeader(this.id); - - if(authHeader != null) { - //Now we have the AUTH Header we need to validate it - if(this.auth_token.authenticate(authHeader.getToken(), msg)) { - return up_prot.up(evt); - } - else { - if(log.isWarnEnabled()) - log.warn("failed to validate AuthHeader token"); - sendRejectionMessage(hdr.getType(), msg.getSrc(), "Authentication failed"); - return null; - } - } - else { - //Invalid AUTH Header - need to send failure message - if(log.isWarnEnabled()) - log.warn("AUTH failed to get valid AuthHeader from Message"); - sendRejectionMessage(hdr.getType(), msg.getSrc(), "Failed to find valid AuthHeader in Message"); - return null; + // If we have a join or merge request --> authenticate, else pass up + GMS.GmsHeader gms_hdr=getGMSHeader(evt); + if(gms_hdr != null && needsAuthentication(gms_hdr)) { + AuthHeader auth_hdr=(AuthHeader)msg.getHeader(id); + if(auth_hdr == null) + throw new IllegalStateException(String.format("found %s from %s but no AUTH header", gms_hdr, msg.src())); + if(!handleAuthHeader(gms_hdr, auth_hdr, msg)) // authentication failed 
+ return null; // don't pass up } - } - else { - sendRejectionMessage(hdr.getType(), msg.getSrc(), "Failed to find an AuthHeader in Message"); - return null; - } + break; } + if(!callUpHandlers(evt)) + return null; + return up_prot.up(evt); } + /** + * An event is to be sent down the stack. The layer may want to examine its type and perform + * some action on it, depending on the event's type. If the event is a message MSG, then + * the layer may need to add a header to it (or do nothing at all) before sending it down + * the stack using {@code down_prot.down()}. In case of a GET_ADDRESS event (which tries to + * retrieve the stack's address from one of the bottom layers), the layer may need to send + * a new response event back up the stack using {@code up_prot.up()}. + */ + public Object down(Event evt) { + GMS.GmsHeader hdr = getGMSHeader(evt); + if(hdr != null && needsAuthentication(hdr)) { + // we found a join request message - now add an AUTH Header + Message msg=(Message)evt.getArg(); + msg.putHeader(this.id, new AuthHeader(this.auth_token)); + } + + if(evt.getType() == Event.SET_LOCAL_ADDRESS) + local_addr=(Address)evt.getArg(); + + return down_prot.down(evt); + } + + + + protected boolean needsAuthentication(GMS.GmsHeader hdr) { + switch(hdr.getType()) { + case GMS.GmsHeader.JOIN_REQ: + case GMS.GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER: + case GMS.GmsHeader.MERGE_REQ: + return true; + case GMS.GmsHeader.JOIN_RSP: + case GMS.GmsHeader.MERGE_RSP: + case GMS.GmsHeader.INSTALL_MERGE_VIEW: + return this.authenticate_coord; + default: + return false; + } + } + + + /** + * Handles a GMS header + * @param gms_hdr + * @param msg + * @return true if the message should be passed up, or else false + */ + protected boolean handleAuthHeader(GMS.GmsHeader gms_hdr, AuthHeader auth_hdr, Message msg) { + if(needsAuthentication(gms_hdr)) { + if(this.auth_token.authenticate(auth_hdr.getToken(), msg)) + return true; // authentication passed, send message up the stack + else { + 
log.warn(String.format("%s: failed to validate AuthHeader (token: %s) from %s; dropping message", + local_addr, auth_token.getClass().getSimpleName(), msg.src())); + sendRejectionMessage(gms_hdr.getType(), msg.getSrc(), "authentication failed"); + return false; + } + } + return true; + } + + protected void sendRejectionMessage(byte type, Address dest, String error_msg) { switch(type) { case GMS.GmsHeader.JOIN_REQ: @@ -130,54 +210,38 @@ protected void sendRejectionMessage(byte type, Address dest, String error_msg) { case GMS.GmsHeader.MERGE_REQ: sendMergeRejectionMessage(dest); break; - default: - log.error("type " + type + " unknown"); - break; } } protected void sendJoinRejectionMessage(Address dest, String error_msg) { if(dest == null) return; - Message msg = new Message(dest, null, null); JoinRsp joinRes=new JoinRsp(error_msg); // specify the error message on the JoinRsp - - GMS.GmsHeader gmsHeader=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, joinRes); - msg.putHeader(gms_id, gmsHeader); + Message msg = new Message(dest).putHeader(GMS_ID, new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP)) + .setBuffer(GMS.marshal(joinRes)); + if(this.authenticate_coord) + msg.putHeader(this.id, new AuthHeader(this.auth_token)); down_prot.down(new Event(Event.MSG, msg)); } protected void sendMergeRejectionMessage(Address dest) { - Message msg=new Message(dest, null, null); - msg.setFlag(Message.OOB); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); hdr.setMergeRejected(true); - msg.putHeader(gms_id, hdr); - if(log.isDebugEnabled()) log.debug("merge response=" + hdr); + Message msg=new Message(dest).setFlag(Message.Flag.OOB).putHeader(GMS_ID, hdr); + if(this.authenticate_coord) + msg.putHeader(this.id, new AuthHeader(this.auth_token)); + log.debug("merge response=" + hdr); down_prot.down(new Event(Event.MSG, msg)); } - /** - * An event is to be sent down the stack. The layer may want to examine its type and perform - * some action on it, depending on the event's type. 
If the event is a message MSG, then - * the layer may need to add a header to it (or do nothing at all) before sending it down - * the stack using <code>down_prot.down()</code>. In case of a GET_ADDRESS event (which tries to - * retrieve the stack's address from one of the bottom layers), the layer may need to send - * a new response event back up the stack using <code>up_prot.up()</code>. - */ - public Object down(Event evt) { - GMS.GmsHeader hdr = getGMSHeader(evt); - if((hdr != null) && (hdr.getType() == GMS.GmsHeader.JOIN_REQ - || hdr.getType() == GMS.GmsHeader.JOIN_REQ_WITH_STATE_TRANSFER - || hdr.getType() == GMS.GmsHeader.MERGE_REQ)) { - //we found a join request message - now add an AUTH Header - Message msg = (Message)evt.getArg(); - AuthHeader authHeader = new AuthHeader(); - authHeader.setToken(this.auth_token); - msg.putHeader(this.id, authHeader); + protected boolean callUpHandlers(Event evt) { + boolean pass_up=true; + for(UpHandler handler: up_handlers) { + if(!handler.handleUpEvent(evt)) + pass_up=false; } - return down_prot.down(evt); + return pass_up; } /** @@ -186,16 +250,13 @@ public Object down(Event evt) { * @return A GmsHeader object or null if the event contains a message of a different type */ protected static GMS.GmsHeader getGMSHeader(Event evt){ - Message msg; - switch(evt.getType()){ - case Event.MSG: - msg = (Message)evt.getArg(); - Object obj = msg.getHeader(gms_id); - if(obj == null || !(obj instanceof GMS.GmsHeader)){ - return null; - } - return (GMS.GmsHeader)obj; - } + return evt.getType() == Event.MSG? getGMSHeader((Message)evt.getArg()) : null; + } + + protected static GMS.GmsHeader getGMSHeader(Message msg){ + Header hdr = msg.getHeader(GMS_ID); + if(hdr instanceof GMS.GmsHeader) + return (GMS.GmsHeader)hdr; return null; } }
src/org/jgroups/protocols/DISCARD.java+2 −2 modified@@ -63,8 +63,8 @@ public boolean isDiscardAll() { return discard_all; } - public void setDiscardAll(boolean discard_all) { - this.discard_all=discard_all; + public DISCARD setDiscardAll(boolean discard_all) { + this.discard_all=discard_all; return this; } public boolean isExcludeItself() {
src/org/jgroups/protocols/EncryptBase.java+370 −0 added@@ -0,0 +1,370 @@ +package org.jgroups.protocols; + +import org.jgroups.*; +import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.Property; +import org.jgroups.stack.Protocol; +import org.jgroups.util.*; + +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import java.security.MessageDigest; +import java.util.Arrays; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.zip.Adler32; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +/** + * Super class of symmetric ({@link SYM_ENCRYPT}) and asymmetric ({@link ASYM_ENCRYPT}) encryption protocols. + * @author Bela Ban + */ +public abstract class EncryptBase extends Protocol { + protected static final String DEFAULT_SYM_ALGO="AES"; + + + /* ----------------------------------------- Properties -------------------------------------------------- */ + @Property(description="Cryptographic Service Provider") + protected String provider; + + @Property(description="Cipher engine transformation for asymmetric algorithm. Default is RSA") + protected String asym_algorithm="RSA"; + + @Property(description="Cipher engine transformation for symmetric algorithm. Default is AES") + protected String sym_algorithm=DEFAULT_SYM_ALGO; + + @Property(description="Initial public/private key length. Default is 512") + protected int asym_keylength=512; + + @Property(description="Initial key length for matching symmetric algorithm. 
Default is 128") + protected int sym_keylength=128; + + @Property(description="Number of ciphers in the pool to parallelize encrypt and decrypt requests",writable=false) + protected int cipher_pool_size=8; + + @Property(description="If true, the entire message (including payload and headers) is encrypted, else only the payload") + protected boolean encrypt_entire_message=true; + + @Property(description="If true, all messages are digitally signed by adding an encrypted checksum of the encrypted " + + "message to the header. Ignored if encrypt_entire_message is false") + protected boolean sign_msgs=true; + + @Property(description="When sign_msgs is true, by default CRC32 is used to create the checksum. If use_adler is " + + "true, Adler32 will be used") + protected boolean use_adler; + + protected volatile Address local_addr; + + protected volatile View view; + + // Cipher pools used for encryption and decryption. Size is cipher_pool_size + protected BlockingQueue<Cipher> encoding_ciphers, decoding_ciphers; + + // version filed for secret key + protected volatile byte[] sym_version; + + // shared secret key to encrypt/decrypt messages + protected volatile SecretKey secret_key; + + // map to hold previous keys so we can decrypt some earlier messages if we need to + protected final Map<AsciiString,Cipher> key_map=new WeakHashMap<AsciiString,Cipher>(); + + + + public int asymKeylength() {return asym_keylength;} + public <T extends EncryptBase> T asymKeylength(int len) {this.asym_keylength=len; return (T)this;} + public int symKeylength() {return sym_keylength;} + public <T extends EncryptBase> T symKeylength(int len) {this.sym_keylength=len; return (T)this;} + public SecretKey secretKey() {return secret_key;} + public <T extends EncryptBase> T secretKey(SecretKey key) {this.secret_key=key; return (T)this;} + public String symAlgorithm() {return sym_algorithm;} + public <T extends EncryptBase> T symAlgorithm(String alg) {this.sym_algorithm=alg; return (T)this;} + public 
String asymAlgorithm() {return asym_algorithm;} + public <T extends EncryptBase> T asymAlgorithm(String alg) {this.asym_algorithm=alg; return (T)this;} + public byte[] symVersion() {return sym_version;} + public <T extends EncryptBase> T symVersion(byte[] v) {this.sym_version=Arrays.copyOf(v, v.length); return (T)this;} + public <T extends EncryptBase> T localAddress(Address addr) {this.local_addr=addr; return (T)this;} + public boolean encryptEntireMessage() {return encrypt_entire_message;} + public <T extends EncryptBase> T encryptEntireMessage(boolean b) {this.encrypt_entire_message=b; return (T)this;} + public boolean signMessages() {return this.sign_msgs;} + public <T extends EncryptBase> T signMessages(boolean flag) {this.sign_msgs=flag; return (T)this;} + public boolean adler() {return use_adler;} + public <T extends EncryptBase> T adler(boolean flag) {this.use_adler=flag; return (T)this;} + @ManagedAttribute public String version() {return Util.byteArrayToHexString(sym_version);} + + public void init() throws Exception { + int tmp=Util.getNextHigherPowerOfTwo(cipher_pool_size); + if(tmp != cipher_pool_size) { + log.warn(String.format("%s: setting cipher_pool_size (%d) to %d (power of 2) for faster modulo operation", local_addr, cipher_pool_size, tmp)); + cipher_pool_size=tmp; + } + encoding_ciphers=new ArrayBlockingQueue<Cipher>(cipher_pool_size); + decoding_ciphers=new ArrayBlockingQueue<Cipher>(cipher_pool_size); + initSymCiphers(sym_algorithm, secret_key); + } + + + public Object down(Event evt) { + switch(evt.getType()) { + case Event.MSG: + Message msg=evt.arg(); + try { + if(secret_key == null) { + log.trace(String.format("%s: discarded %s message to %s as secret key is null, hdrs: %s", + local_addr, msg.dest() == null? 
"mcast" : "unicast", msg.dest(), msg.printHeaders())); + return null; + } + encryptAndSend(msg); + } + catch(Exception e) { + log.warn(local_addr + ": unable to send message down", e); + } + return null; + + case Event.VIEW_CHANGE: + handleView((View)evt.getArg()); + break; + + case Event.SET_LOCAL_ADDRESS: + local_addr=evt.arg(); + break; + } + return down_prot.down(evt); + } + + + public Object up(Event evt) { + switch(evt.getType()) { + case Event.VIEW_CHANGE: + handleView((View)evt.getArg()); + break; + case Event.MSG: + Message msg=evt.arg(); + try { + return handleUpMessage(msg); + } + catch(Exception e) { + log.warn(local_addr + ": exception occurred decrypting message", e); + } + return null; + } + return up_prot.up(evt); + } + + + + + /** Initialises the ciphers for both encryption and decryption using the generated or supplied secret key */ + protected synchronized void initSymCiphers(String algorithm, SecretKey secret) throws Exception { + if(secret == null) + return; + encoding_ciphers.clear(); + decoding_ciphers.clear(); + for(int i=0; i < cipher_pool_size; i++ ) { + encoding_ciphers.add(createCipher(Cipher.ENCRYPT_MODE, secret, algorithm)); + decoding_ciphers.add(createCipher(Cipher.DECRYPT_MODE, secret, algorithm)); + }; + + //set the version + MessageDigest digest=MessageDigest.getInstance("MD5"); + digest.reset(); + digest.update(secret.getEncoded()); + + byte[] tmp=digest.digest(); + sym_version=Arrays.copyOf(tmp, tmp.length); + log.debug(String.format("%s: created %d symmetric ciphers with secret key (%d bytes)", local_addr, cipher_pool_size, sym_version.length)); + } + + + protected Cipher createCipher(int mode, SecretKey secret_key, String algorithm) throws Exception { + Cipher cipher=provider != null && !provider.trim().isEmpty()? 
+ Cipher.getInstance(algorithm, provider) : Cipher.getInstance(algorithm); + cipher.init(mode, secret_key); + return cipher; + } + + + protected Object handleUpMessage(Message msg) throws Exception { + EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id); + if(hdr == null) { + log.error(String.format("%s: received message without encrypt header from %s; dropping it", local_addr, msg.src())); + return null; + } + switch(hdr.type()) { + case EncryptHeader.ENCRYPT: + return handleEncryptedMessage(msg); + default: + return handleUpEvent(msg,hdr); + } + } + + + protected Object handleEncryptedMessage(Message msg) throws Exception { + if(!process(msg)) + return null; + + // try and decrypt the message - we need to copy msg as we modify its + // buffer (http://jira.jboss.com/jira/browse/JGRP-538) + Message tmpMsg=decryptMessage(null, msg.copy()); // need to copy for possible xmits + if(tmpMsg != null) + return up_prot.up(new Event(Event.MSG, tmpMsg)); + log.warn(String.format("%s: unrecognized cipher; discarding message from %s", local_addr, msg.src())); + return null; + } + + protected Object handleUpEvent(Message msg, EncryptHeader hdr) { + return null; + } + + /** Whether or not to process this received message */ + protected boolean process(Message msg) {return true;} + + protected void handleView(View view) { + this.view=view; + } + + protected boolean inView(Address sender, String error_msg) { + View curr_view=this.view; + if(curr_view == null || curr_view.containsMember(sender)) + return true; + log.error(String.format(error_msg, local_addr, sender, curr_view)); + return false; + } + + protected Checksum createChecksummer() {return use_adler? 
new Adler32() : new CRC32();} + + + /** Does the actual work for decrypting - if version does not match current cipher then tries the previous cipher */ + protected Message decryptMessage(Cipher cipher, Message msg) throws Exception { + EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id); + if(!Arrays.equals(hdr.version(), sym_version)) { + cipher=key_map.get(new AsciiString(hdr.version())); + if(cipher == null) { + handleUnknownVersion(); + return null; + } + log.trace(String.format("%s: decrypting msg from %s using previous cipher version", local_addr, msg.src())); + return _decrypt(cipher, msg, hdr); + } + return _decrypt(cipher, msg, hdr); + } + + protected Message _decrypt(final Cipher cipher, Message msg, EncryptHeader hdr) throws Exception { + byte[] decrypted_msg; + + if(!encrypt_entire_message && msg.getLength() == 0) + return msg; + + if(encrypt_entire_message && sign_msgs) { + byte[] signature=hdr.signature(); + if(signature == null) { + log.error(String.format("%s: dropped message from %s as the header did not have a checksum", local_addr, msg.src())); + return null; + } + + long msg_checksum=decryptChecksum(cipher, signature, 0, signature.length); + long actual_checksum=computeChecksum(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); + if(actual_checksum != msg_checksum) { + log.error(String.format("%s: dropped message from %s as the message's checksum (%d) did not match the computed checksum (%d)", + local_addr, msg.src(), msg_checksum, actual_checksum)); + return null; + } + } + + if(cipher == null) + decrypted_msg=code(msg.getRawBuffer(), msg.getOffset(), msg.getLength(), true); + else + decrypted_msg=cipher.doFinal(msg.getRawBuffer(), msg.getOffset(), msg.getLength()); + + if(!encrypt_entire_message) { + msg.setBuffer(decrypted_msg); + return msg; + } + + Message ret=Util.streamableFromBuffer(Message.class,decrypted_msg,0,decrypted_msg.length); + if(ret.getDest() == null) + ret.setDest(msg.getDest()); + if(ret.getSrc() == null) + 
ret.setSrc(msg.getSrc()); + return ret; + } + + + protected void encryptAndSend(Message msg) throws Exception { + EncryptHeader hdr=new EncryptHeader(EncryptHeader.ENCRYPT, symVersion()); + if(encrypt_entire_message) { + if(msg.getSrc() == null) + msg.setSrc(local_addr); + + Buffer serialized_msg=Util.streamableToBuffer(msg); + byte[] encrypted_msg=code(serialized_msg.getBuf(),serialized_msg.getOffset(),serialized_msg.getLength(),false); + + if(sign_msgs) { + long checksum=computeChecksum(encrypted_msg, 0, encrypted_msg.length); + byte[] checksum_array=encryptChecksum(checksum); + hdr.signature(checksum_array); + } + + // exclude existing headers, they will be seen again when we decrypt and unmarshal the msg at the receiver + Message tmp=msg.copy(false, false).setBuffer(encrypted_msg).putHeader(this.id,hdr); + down_prot.down(new Event(Event.MSG, tmp)); + return; + } + + // copy neeeded because same message (object) may be retransmitted -> prevent double encryption + Message msgEncrypted=msg.copy(false).putHeader(this.id, hdr); + if(msg.getLength() > 0) + msgEncrypted.setBuffer(code(msg.getRawBuffer(),msg.getOffset(),msg.getLength(),false)); + down_prot.down(new Event(Event.MSG,msgEncrypted)); + } + + + protected byte[] code(byte[] buf, int offset, int length, boolean decode) throws Exception { + BlockingQueue<Cipher> queue=decode? 
decoding_ciphers : encoding_ciphers; + Cipher cipher=queue.take(); + try { + return cipher.doFinal(buf, offset, length); + } + finally { + queue.offer(cipher); + } + } + + protected long computeChecksum(byte[] input, int offset, int length) { + Checksum checksummer=createChecksummer(); + checksummer.update(input, offset, length); + return checksummer.getValue(); + } + + protected byte[] encryptChecksum(long checksum) throws Exception { + byte[] checksum_array=new byte[Global.LONG_SIZE]; + Bits.writeLong(checksum, checksum_array, 0); + return code(checksum_array, 0, checksum_array.length, false); + } + + protected long decryptChecksum(final Cipher cipher, byte[] input, int offset, int length) throws Exception { + byte[] decrypted_checksum; + if(cipher == null) + decrypted_checksum=code(input, offset, length, true); + else + decrypted_checksum=cipher.doFinal(input, offset, length); + return Bits.readLong(decrypted_checksum, 0); + } + + + /* Get the algorithm name from "algorithm/mode/padding" taken from original ENCRYPT */ + protected static String getAlgorithm(String s) { + int index=s.indexOf('/'); + return index == -1? s : s.substring(0, index); + } + + + /** Called when the version shipped in the header can't be found */ + protected void handleUnknownVersion() {} + + + +}
// src/org/jgroups/protocols/EncryptHeader.java (file added by this patch, +65 -0)
package org.jgroups.protocols;

import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.util.Util;

import java.io.DataInput;
import java.io.DataOutput;

/**
 * Header carrying a type byte, the version of the symmetric key used by the sender and an optional
 * signature (the encrypted checksum of the message).
 * @author Bela Ban
 * @since  3.6.10
 */
public class EncryptHeader extends Header {
    public static final byte ENCRYPT        = 1 << 0;
    public static final byte SECRET_KEY_REQ = 1 << 1;
    public static final byte SECRET_KEY_RSP = 1 << 2;

    protected byte   type;
    protected byte[] version;
    protected byte[] signature; // the encrypted checksum


    /** No-arg constructor; the fields are populated by {@link #readFrom(DataInput)} */
    public EncryptHeader() {}


    /**
     * @param type    one of {@link #ENCRYPT}, {@link #SECRET_KEY_REQ} or {@link #SECRET_KEY_RSP}
     * @param version the key version; the array is stored as is (not copied)
     */
    public EncryptHeader(byte type, byte[] version) {
        this.type=type;
        this.version=version;
    }

    public byte          type()               {return type;}
    public byte          getType()            {return type;}
    public byte[]        version()            {return version;}
    public byte[]        getVersion()         {return version;}
    public byte[]        signature()          {return signature;}
    public EncryptHeader signature(byte[] s)  {this.signature=s; return this;}

    /** Writes the type, followed by the version and signature buffers (either of which may be null) */
    public void writeTo(DataOutput out) throws Exception {
        out.writeByte(type);
        Util.writeByteBuffer(version, 0, version != null? version.length : 0, out);
        Util.writeByteBuffer(signature, 0, signature != null? signature.length : 0, out);
    }

    /** Reads the fields in the same order in which {@link #writeTo(DataOutput)} wrote them */
    public void readFrom(DataInput in) throws Exception {
        type=in.readByte();
        version=Util.readByteBuffer(in);
        signature=Util.readByteBuffer(in);
    }

    public String toString() {
        return String.format("[%s version=%s]", typeToString(type), (version != null? version.length + " bytes" : "n/a"));
    }

    /** Marshalled size: one byte for the type plus the sizes of the version and signature buffers */
    public int size() {return Global.BYTE_SIZE + Util.size(version) + Util.size(signature);}

    /** Returns a readable name for a header type, or a placeholder for values that are not a known type */
    protected static String typeToString(byte type) {
        switch(type) {
            case ENCRYPT:        return "ENCRYPT";
            case SECRET_KEY_REQ: return "SECRET_KEY_REQ";
            case SECRET_KEY_RSP: return "SECRET_KEY_RSP";
            default:             return "<unrecognized type " + type + ">"; // closing '>' was missing
        }
    }
}
src/org/jgroups/protocols/FRAG2.java+1 −0 modified@@ -70,6 +70,7 @@ public class FRAG2 extends Protocol { public int getFragSize() {return frag_size;} public void setFragSize(int s) {frag_size=s;} + public FRAG2 fragSize(int s) {setFragSize(s); return this;} public long getNumberOfSentMessages() {return num_sent_msgs.get();} public long getNumberOfSentFragments() {return num_sent_frags.get();} public long getNumberOfReceivedMessages() {return num_received_msgs.get();}
src/org/jgroups/protocols/pbcast/GMS.java+5 −0 modified@@ -174,6 +174,7 @@ public Tuple<View,Digest> getViewAndDigest() { public int getNumMembers() {return members.size();} public long getJoinTimeout() {return join_timeout;} public void setJoinTimeout(long t) {join_timeout=t;} + public GMS joinTimeout(long t) {setJoinTimeout(t); return this;} public long getMergeTimeout() {return merge_timeout;} public void setMergeTimeout(long timeout) {merge_timeout=timeout;} public long getMaxJoinAttempts() {return max_join_attempts;} @@ -975,6 +976,10 @@ public String[] supportedKeys() { return new String[]{"fix-digests"}; } + public static Buffer marshal(JoinRsp join_rsp) { + return Util.streamableToBuffer(join_rsp); + } + /* ------------------------------- Private Methods --------------------------------- */ final void initState() {
src/org/jgroups/protocols/pbcast/STABLE.java+3 −1 modified@@ -55,7 +55,7 @@ public class STABLE extends Protocol { * This should be set to a very small number (> 0 !) if <code>max_bytes</code> is used */ @Property(description="Delay before stability message is sent. Default is 6000 msec") - private long stability_delay=6000; + private long stability_delay=1000; /** * Total amount of bytes from incoming messages (default = 0 = disabled). @@ -145,6 +145,8 @@ public void setDesiredAverageGossip(long gossip_interval) { desired_avg_gossip=gossip_interval; } + public STABLE desiredAverageGossip(long t) {desired_avg_gossip=t; return this;} + public long getMaxBytes() { return max_bytes; }
// src/org/jgroups/protocols/SYM_ENCRYPT.java (file added by this patch, +124 -0)
package org.jgroups.protocols;

import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Property;
import org.jgroups.util.Util;

import javax.crypto.SecretKey;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;

/**
 * Encrypts and decrypts communication in JGroups by using a secret key shared by all cluster members.<p>
 *
 * The secret key is identical for all cluster members and is injected into this protocol at startup, e.g. by reading
 * it from a keystore. Messages are sent by encrypting them with the secret key and received by decrypting them with
 * the secret key. Note that all cluster members must be shipped with the same keystore file<p>
 *
 * This protocol is typically placed under {@link org.jgroups.protocols.pbcast.NAKACK2}, so that most important
 * headers are encrypted as well, to prevent replay attacks.<p>
 *
 * A possible configuration looks like this:<br><br>
 * {@code <SYM_ENCRYPT keystore_name="defaultStore.keystore" store_password="changeit" alias="myKey"/>}
 * <br>
 * <br>
 * In order to use SYM_ENCRYPT layer in this manner, it is necessary to have the secret key already generated in a
 * keystore file. The keystore is looked up on the classpath first and, if not found there, opened as a file.
 * You cannot create a secret key keystore file using the keytool application shipped with the JDK. A java file called
 * KeyStoreGenerator is included in the demo package that can be used from the command line (or IDE) to generate a
 * suitable keystore.
 *
 * @author Bela Ban
 * @author Steve Woodcock
 */
@MBean(description="Symmetric encryption protocol. The (shared) shared secret key is configured up front, " +
  "e.g. via a key store, or injection")
public class SYM_ENCRYPT extends EncryptBase {

    /* -----------------------------------------    Properties     -------------------------------------------------- */
    @Property(description="File on classpath that contains keystore repository")
    protected String   keystore_name;

    @Property(description="Password used to check the integrity/unlock the keystore. Change the default",
      exposeAsManagedAttribute=false)
    protected String   store_password="changeit"; // JDK default

    @Property(description="Password for recovering the key. Change the default", exposeAsManagedAttribute=false)
    protected String   key_password; // allows to assign keypwd=storepwd if not set (https://issues.jboss.org/browse/JGRP-1375)


    @Property(name="alias", description="Alias used for recovering the key. Change the default",exposeAsManagedAttribute=false)
    protected String   alias="mykey"; // JDK default


    public String      keystoreName()                      {return this.keystore_name;}
    public SYM_ENCRYPT keystoreName(String n)              {this.keystore_name=n; return this;}
    public String      alias()                             {return alias;}
    public SYM_ENCRYPT alias(String a)                     {this.alias=a; return this;}
    public String      storePassword()                     {return store_password;}
    public SYM_ENCRYPT storePassword(String pwd)           {this.store_password=pwd; return this;}


    /**
     * Defaults key_password to store_password when only the latter is set, reads the secret key from the keystore
     * (unless a key is already present) and then runs the superclass initialisation.
     */
    public void init() throws Exception {
        if(key_password == null && store_password != null) {
            key_password=store_password;
            log.debug(String.format("%s: key_password used is same as store_password", local_addr));
        }
        readSecretKeyFromKeystore();
        super.init();
    }

    /**
     * Initialisation if a supplied key is defined in the properties. This supplied key must be in a keystore which
     * can be generated using the KeyStoreGenerator file in demos. The keystore is searched on the classpath first
     * and then opened as a regular file. Does nothing if the secret key has already been set.
     * @throws Exception if keystore_name is not set, the keystore cannot be found or loaded, or the key cannot be
     *                   retrieved under the configured alias
     */
    protected void readSecretKeyFromKeystore() throws Exception {
        // in case the secret key was set before, e.g. via injection in a unit test
        if(this.secret_key != null)
            return;

        // fail with a descriptive message rather than a NullPointerException from the resource lookup
        if(keystore_name == null)
            throw new Exception("Unable to load keystore: keystore_name is not set");

        // must not use default keystore type - as it does not support secret keys
        KeyStore store=KeyStore.getInstance("JCEKS");
        InputStream inputStream=null;
        SecretKey tempKey=null;
        try {
            inputStream=openKeystore();
            // we have located a file lets load the keystore
            try {
                store.load(inputStream, toChars(store_password));
                // loaded keystore - get the key
                tempKey=(SecretKey)store.getKey(alias, toChars(key_password));
            }
            catch(IOException e) {
                throw new Exception("Unable to load keystore " + keystore_name + ": " + e, e);
            }
            catch(NoSuchAlgorithmException e) {
                throw new Exception("No Such algorithm " + keystore_name + ": " + e, e);
            }
            catch(CertificateException e) {
                throw new Exception("Certificate exception " + keystore_name + ": " + e, e);
            }

            if(tempKey == null)
                throw new Exception("Unable to retrieve key '" + alias + "' from keystore " + keystore_name);
            this.secret_key=tempKey;
            if(sym_algorithm.equals(DEFAULT_SYM_ALGO))
                sym_algorithm=tempKey.getAlgorithm();
        }
        finally {
            Util.close(inputStream);
        }
    }

    /**
     * Opens the keystore, first as a classpath resource (using this thread's context classloader), then as a file.
     * @throws FileNotFoundException if the keystore is found in neither place
     */
    private InputStream openKeystore() throws FileNotFoundException {
        InputStream in=Thread.currentThread().getContextClassLoader().getResourceAsStream(keystore_name);
        if(in != null)
            return in;
        try {
            // never returns null: a missing file is reported with a FileNotFoundException
            return new FileInputStream(keystore_name);
        }
        catch(FileNotFoundException e) {
            FileNotFoundException ex=new FileNotFoundException("Unable to load keystore " + keystore_name
                                                                 + " ensure file is on classpath");
            ex.initCause(e);
            throw ex;
        }
    }

    /** Converts a password to a char[], passing null through instead of throwing a NullPointerException */
    private static char[] toChars(String password) {
        return password != null? password.toCharArray() : null;
    }


}
src/org/jgroups/util/AsciiString.java+84 −0 added@@ -0,0 +1,84 @@ +package org.jgroups.util; + +/** + * Simple string implemented as a byte[] array. Each character's higher 8 bits are truncated and + * only the lower 8 bits are stored. AsciiString is mutable for efficiency reasons, but the chars array should never + * be changed ! + * @author Bela Ban + * @since 3.5 + */ +public class AsciiString implements Comparable<AsciiString> { + protected final byte[] val; + + public AsciiString() { + val=new byte[]{}; + } + + public AsciiString(String str) { + int length=str != null? str.length() : 0; + this.val=new byte[length]; + for(int i=0; i < length; i++) + val[i]=(byte)str.charAt(i); + } + + public AsciiString(AsciiString str) { + this.val=str.val; + } + + public AsciiString(byte[] val) { + this.val=val; // mutable, used only for creation + } + + public AsciiString(int length) { + this.val=new byte[length]; + } + + public byte[] chars() {return val;} // mutable + + public int length() { + return val.length; + } + + public int compareTo(AsciiString str) { + if(str == null) return 1; + if(chars().hashCode() == str.val.hashCode()) + return 0; + + int len1=val.length; + int len2=str.val.length; + int lim=Math.min(len1, len2); + byte[] v1=val; + byte[] v2=str.val; + + int k = 0; + while (k < lim) { + byte c1 =v1[k]; + byte c2 =v2[k]; + if (c1 != c2) + return c1 > c2? 1 : -1; + k++; + } + return len1 > len2? 1 : len1 < len2? -1 : 0; + } + + + + public boolean equals(Object obj) { + return obj instanceof AsciiString && compareTo((AsciiString)obj) == 0; + } + + public int hashCode() { + int h=0; + for(int i=0; i < val.length; i++) + h=31 * h + val[i]; + return h; + } + + public String toString() { + return new String(val); + } + + + + +}
src/org/jgroups/util/Bits.java+773 −0 added@@ -0,0 +1,773 @@ +package org.jgroups.util; + +import org.jgroups.Global; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Class (similar to (and partly copied from) java.nio.Bits) containing helper methods to encode variables + * (e.g. ints, long, List<Address> etc) to memory (byte buffer) or output streams and read variables + * from memory or input streams.<p/> + * The write methods write a type (e.g. an int or a char) to a buffer ({@link ByteBuffer} or output stream, using + * <a href="https://developers.google.com/protocol-buffers/docs/encoding">variable-length encoding</a>. If + * there are not enough byte in the buffer to write a type, a {@link java.nio.BufferOverflowException} is thrown. + * If the variable cannot be written to the output stream, an IOException is thrown. + * <p/> + * The read methods read a variable-length encoded type from a buffer or input stream. If there are fewer bytes in + * the buffer than needed to read the type, a {@link java.nio.BufferUnderflowException} is thrown. If the read fails, + * an IOException is thrown. + * <p/> + * The size() methods return the number of bytes used to encode the given type with variable-length encoding. + * <p/> + * There are additional helper methods to write/read custom JGroups types, e.g. address lists, Views etc + * <p/> + * Note that methods to read/write atomic types (char, int etc) should only be used if variable-length encoding is + * desired; otherwise {@link DataOutput#writeInt(int)} or {@link ByteBuffer#putInt(int)} should be used instead. + * <p/> + * At the time of writing this (Feb 2014), most methods have not yet been implemented. 
+ * @author Bela Ban + * @author Sanne Grinovero + * @since 3.5 + */ +public final class Bits { + + private Bits() { + throw new InstantiationError( "Must not instantiate this class" ); + } + + // -------------------- char ------------------------ // + + // No compression of chars as they only use 2 bytes + public static char makeChar(byte[] buf, int offset) { + return (char) ((buf[offset + 1] & 0xFF) + (buf[offset] << 8)); + } + + public static void writeChar(char c, byte[] buf, int offset) { + buf[offset+1]=(byte)c; + buf[offset]=(byte)(c >>> 8); + } + + public static char readChar(byte[] buf, int offset) { + return makeChar(buf, offset); + } + + // -------------------- short ----------------------- // + + // No implementations as encoding a char doesn't use much space + + public static short makeShort(byte a, byte b) { + return (short)((a << 8) | (b & 0xff)); + } + + public static short makeShort(byte a) { + return (short) (a & 0xff); + } + + public static void writeShort(short s, byte[] buf, int offset) { + buf[offset+1]=(byte)s; + buf[offset]=(byte)(s >>> 8); + } + + public static short readShort(byte[] buf, int offset) { + return (short)((buf[offset+1] & 0xFF) + (buf[offset] << 8)); + } + + // --------------------- int ------------------------ // + + + /** + * Writes an int to a ByteBuffer + * @param num the int to be written + * @param buf the buffer + */ + public static void writeInt(int num, ByteBuffer buf) { + if(num == 0) { + buf.put((byte)0); + return; + } + final byte bytes_needed=bytesRequiredFor(num); + buf.put(bytes_needed); + for(int i=0; i < bytes_needed; i++) + buf.put(getByteAt(num, i)); + } + + /** + * Writes an int to an output stream + * @param num the int to be written + * @param out the output stream + */ + public static void writeInt(int num, DataOutput out) throws IOException { + if(num == 0) { + out.write(0); + return; + } + final byte bytes_needed=bytesRequiredFor(num); + out.write(bytes_needed); + for(int i=0; i < bytes_needed; i++) + 
out.write(getByteAt(num, i)); + } + + public static void writeInt(int num, byte[] buf, int offset) { + buf[offset+3]=(byte)num; + buf[offset+2]=(byte)(num >>> 8); + buf[offset+1]=(byte)(num >>> 16); + buf[offset]=(byte)(num >>> 24); + } + + public static void writeIntCompressed(int num, byte[] buf, int offset) { + if(num == 0) { + buf[offset]=0; + return; + } + final byte bytes_needed=bytesRequiredFor(num); + buf[offset++]=bytes_needed; + for(int i=0; i < bytes_needed; i++) + buf[offset++]=getByteAt(num, i); + } + + + /** + * Reads an int from a buffer. + * @param buf the buffer + * @return the int read from the buffer + */ + public static int readInt(ByteBuffer buf) { + byte len=buf.get(); + if(len == 0) + return 0; + byte[] retval=new byte[len]; + buf.get(retval, 0, len); + return makeInt(retval, 0, len); + } + + /** + * Reads an int from an input stream + * @param in the input stream + * @return the int read from the input stream + */ + public static int readInt(DataInput in) throws IOException { + byte len=in.readByte(); + if(len == 0) + return 0; + byte[] buf=new byte[len]; + in.readFully(buf, 0, len); + return makeInt(buf, 0, len); + } + + public static int readInt(byte[] buf, int offset) { + return ((buf[offset+3] & 0xFF)) + + ((buf[offset+2] & 0xFF) << 8) + + ((buf[offset+1] & 0xFF) << 16) + + ((buf[offset]) << 24); + } + + public static int readIntCompressed(byte[] buf, int offset) { + byte len=buf[offset++]; + if(len == 0) + return 0; + byte[] buffer=new byte[len]; + for(int i=0; i < len; i++) + buffer[i]=buf[offset++]; + return makeInt(buffer, 0, len); + } + + + /** + * Computes the size of a variable-length encoded int + * @param num the int + * @return the number of bytes needed to variable-length encode num + */ + public static int size(int num) { + return (byte)(num == 0? 
1 : bytesRequiredFor(num) +1); + } + + + + + // -------------------- long ------------------------ // + + /** + * Writes a long to a ByteBuffer + * @param num the long to be written + * @param buf the buffer + */ + public static void writeLong(long num, ByteBuffer buf) { + if(num == 0) { + buf.put((byte)0); + return; + } + final byte bytes_needed=bytesRequiredFor(num); + buf.put(bytes_needed); + for(int i=0; i < bytes_needed; i++) + buf.put(getByteAt(num, i)); + } + + /** + * Writes a long to out in variable-length encoding. Note that currently variable-length encoding is <em>not</em> + * used (a similar mechanism is used); this will be implemented later. + * @param num the long + * @param out the output stream to write num to + * @throws IOException + */ + public static void writeLong(final long num, final DataOutput out) throws IOException { + if(num == 0) { + out.write(0); + return; + } + final byte bytes_needed=bytesRequiredFor(num); + out.write(bytes_needed); + for(int i=0; i < bytes_needed; i++) + out.write(getByteAt(num, i)); + } + + public static void writeLong(long num, byte[] buf, int offset) { + buf[offset+7]=(byte)num; + buf[offset+6]=(byte)(num >>> 8); + buf[offset+5]=(byte)(num >>> 16); + buf[offset+4]=(byte)(num >>> 24); + buf[offset+3]=(byte)(num >>> 32); + buf[offset+2]=(byte)(num >>> 40); + buf[offset+1]=(byte)(num >>> 48); + buf[offset]=(byte)(num >>> 56); + } + + public static void writeLongCompressed(long num, byte[] buf, int offset) { + if(num == 0) { + buf[offset]=0; + return; + } + final byte bytes_needed=bytesRequiredFor(num); + buf[offset++]=bytes_needed; + for(int i=0; i < bytes_needed; i++) + buf[offset++]=getByteAt(num, i); + } + + + + /** + * Reads a long from a buffer. 
+ * @param buf the buffer + * @return the long read from the buffer + */ + public static long readLong(ByteBuffer buf) { + byte len=buf.get(); + if(len == 0) + return 0; + byte[] retval=new byte[len]; + buf.get(retval, 0, len); + return makeLong(retval, 0, len); + } + + /** + * Reads a variable-length encoded long from an input stream. Note that currently variable-length encoding is <em>not</em> + * used (a similar mechanism is used); this will be implemented later. + * @param in the input stream + * @return the long read from the input stream + * @throws IOException + */ + public static long readLong(DataInput in) throws IOException { + byte len=in.readByte(); + if(len == 0) + return 0; + byte[] buf=new byte[len]; + in.readFully(buf, 0, len); + return makeLong(buf, 0, len); + } + + public static long readLong(byte[] buf, int offset) { + return ((buf[offset+7] & 0xFFL)) + + ((buf[offset+6] & 0xFFL) << 8) + + ((buf[offset+5] & 0xFFL) << 16) + + ((buf[offset+4] & 0xFFL) << 24) + + ((buf[offset+3] & 0xFFL) << 32) + + ((buf[offset+2] & 0xFFL) << 40) + + ((buf[offset+1] & 0xFFL) << 48) + + (((long) buf[offset]) << 56); + } + + public static long readLongCompressed(byte[] buf, int offset) { + byte len=buf[offset++]; + if(len == 0) + return 0; + byte[] buffer=new byte[len]; + for(int i=0; i < len; i++) + buffer[i]=buf[offset++]; + return makeLong(buffer, 0, len); + } + + /** + * Computes the size of a variable-length encoded long. Note that this is <em>not</em> currently using + * variable-length encoding (will be implemented later). + * @param num the long + * @return the number of bytes needed to variable-length encode num + */ + public static int size(long num) { + return (byte)(num == 0? 1 : bytesRequiredFor(num) +1); + } + + + + + + // ------------------ long seq ---------------------- // + + /** + * Writes 2 sequence numbers (seqnos) in compressed format to buf. + * The seqnos are non-negative and hr is guaranteed to be >= hd. 
+ * <p/> + * Once variable-length encoding has been implemented, this method will probably get dropped as we can simply + * write the 2 longs individually. + * @param hd the highest delivered seqno. Guaranteed to be a positive number + * @param hr the highest received seqno. Guaranteed to be a positive number. Greater than or equal to hd + * @param buf the buffer to write to + */ + public static void writeLongSequence(long hd, long hr, ByteBuffer buf) { + if(hr < hd) + throw new IllegalArgumentException("hr (" + hr + ") has to be >= hd (" + hd + ")"); + + if(hd == 0 && hr == 0) { + buf.put((byte)0); + return; + } + + long delta=hr - hd; + + // encode highest_delivered followed by delta + byte bytes_for_hd=bytesRequiredFor(hd), bytes_for_delta=bytesRequiredFor(delta); + byte bytes_needed=encodeLength(bytes_for_hd, bytes_for_delta); + buf.put(bytes_needed); + + for(int i=0; i < bytes_for_hd; i++) + buf.put(getByteAt(hd, i)); + + for(int i=0; i < bytes_for_delta; i++) + buf.put(getByteAt(delta, i)); + } + + /** + * Writes 2 sequence numbers (seqnos) in compressed format to an output stream. + * The seqnos are non-negative and hr is guaranteed to be >= hd. + * <p/> + * Once variable-length encoding has been implemented, this method will probably get dropped as we can simply + * write the 2 longs individually. + * @param hd the highest delivered seqno. Guaranteed to be a positive number + * @param hr the highest received seqno. Guaranteed to be a positive number. 
Greater than or equal to hd + * @param out the output stream to write to + */ + public static void writeLongSequence(long hd, long hr, DataOutput out) throws IOException { + if(hr < hd) + throw new IllegalArgumentException("hr (" + hr + ") has to be >= hd (" + hd + ")"); + + if(hd == 0 && hr == 0) { + out.write(0); + return; + } + + long delta=hr - hd; + + // encode highest_delivered followed by delta + byte bytes_for_hd=bytesRequiredFor(hd), bytes_for_delta=bytesRequiredFor(delta); + byte bytes_needed=encodeLength(bytes_for_hd, bytes_for_delta); + out.write(bytes_needed); + + for(int i=0; i < bytes_for_hd; i++) + out.write(getByteAt(hd, i)); + + for(int i=0; i < bytes_for_delta; i++) + out.write(getByteAt(delta, i)); + } + + /** + * Reads 2 compressed longs from buf. + * <p/> + * Once variable-length encoding has been implemented, this method will probably get dropped as we can simply + * read the 2 longs individually. + * @param buf the buffer to read from + * @return an array of 2 longs (hd and hr) + */ + public static long[] readLongSequence(ByteBuffer buf) { + byte len=buf.get(); + if(len == 0) + return new long[]{0,0}; + + byte[] lengths=decodeLength(len); + long[] seqnos=new long[2]; + byte[] retval=new byte[lengths[0] + lengths[1]]; + buf.get(retval, 0, retval.length); + seqnos[0]=makeLong(retval, 0, lengths[0]); + seqnos[1]=makeLong(retval, lengths[0], lengths[1]) + seqnos[0]; + return seqnos; + } + + /** + * Reads 2 compressed longs from in. + * Reads 2 compressed longs from buf. + * <p/> + * Once variable-length encoding has been implemented, this method will probably get dropped as we can simply + * read the 2 longs individually. 
+ * @param in the input stream to read from + * @return an array of 2 longs (hd and hr) + */ + public static long[] readLongSequence(DataInput in) throws IOException { + byte len=in.readByte(); + if(len == 0) + return new long[]{0,0}; + + byte[] lengths=decodeLength(len); + long[] seqnos=new long[2]; + byte[] buf=new byte[lengths[0] + lengths[1]]; + in.readFully(buf, 0, buf.length); + seqnos[0]=makeLong(buf, 0, lengths[0]); + seqnos[1]=makeLong(buf, lengths[0], lengths[1]) + seqnos[0]; + return seqnos; + } + + + + public static byte size(long hd, long hr) { + if(hd == 0 && hr == 0) + return 1; + + byte num_bytes_for_hd=bytesRequiredFor(hd), num_bytes_for_delta=bytesRequiredFor(hr - hd); + return (byte)(num_bytes_for_hd + num_bytes_for_delta + 1); + } + + public static long makeLong(byte[] buf, int offset, int bytes_to_read) { + long retval=0; + for(int i=0; i < bytes_to_read; i++) { + byte b=buf[offset + i]; + retval |= ((long)b & 0xff) << (i * 8); + } + return retval; + } + + public static int makeInt(byte[] buf, int offset, int bytes_to_read) { + int retval=0; + for(int i=0; i < bytes_to_read; i++) { + byte b=buf[offset + i]; + retval |= ((int)b & 0xff) << (i * 8); + } + return retval; + } + + + + // -------------------- float ----------------------- // + + /** + * Writes a float to a ByteBuffer + * @param num the float to be written + * @param buf the buffer + */ + public static void writeFloat(float num, ByteBuffer buf) { + writeInt(Float.floatToIntBits(num), buf); + } + + /** + * Writes a float to an output stream + * @param num the float to be written + * @param out the output stream + */ + public static void writeFloat(float num, DataOutput out) throws IOException { + writeInt(Float.floatToIntBits(num), out); + } + + public static void writeFloat(float num, byte[] buf, int offset) { + writeInt(Float.floatToIntBits(num), buf, offset); + } + + /** + * Reads a a float from a buffer. 
+ * @param buf the buffer + * @return the float read from the buffer + */ + public static float readFloat(ByteBuffer buf) { + return Float.intBitsToFloat(readInt(buf)); + } + + /** + * Reads a a float from an input stream. + * @param in the input stream + * @return the float read from the input stream + */ + public static float readFloat(DataInput in) throws IOException { + return Float.intBitsToFloat(readInt(in)); + } + + public static float readFloat(byte[] buf, int offset) { + return Float.intBitsToFloat(readInt(buf, offset)); + } + + + /** + * Computes the size of a variable-length encoded float + * @param num the float + * @return the number of bytes needed to variable-length encode num + */ + public static int size(float num) { + return size(Float.floatToIntBits(num)); + } + + + + // -------------------- double ---------------------- // + + /** + * Writes a double to a ByteBuffer + * @param num the double to be written + * @param buf the buffer + */ + public static void writeDouble(double num, ByteBuffer buf) { + writeLong(Double.doubleToLongBits(num), buf); + } + + /** + * Writes a double to an output stream + * @param num the double to be written + * @param out the output stream + */ + public static void writeDouble(double num, DataOutput out) throws IOException { + writeLong(Double.doubleToLongBits(num), out); + } + + public static void writeDouble(double num, byte[] buf, int offset) { + writeLong(Double.doubleToLongBits(num), buf, offset); + } + + + /** + * Reads a double from a buffer. 
+ * @param buf the buffer + * @return the double read from the buffer + */ + public static double readDouble(ByteBuffer buf) { + return Double.longBitsToDouble(readLong(buf)); + } + + /** + * Reads a double from an input stream + * @param in the input stream + * @return the double read from the input stream + */ + public static double readDouble(DataInput in) throws IOException { + return Double.longBitsToDouble(readLong(in)); + } + + public static double readDouble(byte[] buf, int offset) { + return Double.longBitsToDouble(readLong(buf, offset)); + } + + /** + * Computes the size of a variable-length encoded double + * @param num the double + * @return the number of bytes needed to variable-length encode num + */ + public static int size(double num) { + return size(Double.doubleToLongBits(num)); + } + + + + + // -------------------- String ---------------------- // + + /** + * Writes a string to buf. A presence byte is written first (1 if s is non-null, 0 if it is null). For a non-null + * string, the byte count (via writeInt) and then the bytes returned by {@link String#getBytes()} (platform default + * charset) follow, so multi-byte characters are written in full rather than truncated. + * @param s the string (may be null) + * @param buf the buffer + */ + public static void writeString(String s, ByteBuffer buf) { + buf.put((byte)(s != null? 1 : 0)); + if(s != null) { + byte[] bytes=s.getBytes(); + writeInt(bytes.length, buf); + buf.put(bytes); + } + } + + /** + * Writes a string to an output stream. A presence byte is written first (1 if s is non-null, 0 if it is null); a + * non-null string is then written with {@link DataOutput#writeUTF(String)}. Note that this format differs from the + * one used by the ByteBuffer variant of writeString(). + * @param s the string (may be null) + * @param out the output stream + */ + public static void writeString(String s, DataOutput out) throws IOException { + if(s != null) { + out.write(1); + out.writeUTF(s); + } + else + out.write(0); + } + + /** + * Reads a string written by the ByteBuffer variant of writeString(). A presence byte is read first (0 yields null), 
followed by the byte count and the bytes, which are decoded with the platform default charset + * @param buf the buffer + * @return the string read from buf, or null + */ + public static String readString(ByteBuffer buf) { + if(buf.get() == 0) + return null; + int len=readInt(buf); + byte[] bytes=new byte[len]; + buf.get(bytes); + return new String(bytes); + } + + /** + * Reads a string written by the DataOutput variant of writeString(): a presence byte, followed (only if it is 1) by a string read with {@link DataInput#readUTF()} + * @param in the input stream + * @return the string read from in, or null if the presence byte was not 1 + */ + public static String readString(DataInput in) throws IOException { + int b=in.readByte(); + if(b == 1) + return in.readUTF(); + return null; + } + + + /** + * Measures the number of bytes required to encode a string, taking multibyte characters into account. Measures + * strings written by {@link DataOutput#writeUTF(String)}. + * @param str the string + * @return the number of bytes required for encoding str + */ + public static int sizeUTF(String str) { + int len=str != null? str.length() : 0, utflen=2; + if(len == 0) + return utflen; + for(int i = 0; i < len; i++) { + int c=str.charAt(i); + if((c >= 0x0001) && (c <= 0x007F)) + utflen++; + else if (c > 0x07FF) + utflen += 3; + else + utflen += 2; + } + return utflen; + } + + /** Computes the number of bytes for str in the format of the ByteBuffer variant of writeString(): the presence byte plus, for a non-null string, size(bytes.length) plus the bytes returned by getBytes() */ + public static int size(String str) { + if(str == null) + return Global.BYTE_SIZE; + byte[] bytes=str.getBytes(); + return Global.BYTE_SIZE + size(bytes.length) + bytes.length; + } + + + + // ------------------ AsciiString ------------------- // + /** + * Writes an AsciiString to buf. The length of the string is written first, followed by the chars (as single-byte values). + * @param s the string + * @param buf the buffer + */ + public static void writeAsciiString(AsciiString s, ByteBuffer buf) { + short length=(short)(s != null? s.length() : -1); + buf.putShort(length); + if(s != null) + buf.put(s.chars()); + } + + /** + * Writes an AsciiString to buf. The length of the string is written first, followed by the chars (as single-byte values). 
+ * @param s the string + * @param out the output stream + */ + public static void writeAsciiString(AsciiString s, DataOutput out) throws IOException { + short length=(short)(s != null? s.length() : -1); + out.writeShort(length); + if(s != null) + out.write(s.chars()); + } + + /** + * Reads an AsciiString from buf. The length (a short) is read first, followed by the chars. Each char is a single byte. + * A negative length denotes a null string. + * @param buf the buffer + * @return the string read from buf, or null if the length was negative + */ + public static AsciiString readAsciiString(ByteBuffer buf) { + short len=buf.getShort(); + if(len < 0) + return null; + AsciiString retval=new AsciiString(len); + buf.get(retval.chars()); + return retval; + } + + /** + * Reads an AsciiString from an input stream. The length (a short) is read first, followed by the chars. Each char is + * a single byte. A negative length denotes a null string. + * @param in the input stream + * @return the string read from in, or null if the length was negative + */ + public static AsciiString readAsciiString(DataInput in) throws IOException { + short len=in.readShort(); + if(len < 0) + return null; + AsciiString retval=new AsciiString(len); + in.readFully(retval.chars()); + return retval; + } + + + /** + * Measures the number of bytes required to encode an AsciiString. + * @param str the string + * @return the number of bytes required for encoding str + */ + public static int size(AsciiString str) { + return str == null? Global.SHORT_SIZE : Global.SHORT_SIZE + str.length(); + } + + + + /** + * Encodes the number of bytes needed into a single byte. The first number is encoded in the first nibble (the + * first 4 bits), the second number in the second nibble + * @param len1 The number of bytes needed to store a long. Must be between 0 and 8 + * @param len2 The number of bytes needed to store a long. 
Must be between 0 and 8 + * @return The byte storing the 2 numbers len1 and len2 + */ + protected static byte encodeLength(byte len1, byte len2) { + byte retval=len2; + retval |= (len1 << 4); + return retval; + } + + /** Splits a byte created by encodeLength() into a 2-element array: index 0 holds the upper nibble (len1), index 1 the lower nibble (len2) */ + protected static byte[] decodeLength(byte len) { + return new byte[]{(byte)((len & 0xff) >> 4),(byte)(len & ~0xf0)}; // 0xf0 is the first nibble set (11110000) + } + + protected static byte bytesRequiredFor(long number) { + if(number >> 56 != 0) return 8; + if(number >> 48 != 0) return 7; + if(number >> 40 != 0) return 6; + if(number >> 32 != 0) return 5; + if(number >> 24 != 0) return 4; + if(number >> 16 != 0) return 3; + if(number >> 8 != 0) return 2; + return 1; + } + + protected static byte bytesRequiredFor(int number) { + if(number >> 24 != 0) return 4; + if(number >> 16 != 0) return 3; + if(number >> 8 != 0) return 2; + return 1; + } + + + static protected byte getByteAt(long num, int index) { + return (byte)((num >> (index * 8))); + } + + +}
// Source: src/org/jgroups/util/ByteArrayDataInputStream.java (package org.jgroups.util)

import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;

/**
 * Implements {@link java.io.DataInput} over a byte[] buffer. This class is not thread safe.
 * <p>
 * The readable region is <code>[pos, limit)</code>. Once <code>pos</code> reaches <code>limit</code>,
 * {@link #read()} returns -1 and the <code>readXXX()</code> methods throw {@link EOFException}.
 * @author Bela Ban
 * @since  3.5
 */
public class ByteArrayDataInputStream implements DataInput {
    protected final byte[] buf;
    protected int          pos; // current position to read next byte from buf

    // index one past the last readable byte (exclusive end); always <= buf.length
    protected final int    limit;

    /**
     * Creates a stream over the entire array.
     * @param buf the bytes to read; null is treated like an empty array
     */
    public ByteArrayDataInputStream(byte[] buf) {
        this(buf, 0, buf != null? buf.length : 0);
    }

    /**
     * Creates a stream over <code>length</code> bytes of buf, starting at <code>offset</code>.
     * @param buf    the bytes to read; null is treated like an empty array
     * @param offset the index of the first byte to read
     * @param length the number of readable bytes; clamped so that the limit never exceeds buf.length
     * @throws IndexOutOfBoundsException if offset is negative or greater than the resulting limit
     */
    public ByteArrayDataInputStream(byte[] buf, int offset, int length) {
        // The 1-arg constructor explicitly tolerates null, so do not dereference a null array here
        this.buf=buf != null? buf : new byte[0];
        this.limit=Math.min(this.buf.length, offset+length);
        this.pos=checkBounds(offset);
    }

    /**
     * Creates a stream over the remaining bytes of buffer. If the buffer exposes a backing array, the array is
     * used directly (no copy) and the buffer's position is left untouched. Otherwise (direct or read-only
     * buffers) the remaining bytes are copied, which advances the buffer's position.
     * @param buffer the buffer to read from
     */
    public ByteArrayDataInputStream(ByteBuffer buffer) {
        int len=buffer.remaining();
        // hasArray() (rather than !isDirect()) is the documented precondition of array() / arrayOffset():
        // a read-only heap buffer is not direct, yet array() throws ReadOnlyBufferException
        if(buffer.hasArray()) {
            int offset=buffer.arrayOffset() + buffer.position();
            this.buf=buffer.array();
            this.pos=offset;
            this.limit=offset+len;
        }
        else {
            byte[] tmp=new byte[len];
            buffer.get(tmp, 0, len);
            this.buf=tmp;
            this.pos=0;
            this.limit=len;
        }
    }

    /**
     * Sets the read position.
     * @param pos the new position, between 0 and {@link #limit()} (inclusive; limit means "at end of data")
     * @return this stream
     * @throws IndexOutOfBoundsException if pos is outside that range
     */
    public ByteArrayDataInputStream position(int pos) {
        this.pos=checkBounds(pos); return this;
    }

    public int position() {return pos;}
    public int limit()    {return limit;}
    public int capacity() {return buf.length;}


    /**
     * Reads the next byte of data from buf. The value byte is returned as an <code>int</code> in the range
     * <code>0</code> to <code>255</code>. If no byte is available because the end of the buffer has been reached,
     * the value <code>-1</code> is returned.
     * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
     */
    public int read() {
        return (pos < limit) ? (buf[pos++] & 0xff) : -1;
    }

    /**
     * Copies up to len bytes into b, starting at off.
     * @return the number of bytes copied, or -1 if the end of the data has been reached
     */
    public int read(byte b[], int off, int len) {
        if (b == null)
            throw new NullPointerException();

        if(off < 0 || len < 0 || len > b.length - off)
            throw new IndexOutOfBoundsException();

        if(pos >= limit)
            return -1;

        int avail=limit - pos;
        if(len > avail)
            len=avail;
        if(len <= 0)
            return 0;

        System.arraycopy(buf, pos, b, off, len);
        pos += len;
        return len;
    }

    public void readFully(byte[] b) throws IOException {
        readFully(b, 0, b.length);
    }

    /** Reads exactly len bytes into b or throws {@link EOFException} if fewer are available */
    public void readFully(byte[] b, int off, int len) throws IOException {
        if (len < 0)
            throw new IndexOutOfBoundsException();
        int n = 0;
        while (n < len) {
            int count=read(b, off + n, len - n);
            if (count < 0)
                throw new EOFException();
            n += count;
        }
    }

    /** Skips min(n, remaining) bytes (0 if n is negative) and returns the number of bytes skipped */
    public int skipBytes(int n) {
        int k = limit - pos;
        if (n < k)
            k = n < 0 ? 0 : n;
        pos += k;
        return k;
    }

    public boolean readBoolean() throws IOException {
        int ch=read();
        if(ch < 0)
            throw new EOFException();
        return ch != 0;
    }

    public byte readByte() throws IOException {
        int ch=read();
        if (ch < 0)
            throw new EOFException();
        return (byte)(ch);
    }

    public int readUnsignedByte() throws IOException {
        int ch=read();
        if (ch < 0)
            throw new EOFException();
        return ch;
    }

    public short readShort() throws IOException {
        int ch1=read();
        int ch2=read();
        if ((ch1 | ch2) < 0)
            throw new EOFException();
        return (short)((ch1 << 8) + (ch2 << 0));
    }

    public int readUnsignedShort() throws IOException {
        int ch1=read();
        int ch2=read();
        if ((ch1 | ch2) < 0)
            throw new EOFException();
        return (ch1 << 8) + (ch2 << 0);
    }

    public char readChar() throws IOException {
        int ch1=read();
        int ch2=read();
        if ((ch1 | ch2) < 0)
            throw new EOFException();
        return (char)((ch1 << 8) + (ch2 << 0));
    }

    public int readInt() throws IOException {
        int ch1=read();
        int ch2=read();
        int ch3=read();
        int ch4=read();
        if ((ch1 | ch2 | ch3 | ch4) < 0)
            throw new EOFException();
        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
    }

    /**
     * Reads 8 bytes (big-endian) and returns them as a long.
     * @throws EOFException if fewer than 8 bytes are left. Previously the end-of-data marker (-1) returned by
     *                      {@link #read()} was masked into 0xff and silently became part of the result.
     */
    public long readLong() throws IOException {
        if(limit - pos < 8) {
            if(pos < limit)
                pos=limit; // consume the partial value, like the other readXXX() methods do
            throw new EOFException();
        }
        long retval=0;
        for(int i=0; i < 8; i++)
            retval=(retval << 8) | (buf[pos++] & 0xff);
        return retval;
    }

    public float readFloat() throws IOException {
        return Float.intBitsToFloat(readInt());
    }

    public double readDouble() throws IOException {
        return Double.longBitsToDouble(readLong());
    }

    /**
     * Reads bytes up to (and consuming) the next '\n' or the end of the data; '\r' bytes are dropped.
     * @return the line, or null if the end of the data was reached before any character was read
     */
    public String readLine() throws IOException {
        StringBuilder sb=new StringBuilder(35);
        int ch;

        while(true) {
            ch=read();
            if(ch == -1)
                return sb.length() == 0? null : sb.toString();
            if(ch == '\r')
                ;
            else {
                if(ch == '\n')
                    break;
                sb.append((char)ch);
            }
        }
        return sb.toString();
    }

    /**
     * Reads a string in modified UTF-8 format: a 2-byte length followed by the encoded bytes. A length of 0xFFFF
     * (-1 as a short) denotes a null string, matching what ByteArrayDataOutputStream.writeUTF(null) writes.
     * @return the decoded string, or null if the null marker was read
     * @throws EOFException           if fewer bytes than announced are available
     * @throws UTFDataFormatException if the bytes are not valid modified UTF-8
     */
    public String readUTF() throws IOException {
        int utflen=readUnsignedShort();
        if(((short)utflen) == -1)
            return null; // checked before allocating: the null marker would otherwise cost two 64K arrays

        byte[] bytearr=new byte[utflen];
        char[] chararr=new char[utflen];

        int c, char2, char3;
        int count = 0;
        int chararr_count=0;

        readFully(bytearr,0,utflen);

        // fast path: leading run of single-byte (ASCII) characters
        while (count < utflen) {
            c = (int) bytearr[count] & 0xff;
            if (c > 127) break;
            count++;
            chararr[chararr_count++]=(char)c;
        }

        while (count < utflen) {
            c = (int) bytearr[count] & 0xff;
            switch (c >> 4) {
                case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
                    /* 0xxxxxxx*/
                    count++;
                    chararr[chararr_count++]=(char)c;
                    break;
                case 12: case 13:
                    /* 110x xxxx   10xx xxxx*/
                    count += 2;
                    if (count > utflen)
                        throw new UTFDataFormatException(
                          "malformed input: partial character at end");
                    char2 = (int) bytearr[count-1];
                    if ((char2 & 0xC0) != 0x80)
                        throw new UTFDataFormatException(
                          "malformed input around byte " + count);
                    chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
                      (char2 & 0x3F));
                    break;
                case 14:
                    /* 1110 xxxx  10xx xxxx  10xx xxxx */
                    count += 3;
                    if (count > utflen)
                        throw new UTFDataFormatException(
                          "malformed input: partial character at end");
                    char2 = (int) bytearr[count-2];
                    char3 = (int) bytearr[count-1];
                    if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
                        throw new UTFDataFormatException(
                          "malformed input around byte " + (count-1));
                    chararr[chararr_count++]=(char)(((c     & 0x0F) << 12) |
                      ((char2 & 0x3F) << 6)  |
                      ((char3 & 0x3F) << 0));
                    break;
                default:
                    /* 10xx xxxx,  1111 xxxx */
                    throw new UTFDataFormatException(
                      "malformed input around byte " + count);
            }
        }
        // The number of chars produced may be less than utflen
        return new String(chararr, 0, chararr_count);
    }

    public String toString() {
        return "pos=" + pos + " lim=" + limit + " cap=" + buf.length;
    }

    /**
     * Validates a position. <code>pos == limit</code> is legal: it denotes a stream positioned at the end of
     * its data (this is also what makes a stream over an empty array constructible).
     */
    protected int checkBounds(int pos) {
        if(pos < 0 || pos > limit)
            throw new IndexOutOfBoundsException("pos=" + pos + ", limit=" + limit);
        return pos;
    }
}
src/org/jgroups/util/ByteArrayDataOutputStream.java+206 −0 added@@ -0,0 +1,206 @@ +package org.jgroups.util; + +import java.io.DataOutput; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Implements {@link java.io.DataOutput} over a byte[] buffer. The byte[] buffer expands when needed; however, it + * doesn't double but only expands minimally, to accommodate the additional data. + * It is therefore recommended to always size the buffer to the actual number of bytes needed. + * This class is not thread safe. + * @author Bela Ban + * @since 3.5 + */ +public class ByteArrayDataOutputStream implements DataOutput { + protected byte[] buf; + protected int pos; + protected boolean grow_exponentially; // if true, the buffer will double every time + + public ByteArrayDataOutputStream() { + this(32, false); + } + + public ByteArrayDataOutputStream(int capacity) { + this(capacity, false); + } + + public ByteArrayDataOutputStream(int capacity, boolean grow_exponentially) { + this.buf=new byte[capacity]; + this.grow_exponentially=grow_exponentially; + } + + public ByteArrayDataOutputStream position(int pos) {this.pos=checkBounds(pos); return this;} + public int position() {return pos;} + public byte[] buffer() {return buf;} + public Buffer getBuffer() {return new Buffer(buf, 0, pos);} + public ByteBuffer getByteBuffer() {return ByteBuffer.wrap(buf, 0, pos);} + public boolean growExponentially() {return grow_exponentially;} + public ByteArrayDataOutputStream growExponentially(boolean b) {grow_exponentially=b; return this;} + + + public void write(int b) { + ensureCapacity(1); + buf[pos++]=(byte)b; + } + + public void write(byte[] b) { + write(b, 0, b.length); + } + + public void write(byte[] b, int off, int len) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) + throw new IndexOutOfBoundsException(String.format("off=%d, len=%d, b.length=%d", off, len, b.length)); + ensureCapacity(len); + + System.arraycopy(b, off, buf, pos, 
len); + pos+=len; + } + + public void writeBoolean(boolean v) { + write(v ? 1 : 0); + } + + public void writeByte(int v) { + write(v); + } + + public void writeShort(int v) { + write((v >>> 8) & 0xFF); + write((v >>> 0) & 0xFF); + } + + public void writeChar(int v) { + write((v >>> 8) & 0xFF); + write((v >>> 0) & 0xFF); + } + + public void writeInt(int v) { + write((v >>> 24) & 0xFF); + write((v >>> 16) & 0xFF); + write((v >>> 8) & 0xFF); + write((v >>> 0) & 0xFF); + } + + public void writeLong(long v) { + write((byte)(v >>> 56)); + write((byte)(v >>> 48)); + write((byte)(v >>> 40)); + write((byte)(v >>> 32)); + write((byte)(v >>> 24)); + write((byte)(v >>> 16)); + write((byte)(v >>> 8)); + write((byte)(v >>> 0)); + } + + public void writeFloat(float v) { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) { + int len=s != null? s.length() : 0; + if(len > 0) + ensureCapacity(len); + for(int i = 0 ; i < len ; i++) + write((byte)s.charAt(i)); + } + + public void writeChars(String s) { + int len=s != null? s.length() : 0; + if(len > 0) + ensureCapacity(len *2); // 2 bytes per char + + for(int i = 0 ; i < len ; i++) { + int v = s.charAt(i); + writeChar(v); + } + } + + public void writeUTF(String str) { + int strlen=str != null? 
str.length() : 0; + if(strlen > 0) + ensureCapacity(strlen *2 + 2); + + int utflen = 0; + int c, count = 0; + + if(str == null) { + writeShort(-1); + return; + } + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + utflen++; + else if (c > 0x07FF) + utflen += 3; + else + utflen += 2; + } + + if (utflen > 65535) + throw new IllegalArgumentException("encoded string too long: " + utflen + " bytes"); + + byte[] bytearr=new byte[utflen+2]; + + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); + + int i=0; + for (i=0; i<strlen; i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) break; + bytearr[count++] = (byte) c; + } + + for (;i < strlen; i++){ + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (byte) c; + + } else if (c > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + } + write(bytearr, 0, utflen+2); + } + + public String toString() { + return "pos=" + pos + " lim=" + buf.length; + } + + protected int checkBounds(int pos) { + if(pos < 0 || pos > buf.length) + throw new IndexOutOfBoundsException("pos=" + pos + ", length=" + buf.length); + return pos; + } + + /** Grows the buffer; whether it grow linearly or exponentially depends on grow_exponentially */ + protected void ensureCapacity(int bytes) { + int minCapacity=pos+bytes; + + if(minCapacity - buf.length > 0) { + int newCapacity=this.grow_exponentially? 
buf.length << 1 : buf.length + bytes + 32; + if(newCapacity - minCapacity < 0) + newCapacity=minCapacity; + if(newCapacity < 0) { + if(minCapacity < 0) // overflow + throw new OutOfMemoryError(); + newCapacity=Integer.MAX_VALUE; + } + // System.out.printf("growing buffer from %d -> %d (pos=%d, bytes=%d)\n", buf.length, newCapacity, pos, bytes); + buf=Arrays.copyOf(buf, newCapacity); + } + } +}
src/org/jgroups/util/MyReceiver.java+5 −3 modified@@ -3,27 +3,29 @@ import org.jgroups.Message; import org.jgroups.ReceiverAdapter; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * Generic receiver for a JChannel * @author Bela Ban * @since 3.3 */ public class MyReceiver<T> extends ReceiverAdapter { - protected final List<T> list=new ArrayList<T>(); + protected final List<T> list=new CopyOnWriteArrayList<T>(); protected String name; protected boolean verbose; + protected boolean raw_msgs; public void receive(Message msg) { - T obj=(T)msg.getObject(); + T obj=raw_msgs? (T)msg : (T)msg.getObject(); list.add(obj); if(verbose) { System.out.println((name() != null? name() + ":" : "") + " received message from " + msg.getSrc() + ": " + obj); } } + public MyReceiver rawMsgs(boolean flag) {this.raw_msgs=flag; return this;} public List<T> list() {return list;} public MyReceiver<T> verbose(boolean flag) {verbose=flag; return this;} public String name() {return name;}
src/org/jgroups/util/Util.java+78 −6 modified@@ -10,6 +10,8 @@ import org.jgroups.protocols.*; import org.jgroups.protocols.pbcast.FLUSH; import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.protocols.pbcast.STABLE; import org.jgroups.protocols.relay.SiteMaster; import org.jgroups.protocols.relay.SiteUUID; import org.jgroups.stack.IpAddress; @@ -204,6 +206,11 @@ public static void assertNull(String message, Object val) { assert val == null; } + public static int getNextHigherPowerOfTwo(int num) { + if(num <= 0) return 1; + int highestBit=Integer.highestOneBit(num); + return num <= highestBit? highestBit : highestBit << 1; + } public static String bold(String msg) { StringBuilder sb=new StringBuilder("\033[1m"); @@ -221,6 +228,29 @@ public static String getMessage(String key, Object ... args) { } + /** + * Returns a default stack for testing with transport = SHARED_LOOPBACK + * @param additional_protocols Any number of protocols to add to the top of the returned protocol list + * @return + */ + public static Protocol[] getTestStack(Protocol... 
additional_protocols) { + Protocol[] protocols={ + new SHARED_LOOPBACK(), + new PING(), + new NAKACK2(), + new UNICAST2(), + new STABLE(), + new GMS().joinTimeout(1000), + new FRAG2().fragSize(8000) + }; + + if(additional_protocols == null) + return protocols; + Protocol[] tmp=Arrays.copyOf(protocols,protocols.length + additional_protocols.length); + System.arraycopy(additional_protocols, 0, tmp, protocols.length, additional_protocols.length); + return tmp; + } + /** * Blocks until all channels have the same view * @param timeout How long to wait (max in ms) @@ -745,6 +775,22 @@ public static byte[] streamableToByteBuffer(Streamable obj) throws Exception { return result; } + public static <T extends Streamable> T streamableFromBuffer(Class<T> clazz,byte[] buffer,int offset,int length) throws Exception { + DataInput in=new ByteArrayDataInputStream(buffer,offset,length); + return (T)Util.readStreamable(clazz,in); + } + + public static Buffer streamableToBuffer(Streamable obj) { + final ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512); + try { + Util.writeStreamable(obj,out); + return out.getBuffer(); + } + catch(Exception ex) { + return null; + } + } + public static byte[] collectionToByteBuffer(Collection<Address> c) throws Exception { byte[] result=null; @@ -764,10 +810,10 @@ public static void writeAuthToken(AuthToken token, DataOutput out) throws Except } public static AuthToken readAuthToken(DataInput in) throws Exception { - try{ - String type = Util.readString(in); - Object obj = Class.forName(type).newInstance(); - AuthToken token = (AuthToken) obj; + try { + String type=Util.readString(in); + Object obj=Class.forName(type).newInstance(); + AuthToken token=(AuthToken)obj; token.readFrom(in); return token; } @@ -777,6 +823,31 @@ public static AuthToken readAuthToken(DataInput in) throws Exception { } + public static String byteArrayToHexString(byte[] b) { + if(b == null) + return "null"; + StringBuilder sb = new StringBuilder(b.length * 2); + 
for (int i = 0; i < b.length; i++){ + int v = b[i] & 0xff; + if (v < 16) { sb.append('0'); } + sb.append(Integer.toHexString(v)); + } + return sb.toString().toUpperCase(); + } + + /** Compares 2 byte arrays lexicographically; elements are treated as unsigned. Returns a negative value, 0 or a positive value if left is smaller than, equal to or greater than right. If one array is a prefix of the other, the shorter array is the smaller one */ + public static int compare(byte[] left,byte[] right) { + for(int i=0, j=0; i < left.length && j < right.length; i++,j++) { + int a=(left[i] & 0xff); + int b=(right[j] & 0xff); + if(a != b) { + return a - b; + } + } + return left.length - right.length; + } + + + public static void writeView(View view, DataOutput out) throws Exception { if(view == null) { out.writeBoolean(false); @@ -914,10 +985,11 @@ public static int size(String s) { } public static int size(byte[] buf) { - int retval=Global.BYTE_SIZE + Global.INT_SIZE; + /* int retval=Global.BYTE_SIZE + Global.INT_SIZE; if(buf != null) retval+=buf.length; - return retval; + return retval;*/ + return buf == null? Global.BYTE_SIZE : Global.BYTE_SIZE + Global.INT_SIZE + buf.length; } private static Address readOtherAddress(DataInput in) throws Exception {
src/org/jgroups/View.java+36 −0 modified@@ -66,6 +66,10 @@ public View(Address creator, long id, List<Address> members) { } + /** Creates a view with the ViewId (coord, id) and the given members, in the order passed */ + public static View create(Address coord, long id, Address ... members) { + return new View(new ViewId(coord, id), Arrays.asList(members)); + } + /** * Returns the view ID of this view * if this view was created with the empty constructor, null will be returned @@ -85,6 +89,8 @@ public Address getCreator() { return vid.getCreator(); } + /** Returns the first element of the member list, or null if the list is empty */ + public Address getCoord() {return !members.isEmpty()? members.get(0) : null;} + /** * Returns a reference to the List of members (ordered) * Do NOT change this list, as this will invalidate the view @@ -96,6 +102,15 @@ public List<Address> getMembers() { return Collections.unmodifiableList(members); } + /** Returns the underlying list. The caller <em>must not</em> modify the contents. Should not be used by + * application code ! This method may be removed at any time, so don't use it ! + */ + public List<Address> getMembersRaw() { + return members; + } + + + /** * Returns true, if this view contains a certain member * @@ -107,6 +122,27 @@ public boolean containsMember(Address mbr) { return mbr != null && members.contains(mbr); } + /** Returns true if all mbrs are elements of this view, false otherwise */ + public boolean containsMembers(Address ... mbrs) { + if(mbrs == null || members == null) + return false; + for(Address mbr: mbrs) { + if(!containsMember(mbr)) + return false; + } + return true; + } + + /** Returns true if all mbrs are elements of this view, false otherwise (also false if mbrs is null) */ + public boolean containsMembers(Collection<Address> mbrs) { + if(mbrs == null || members == null) + return false; + for(Address mbr: mbrs) { + if(!containsMember(mbr)) + return false; + } + return true; + } + public int compareTo(View o) { return vid.compareTo(o.vid);
tests/junit-functional/org/jgroups/protocols/ASYM_ENCRYPT_Test.java+322 −0 added@@ -0,0 +1,322 @@ +package org.jgroups.protocols; + +import org.jgroups.*; +import org.jgroups.auth.MD5Token; +import org.jgroups.conf.ClassConfigurator; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.pbcast.JoinRsp; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.protocols.pbcast.STABLE; +import org.jgroups.stack.ProtocolStack; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.crypto.SecretKey; +import java.util.Arrays; + +/** + * Tests use cases for {@link ASYM_ENCRYPT} described in https://issues.jboss.org/browse/JGRP-2021. + * @author Bela Ban + * @since 4.0 + */ +@Test(groups=Global.FUNCTIONAL,singleThreaded=true) +public class ASYM_ENCRYPT_Test extends EncryptTest { + protected static final short ASYM_ENCRYPT_ID; + + static { + ASYM_ENCRYPT_ID=ClassConfigurator.getProtocolId(ASYM_ENCRYPT.class); + } + + + @BeforeMethod protected void init() throws Exception { + super.init(getClass().getSimpleName()); + } + + @AfterMethod protected void destroy() { + super.destroy(); + } + + /** Calling methods in superclass. 
Kludge because TestNG doesn't call methods in superclass correctly **/ + public void testRegularMessageReception() throws Exception { + super.testRegularMessageReception(); + } + + public void testRegularMessageReceptionWithEmptyMessages() throws Exception { + super.testRegularMessageReceptionWithEmptyMessages(); + } + + public void testChecksum() throws Exception { + super.testChecksum(); + } + + public void testRogueMemberJoin() throws Exception { + super.testRogueMemberJoin(); + } + + public void testMessageSendingByRogue() throws Exception { + super.testMessageSendingByRogue(); + } + + public void testMessageSendingByRogueUsingEncryption() throws Exception { + super.testMessageSendingByRogueUsingEncryption(); + } + + public void testMessageReceptionByRogue() throws Exception { + super.testMessageReceptionByRogue(); + } + + public void testCapturingOfMessageByNonMemberAndResending() throws Exception { + super.testCapturingOfMessageByNonMemberAndResending(); + } + + public void testRogueViewInstallation() throws Exception { + super.testRogueViewInstallation(); + } + + + + /** + * A non-member sends a {@link EncryptHeader#SECRET_KEY_REQ} request to the key server. Asserts that the rogue member + * doesn't get the secret key. If it did, it would be able to decrypt all messages from cluster members! 
+ */ + public void nonMemberGetsSecretKeyFromKeyServer() throws Exception { + Util.close(rogue); + + rogue=new JChannel(Util.getTestStack()).name("rogue"); + DISCARD discard=new DISCARD().setDiscardAll(true); + rogue.getProtocolStack().insertProtocol(discard, ProtocolStack.ABOVE, TP.class); + CustomENCRYPT encrypt=new CustomENCRYPT(); + encrypt.init(); + + rogue.getProtocolStack().insertProtocol(encrypt, ProtocolStack.BELOW, NAKACK2.class); + rogue.connect(cluster_name); // creates a singleton cluster + + assert rogue.getView().size() == 1; + GMS gms=(GMS)rogue.getProtocolStack().findProtocol(GMS.class); + View rogue_view=new View(a.getAddress(), a.getView().getViewId().getId(), + Arrays.asList(a.getAddress(),b.getAddress(),c.getAddress(),rogue.getAddress())); + gms.installView(rogue_view); + + + // now fabricate a KEY_REQUEST message and send it to the key server (A) + Message newMsg=new Message(a.getAddress(), rogue.getAddress(), encrypt.keyPair().getPublic().getEncoded()) + .putHeader(encrypt.getId(),new EncryptHeader(EncryptHeader.SECRET_KEY_REQ, encrypt.symVersion())); + + discard.setDiscardAll(false); + System.out.printf("-- sending KEY_REQUEST to key server %s\n", a.getAddress()); + encrypt.getDownProtocol().down(new Event(Event.MSG, newMsg)); + for(int i=0; i < 10; i++) { + SecretKey secret_key=encrypt.key; + if(secret_key != null) + break; + Util.sleep(500); + } + + discard.setDiscardAll(true); + gms.installView(View.create(rogue.getAddress(), 20, rogue.getAddress())); + System.out.printf("-- secret key is %s (should be null)\n", encrypt.key); + assert encrypt.key == null : String.format("should not have received secret key %s", encrypt.key); + } + + + + /** Verifies that a non-member (non-coord) cannot send a JOIN-RSP to a member */ + public void nonMemberInjectingJoinResponse() throws Exception { + Util.close(rogue); + rogue=create("rogue"); + ProtocolStack stack=rogue.getProtocolStack(); + AUTH auth=(AUTH)stack.findProtocol(AUTH.class); + 
auth.setAuthToken(new MD5Token("unknown_pwd")); + GMS gms=(GMS)stack.findProtocol(GMS.class); + gms.setMaxJoinAttempts(1); + DISCARD discard=new DISCARD().setDiscardAll(true); + stack.insertProtocol(discard, ProtocolStack.ABOVE, TP.class); + rogue.connect(cluster_name); + assert rogue.getView().size() == 1; + discard.setDiscardAll(false); + stack.removeProtocol(NAKACK2.class, UNICAST2.class); + + View rogue_view=View.create(a.getAddress(), a.getView().getViewId().getId() +5, + a.getAddress(),b.getAddress(),c.getAddress(),rogue.getAddress()); + JoinRsp join_rsp=new JoinRsp(rogue_view, null); + GMS.GmsHeader gms_hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP); + Message rogue_join_rsp=new Message(b.getAddress(), rogue.getAddress()).putHeader(GMS_ID, gms_hdr) + .setBuffer(GMS.marshal(join_rsp)).setFlag(Message.Flag.NO_RELIABILITY); // bypasses NAKACK2 / UNICAST3 + rogue.down(new Event(Event.MSG, rogue_join_rsp)); + for(int i=0; i < 10; i++) { + if(b.getView().size() > 3) + break; + Util.sleep(500); + } + assert b.getView().size() == 3 : String.format("B's view is %s, but should be {A,B,C}", b.getView()); + } + + + + /** The rogue node has an incorrect {@link AUTH} config (secret) and can thus not join */ + public void rogueMemberCannotJoinDueToAuthRejection() throws Exception { + Util.close(rogue); + rogue=create("rogue"); + AUTH auth=(AUTH)rogue.getProtocolStack().findProtocol(AUTH.class); + auth.setAuthToken(new MD5Token("unknown_pwd")); + GMS gms=(GMS)rogue.getProtocolStack().findProtocol(GMS.class); + gms.setMaxJoinAttempts(2); + rogue.connect(cluster_name); + System.out.printf("Rogue's view is %s\n", rogue.getView()); + assert rogue.getView().size() == 1 : String.format("rogue should have a singleton view of itself, but doesn't: %s", rogue.getView()); + } + + + public void mergeViewInjectionByNonMember() throws Exception { + Util.close(rogue); + rogue=create("rogue"); + AUTH auth=(AUTH)rogue.getProtocolStack().findProtocol(AUTH.class); + auth.setAuthToken(new 
MD5Token("unknown_pwd")); + GMS gms=(GMS)rogue.getProtocolStack().findProtocol(GMS.class); + gms.setMaxJoinAttempts(1); + rogue.connect(cluster_name); + + MergeView merge_view=new MergeView(a.getAddress(), a.getView().getViewId().getId()+5, + Arrays.asList(a.getAddress(), b.getAddress(), c.getAddress(), rogue.getAddress()), null); + GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW, merge_view); + Message merge_view_msg=new Message(null).putHeader(GMS_ID, hdr) + .setFlag(Message.Flag.NO_RELIABILITY); + System.out.printf("** %s: trying to install MergeView %s in all members\n", rogue.getAddress(), merge_view); + rogue.down(new Event(Event.MSG, merge_view_msg)); + + // check if A, B or C installed the MergeView sent by rogue: + for(int i=0; i < 10; i++) { + boolean rogue_views_installed=false; + + for(JChannel ch: Arrays.asList(a,b,c)) + if(ch.getView().containsMember(rogue.getAddress())) + rogue_views_installed=true; + if(rogue_views_installed) + break; + Util.sleep(500); + } + for(JChannel ch: Arrays.asList(a,b,c)) + System.out.printf("%s: %s\n", ch.getAddress(), ch.getView()); + for(JChannel ch: Arrays.asList(a,b,c)) + assert !ch.getView().containsMember(rogue.getAddress()); + } + + + /** Tests that when {ABC} -> {AB}, neither A nor B can receive a message from non-member C */ + public void testMessagesByLeftMember() throws Exception { + View view=View.create(a.getAddress(), a.getView().getViewId().getId()+1, a.getAddress(),b.getAddress()); + for(JChannel ch: Arrays.asList(a,b)) { + GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class); + gms.installView(view); + }; + printView(a,b,c); + c.getProtocolStack().removeProtocol(NAKACK2.class); // to prevent A and B from discarding C as non-member + + Util.sleep(1000); // give members time to handle the new view + c.send(null, "hello world from left member C!"); + for(int i=0; i < 10; i++) { + if(ra.size() > 0 || rb.size() > 0) + break; + Util.sleep(500); + } + assert ra.size() == 0 : 
String.format("A: received msgs from non-member C: %s", print(ra.list())); + assert rb.size() == 0 : String.format("B: received msgs from non-member C: %s", print(rb.list())); + } + + /** Tests that a left member C cannot decrypt messages from the cluster */ + public void testEavesdroppingByLeftMember() throws Exception { + printSymVersion(a,b,c); + View view=View.create(a.getAddress(), a.getView().getViewId().getId()+1, a.getAddress(),b.getAddress()); + for(JChannel ch: Arrays.asList(a,b)) { + GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class); + gms.installView(view); + }; + printView(a,b,c); + c.getProtocolStack().removeProtocol(NAKACK2.class); // to prevent A and B from discarding C as non-member + Util.waitUntilAllChannelsHaveSameSize(10000, 500, a,b); + + // somewhat of a kludge as we don't have UNICAST3: if we didn't remove C's connection to A, C might retransmit + // the JOIN-REQ to A and get added to the cluster, so the code below would fail as C would be able to eavesdrop + // on A and B + UNICAST2 uni=(UNICAST2)c.getProtocolStack().findProtocol(UNICAST2.class); + uni.removeConnection(a.getAddress()); + Util.sleep(5000); // give members time to handle the new view + + printView(a,b,c); + printSymVersion(a,b,c); + a.send(null, "hello from A"); + b.send(null, "hello from B"); + + for(int i=0; i < 10; i++) { + if(rc.size() > 0) + break; + Util.sleep(500); + } + assert rc.size() == 0 : String.format("C: received msgs from cluster: %s", print(rc.list())); + } + + + protected JChannel create(String name) throws Exception { + JChannel ch=new JChannel(Util.getTestStack()).name(name); + ProtocolStack stack=ch.getProtocolStack(); + EncryptBase encrypt=createENCRYPT(); + stack.insertProtocol(encrypt, ProtocolStack.BELOW, NAKACK2.class); + AUTH auth=new AUTH(); + auth.setAuthCoord(true); + auth.setAuthToken(new MD5Token("mysecret")); // .setAuthCoord(false); + stack.insertProtocol(auth, ProtocolStack.BELOW, GMS.class); + 
stack.findProtocol(GMS.class).setValue("join_timeout", 1000); // .setValue("view_ack_collection_timeout", 10); + STABLE stable=((STABLE)stack.findProtocol(STABLE.class)); + stable.desiredAverageGossip(1000); + stable.setMaxBytes(500); + return ch; + } + + protected void printSymVersion(JChannel ... channels) { + for(JChannel ch: channels) { + ASYM_ENCRYPT encr=(ASYM_ENCRYPT)ch.getProtocolStack().findProtocol(ASYM_ENCRYPT.class); + byte[] sym_version=encr.symVersion(); + System.out.printf("sym-version %s: %s\n", ch.getAddress(), Util.byteArrayToHexString(sym_version)); + } + } + + protected void printView(JChannel ... channels) { + for(JChannel ch: channels) + System.out.printf("%s: %s\n", ch.getAddress(), ch.getView()); + } + + + // Note that setting encrypt_entire_message to true is critical here, or else some of the tests in this + // unit test would fail! + protected ASYM_ENCRYPT createENCRYPT() throws Exception { + ASYM_ENCRYPT encrypt=new ASYM_ENCRYPT().encryptEntireMessage(true).signMessages(true); + encrypt.init(); + return encrypt; + } + + + + protected static class CustomENCRYPT extends ASYM_ENCRYPT { + protected SecretKey key; + + public CustomENCRYPT() { + this.id=ASYM_ENCRYPT_ID; + } + + protected Object handleUpEvent(Message msg, EncryptHeader hdr) { + if(hdr.type() == EncryptHeader.SECRET_KEY_RSP) { + try { + key=decodeKey(msg.getBuffer()); + System.out.printf("received secret key %s !\n", key); + } + catch(Exception e) { + e.printStackTrace(); + } + } + return super.handleUpEvent(msg, hdr); + } + } + +}
tests/junit-functional/org/jgroups/protocols/ENCRYPTAsymmetricTest.java+11 −10 modified@@ -9,7 +9,6 @@ import org.jgroups.*; import org.jgroups.conf.ClassConfigurator; -import org.jgroups.protocols.ENCRYPT.EncryptHeader; import org.jgroups.stack.Protocol; import org.jgroups.util.Util; import org.testng.annotations.BeforeClass; @@ -21,10 +20,12 @@ import java.security.Security; import java.util.*; +import static javax.swing.text.html.HTML.Tag.HEAD; + /** * @author xenephon */ -@Test(groups=Global.FUNCTIONAL, sequential=false) +@Test(groups=Global.FUNCTIONAL) public class ENCRYPTAsymmetricTest { static final short ENCRYPT_ID=ClassConfigurator.getProtocolId(ENCRYPT.class); @@ -135,7 +136,7 @@ public static void testViewChangeBecomeKeyserver() throws Exception { encrypt.keyServer=false; Message msg=new Message(); msg.setBuffer(cipher.doFinal("hello".getBytes())); - msg.putHeader(ENCRYPT_ID, new EncryptHeader(EncryptHeader.ENCRYPT, symVersion)); + msg.putHeader(ENCRYPT_ID, new ENCRYPT.EncryptHeader(ENCRYPT.EncryptHeader.ENCRYPT, symVersion)); Event evt=new Event(Event.MSG, msg); @@ -158,7 +159,7 @@ public static void testViewChangeBecomeKeyserver() throws Exception { // send another encrypted message Message msg2=new Message(); msg2.setBuffer(cipher.doFinal("hello2".getBytes())); - msg2.putHeader(ENCRYPT_ID, new EncryptHeader(EncryptHeader.ENCRYPT, symVersion)); + msg2.putHeader(ENCRYPT_ID, new ENCRYPT.EncryptHeader(ENCRYPT.EncryptHeader.ENCRYPT, symVersion)); // we should have three messages now in our observer // that are decrypted @@ -223,7 +224,7 @@ public static void testViewChangeNewKeyServer() throws Exception { Cipher cipher=server.getSymEncodingCipher(); Message msg=new Message(); msg.setBuffer(cipher.doFinal("hello".getBytes())); - msg.putHeader(ENCRYPT_ID, new EncryptHeader(EncryptHeader.ENCRYPT, symVersion)); + msg.putHeader(ENCRYPT_ID, new ENCRYPT.EncryptHeader(ENCRYPT.EncryptHeader.ENCRYPT, symVersion)); Event evt=new Event(Event.MSG, msg); @@ -242,7 
+243,7 @@ public static void testViewChangeNewKeyServer() throws Exception { Event sent=(Event)peerObserver.getDownMessages().get("message0"); - Util.assertEquals(((EncryptHeader)((Message)sent.getArg()).getHeader(ENCRYPT_ID)).getType(), EncryptHeader.KEY_REQUEST); + Util.assertEquals(((ENCRYPT.EncryptHeader)((Message)sent.getArg()).getHeader(ENCRYPT_ID)).getType(), ENCRYPT.EncryptHeader.KEY_REQUEST); Util.assertEquals(new String(((Message)sent.getArg()).getBuffer()), new String(peer.getKpair().getPublic().getEncoded())); // send this event to server @@ -251,7 +252,7 @@ public static void testViewChangeNewKeyServer() throws Exception { Event reply=(Event)serverObserver.getDownMessages().get("message1"); //assert that reply is the session key encrypted with peer's public key - Util.assertEquals(((EncryptHeader)((Message)reply.getArg()).getHeader(ENCRYPT_ID)).getType(), EncryptHeader.SECRETKEY); + Util.assertEquals(((ENCRYPT.EncryptHeader)((Message)reply.getArg()).getHeader(ENCRYPT_ID)).getType(), ENCRYPT.EncryptHeader.SECRETKEY); assert !peer.getDesKey().equals(server.getDesKey()); @@ -264,7 +265,7 @@ public static void testViewChangeNewKeyServer() throws Exception { // send another encrypted message to peer to test queue Message msg2=new Message(); msg2.setBuffer(cipher.doFinal("hello2".getBytes())); - msg2.putHeader(ENCRYPT_ID, new EncryptHeader(EncryptHeader.ENCRYPT, symVersion)); + msg2.putHeader(ENCRYPT_ID, new ENCRYPT.EncryptHeader(ENCRYPT.EncryptHeader.ENCRYPT, symVersion)); Event evt2=new Event(Event.MSG, msg2); @@ -362,7 +363,7 @@ public static void testViewChangeNewKeyServerNewKey() throws Exception { Event sent=(Event)peerObserver.getDownMessages().get("message0"); // ensure type and that request contains peers pub key - Util.assertEquals(((EncryptHeader)((Message)sent.getArg()).getHeader(ENCRYPT_ID)).getType(), EncryptHeader.KEY_REQUEST); + Util.assertEquals(((ENCRYPT.EncryptHeader)((Message)sent.getArg()).getHeader(ENCRYPT_ID)).getType(), 
ENCRYPT.EncryptHeader.KEY_REQUEST); Util.assertEquals(new String(((Message)sent.getArg()).getBuffer()), new String(peer.getKpair().getPublic().getEncoded())); //assume that server is no longer available and peer2 is new server @@ -383,7 +384,7 @@ public static void testViewChangeNewKeyServerNewKey() throws Exception { Event reply=(Event)peer2Observer.getDownMessages().get("message1"); //assert that reply is the session key encrypted with peer's public key - Util.assertEquals(((EncryptHeader)((Message)reply.getArg()).getHeader(ENCRYPT_ID)).getType(), EncryptHeader.SECRETKEY); + Util.assertEquals(((ENCRYPT.EncryptHeader)((Message)reply.getArg()).getHeader(ENCRYPT_ID)).getType(), ENCRYPT.EncryptHeader.SECRETKEY); assert !peer.getDesKey().equals(peer2.getDesKey());
tests/junit-functional/org/jgroups/protocols/EncryptTest.java+357 −0 added@@ -0,0 +1,357 @@ +package org.jgroups.protocols; + +import org.jgroups.*; +import org.jgroups.conf.ClassConfigurator; +import org.jgroups.demos.KeyStoreGenerator; +import org.jgroups.protocols.pbcast.GMS; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.protocols.pbcast.NakAckHeader2; +import org.jgroups.protocols.pbcast.STABLE; +import org.jgroups.util.Buffer; +import org.jgroups.util.ByteArrayDataOutputStream; +import org.jgroups.util.MyReceiver; +import org.jgroups.util.Util; +import org.testng.annotations.Test; + +import javax.crypto.SecretKey; +import java.lang.reflect.Field; +import java.util.List; + +import static java.util.Arrays.asList; + + +/** + * Base class for tests {@link SYM_ENCRYPT_Test} and {@link ASYM_ENCRYPT_Test} + * @author Bela Ban + * @since 4.0 + */ + +@Test(enabled=false) +public abstract class EncryptTest { + protected JChannel a,b,c,rogue; + protected MyReceiver<Message> ra, rb, rc, r_rogue; + protected String cluster_name; + protected static final short GMS_ID; + + static { + GMS_ID=ClassConfigurator.getProtocolId(GMS.class); + } + + protected void init(String cluster_name) throws Exception { + this.cluster_name=cluster_name; + a=create("A"); + a.connect(cluster_name); + a.setReceiver(ra=new MyReceiver<Message>().name("A").rawMsgs(true)); + + b=create("B"); + b.connect(cluster_name); + b.setReceiver(rb=new MyReceiver<Message>().name("B").rawMsgs(true)); + + c=create("C"); + c.connect(cluster_name); + c.setReceiver(rc=new MyReceiver<Message>().name("C").rawMsgs(true)); + + Util.waitUntilAllChannelsHaveSameSize(10000, 500, a,b,c); + rogue=createRogue("rogue"); + rogue.connect(cluster_name); + for(JChannel ch: asList(a,b,c)) + System.out.printf("%s: %s\n", ch.getAddress(), ch.getView()); + System.out.println(""); + } + + @Test(enabled=false) protected void destroy() { + Util.close(c, b, a, rogue); + } + + protected abstract JChannel create(String 
name) throws Exception; + + + + /** Tests A,B or C sending messages and their reception by everyone in cluster {A,B,C} */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + protected void testRegularMessageReception() throws Exception { + a.send(null, "Hello from A"); + b.send(null, "Hello from B"); + c.send(null, "Hello from C"); + for(int i=0; i < 20; i++) { + if(ra.size() == 3 && rb.size() == 3 && rc.size() == 3) + break; + stable(a,b,c); + Util.sleep(1000); + } + for(MyReceiver r: asList(ra,rb,rc)) + System.out.printf("%s: %s\n", r.name(), print(r.list())); + assertSize(3); + } + + /** Same as above, but all messages are 0-length */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testRegularMessageReceptionWithEmptyMessages() throws Exception { + a.send(new Message(null)); + b.send(new Message(null)); + c.send(new Message(null)); + for(int i=0; i < 20; i++) { + if(ra.size() == 3 && rb.size() == 3 && rc.size() == 3) + break; + stable(a,b,c); + Util.sleep(100); + } + for(MyReceiver r: asList(ra,rb,rc)) + System.out.printf("%s: %s\n", r.name(), print(r.list())); + assertSize(3); + } + + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testChecksum() throws Exception { + EncryptBase encrypt=(EncryptBase)a.getProtocolStack().findProtocol(EncryptBase.class); + + byte[] buffer="Hello world".getBytes(); + long checksum=encrypt.computeChecksum(buffer, 0, buffer.length); + byte[] checksum_array=encrypt.encryptChecksum(checksum); + + long actual_checksum=encrypt.decryptChecksum(null, checksum_array, 0, checksum_array.length); + assert checksum == actual_checksum : String.format("checksum: %d, actual: %d", checksum, actual_checksum); + } + + + /** A rogue member should not be able to join a cluster */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testRogueMemberJoin() throws Exception { + Util.close(rogue); + rogue=new JChannel(Util.getTestStack()).name("rogue"); + 
rogue.getProtocolStack().removeProtocol(EncryptBase.class); + GMS gms=(GMS)rogue.getProtocolStack().findProtocol(GMS.class); + gms.setMaxJoinAttempts(1); + rogue.connect(cluster_name); + for(int i=0; i < 10; i++) { + if(a.getView().size() > 3) + break; + Util.sleep(500); + } + for(JChannel ch: asList(a,b,c)) + System.out.printf("%s: view is %s\n", ch.getAddress(), ch.getView()); + for(JChannel ch: asList(a,b,c)) { + View view=ch.getView(); + assert view.size() == 3 : "view should be {A,B,C}: " + view; + } + } + + + /** Test that A,B,C do NOT receive any message sent by a rogue node which is not member of {A,B,C} */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testMessageSendingByRogue() throws Exception { + rogue.send(null, "message from rogue"); // tests single messages + Util.sleep(500); + for(int i=1; i <= 100; i++) // tests message batches + rogue.send(null, "msg #" + i + " from rogue"); + + for(int i=0; i < 10; i++) { + if(ra.size() > 0 || rb.size() > 0 || rc.size() > 0) + break; + Util.sleep(500); + } + assert ra.size() == 0 : String.format("received msgs from non-member: '%s'; this should not be the case", print(ra.list())); + assert rb.size() == 0 : String.format("received msgs from non-member: '%s'; this should not be the case", print(rb.list())); + assert rc.size() == 0 : String.format("received msgs from non-member: '%s'; this should not be the case", print(rc.list())); + } + + + /** + * R sends a message that has an encryption header and is encrypted with R's secret key (which of course is different + * from the cluster members' shared key as R doesn't know it). The cluster members should drop R's message as they + * shouldn't be able to decrypt it. 
+ */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testMessageSendingByRogueUsingEncryption() throws Exception { + SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName("/tmp/ignored.keystore"); + encrypt.encryptEntireMessage(true).signMessages(true); + + SecretKey secret_key=KeyStoreGenerator.createSecretKey(); + Field secretKey=Util.getField(SYM_ENCRYPT.class, "secret_key"); + secretKey.setAccessible(true); + Util.setField(secretKey, encrypt, secret_key); + encrypt.init(); + + short encrypt_id=ClassConfigurator.getProtocolId(SYM_ENCRYPT.class); + EncryptHeader hdr=new EncryptHeader(EncryptHeader.ENCRYPT, encrypt.symVersion()); + Message msg=new Message(null).putHeader(encrypt_id, hdr); + + byte[] buf="hello from rogue".getBytes(); + byte[] encrypted_buf=encrypt.code(buf, 0, buf.length, false); + msg.setBuffer(encrypted_buf); + long checksum=encrypt.computeChecksum(encrypted_buf, 0, encrypted_buf.length); + byte[] tmp=encrypt.encryptChecksum(checksum); + hdr.signature(tmp); + + rogue.send(msg); + + for(int i=0; i < 10; i++) { + if(ra.size() > 0 || rb.size() > 0 || rc.size() > 0) + break; + Util.sleep(500); + } + assert ra.size() == 0 : String.format("received msgs from non-member: '%s'; this should not be the case", print(ra.list())); + assert rb.size() == 0 : String.format("received msgs from non-member: '%s'; this should not be the case", print(rb.list())); + assert rc.size() == 0 : String.format("received msgs from non-member: '%s'; this should not be the case", print(rc.list())); + } + + + /** + * Tests that the non-member does NOT receive messages from cluster {A,B,C}. The de-serialization of a message's + * payload (encrypted with the secret key of the rogue non-member) will fail, so the message is never passed up + * to the application. 
+ */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testMessageReceptionByRogue() throws Exception { + rogue.setReceiver(r_rogue=new MyReceiver().rawMsgs(true)); + a.setReceiver(null); b.setReceiver(null); c.setReceiver(null); + a.send(null, "Hello from A"); + b.send(null, "Hello from B"); + c.send(null, "Hello from C"); + for(int i=0; i < 10; i++) { + // retransmissions will add dupes to rogue as it doesn't have dupe elimination, so we could have more than + // 3 messages! + if(r_rogue.size() > 0) + break; + Util.sleep(500); + } + + // the non-member may have received some cluster messages, if the encrypted messages coincidentally didn't + // cause a deserialization exception, but it will not be able to read their contents: + if(r_rogue.size() > 0) { + System.out.printf("Rogue non-member received %d message(s), but it should not be able to read deserialize " + + "the contents (this should throw exceptions below):\n", r_rogue.size()); + for(Message msg: r_rogue.list()) { + try { + String payload=(String)msg.getObject(); + assert !payload.startsWith("Hello from"); + } + catch(Exception t) { + System.out.printf("caught exception trying to de-serialize garbage payload into a string: %s\n", t); + } + }; + } + } + + + /** + * Tests the scenario where the non-member R captures a message from some cluster member in {A,B,C}, then + * increments the NAKACK2 seqno and resends that message. The message must not be received by {A,B,C}; + * it should be discarded. 
+ */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testCapturingOfMessageByNonMemberAndResending() throws Exception { + rogue.setReceiver(new ReceiverAdapter() { + public void receive(Message msg) { + System.out.printf("rogue: modifying and resending msg %s, hdrs: %s\n", msg, msg.printHeaders()); + rogue.setReceiver(null); // to prevent recursive cycle + try { + short prot_id=ClassConfigurator.getProtocolId(NAKACK2.class); + NakAckHeader2 hdr=(NakAckHeader2)msg.getHeader(prot_id); + if(hdr != null) { + long seqno=hdr.getSeqno(); + Util.setField(Util.getField(NakAckHeader2.class, "seqno"), hdr, seqno+1); + } + else { + System.out.printf("Rogue was not able to get the %s header, fabricating one with seqno=50\n", NAKACK2.class.getSimpleName()); + NakAckHeader2 hdr2=NakAckHeader2.createMessageHeader(50); + msg.putHeader(prot_id, hdr2); + } + + rogue.send(msg); + } + catch(Exception e) { + e.printStackTrace(); + } + } + }); + + a.send(null, "Hello world from A"); + + // everybody in {A,B,C} should receive this message, but NOT the rogue's resent message + for(int i=0; i < 10; i++) { + if(ra.size() > 1 || rb.size() > 1 || rc.size() > 1) + break; // this should NOT happen + Util.sleep(500); + } + + for(MyReceiver r: asList(ra,rb,rc)) + System.out.printf("%s: %s\n", r.name(), print(r.list())); + assert ra.size() == 1 : String.format("received msgs from non-member: '%s'; this should not be the case", print(ra.list())); + assert rb.size() == 1 : String.format("received msgs from non-member: '%s'; this should not be the case", print(rb.list())); + assert rc.size() == 1 : String.format("received msgs from non-member: '%s'; this should not be the case", print(rc.list())); + } + + + + /** + * Tests the case where a non-member installs a new view {rogue,A,B,C}, making itself the coordinator and therefore + * controlling admission of new members to the cluster etc... 
+ */ + // @Test(groups=Global.FUNCTIONAL,singleThreaded=true) + public void testRogueViewInstallation() throws Exception { + final Address rogue_addr=rogue.getAddress(); + View rogue_view=View.create(rogue_addr, a.getView().getViewId().getId()+1, + rogue_addr, a.getAddress(), b.getAddress(), c.getAddress()); + + Message view_change_msg=new Message().putHeader(GMS_ID, new GMS.GmsHeader(GMS.GmsHeader.VIEW)) + .setBuffer(marshal(rogue_view)); + rogue.send(view_change_msg); + + for(int i=0; i < 10; i++) { + if(a.getView().size() > 3) + break; + Util.sleep(500); + } + for(JChannel ch: asList(a,b,c)) { + View view=ch.getView(); + System.out.printf("%s: view is %s\n", ch.getAddress(), view); + assert !view.containsMember(rogue_addr) : "view contains rogue member: " + view; + }; + } + + + protected static JChannel createRogue(String name) throws Exception { + return new JChannel(new SHARED_LOOPBACK()).name(name); + } + + + protected static Buffer marshal(final View view) throws Exception { + ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(512); + out.writeShort(1); + if(view != null) + view.writeTo(out); + return out.getBuffer(); + } + + protected void assertSize(int expected_size) { + for(MyReceiver r: asList(ra,rb,rc)) + assert r.size() == expected_size : String.format("expected size: %d, actual size of %s: %d", expected_size, r.name(), r.size()); + } + + protected static String print(List<Message> msgs) { + StringBuilder sb=new StringBuilder(); + for(Message msg: msgs) + sb.append(msg.getObject()).append(" "); + return sb.toString(); + } + + protected static String print(byte[] buf, int offset, int length) { + StringBuilder sb=new StringBuilder("encrypted string: "); + for(int i=0; i < length; i++) { + int ch=buf[offset+i]; + sb.append(ch).append(' '); + } + return sb.toString(); + } + + + protected static void stable(JChannel ... 
channels) { + for(JChannel ch: channels) { + STABLE stable=(STABLE)ch.getProtocolStack().findProtocol(STABLE.class); + stable.gc(); + } + } + + +}
tests/junit-functional/org/jgroups/protocols/SYM_ENCRYPT_Test.java+94 −0 added@@ -0,0 +1,94 @@ +package org.jgroups.protocols; + +import org.jgroups.Global; +import org.jgroups.JChannel; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.stack.ProtocolStack; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests use cases for {@link SYM_ENCRYPT} described in https://issues.jboss.org/browse/JGRP-2021. + * Make sure you create the keystore before running this test (ant make-keystore). + * @author Bela Ban + * @since 4.0 + */ +@Test(groups=Global.FUNCTIONAL,singleThreaded=true) +public class SYM_ENCRYPT_Test extends EncryptTest { + protected static final String DEF_PWD="changeit"; + + @BeforeMethod protected void init() throws Exception { + super.init(getClass().getSimpleName()); + } + + @AfterMethod protected void destroy() { + super.destroy(); + } + + + /** Calling methods in superclass. 
Kludge because TestNG doesn't call methods in superclass correctly **/ + public void testRegularMessageReception() throws Exception { + super.testRegularMessageReception(); + } + + public void testRegularMessageReceptionWithEmptyMessages() throws Exception { + super.testRegularMessageReceptionWithEmptyMessages(); + } + + public void testChecksum() throws Exception { + super.testChecksum(); + } + + public void testRogueMemberJoin() throws Exception { + super.testRogueMemberJoin(); + } + + public void testMessageSendingByRogue() throws Exception { + super.testMessageSendingByRogue(); + } + + public void testMessageSendingByRogueUsingEncryption() throws Exception { + super.testMessageSendingByRogueUsingEncryption(); + } + + public void testMessageReceptionByRogue() throws Exception { + super.testMessageReceptionByRogue(); + } + + public void testCapturingOfMessageByNonMemberAndResending() throws Exception { + super.testCapturingOfMessageByNonMemberAndResending(); + } + + public void testRogueViewInstallation() throws Exception { + super.testRogueViewInstallation(); + } + + + + + protected JChannel create(String name) throws Exception { + JChannel ch=new JChannel(Util.getTestStack()).name(name); + SYM_ENCRYPT encrypt; + try { + encrypt=createENCRYPT("keystore/defaultStore.keystore", DEF_PWD); + } + catch(Throwable t) { + encrypt=createENCRYPT("defaultStore.keystore", DEF_PWD); + } + ch.getProtocolStack().insertProtocol(encrypt, ProtocolStack.BELOW, NAKACK2.class); + return ch; + } + + + // Note that setting encrypt_entire_message to true is critical here, or else some of the tests in this + // unit test would fail! + protected SYM_ENCRYPT createENCRYPT(String keystore_name, String store_pwd) throws Exception { + SYM_ENCRYPT encrypt=new SYM_ENCRYPT().keystoreName(keystore_name).alias("myKey") + .storePassword(store_pwd).encryptEntireMessage(true).signMessages(true); + encrypt.init(); + return encrypt; + } + +}
tests/junit-functional/org/jgroups/tests/SizeTest.java+4 −1 modified@@ -619,7 +619,10 @@ public static void testEncryptHeader() throws Exception { ENCRYPT.EncryptHeader hdr=new ENCRYPT.EncryptHeader((short)1, null); _testSize(hdr); hdr=new ENCRYPT.EncryptHeader((short)2, "Hello world"); - _testSize(hdr); + EncryptHeader hdr2=new EncryptHeader((byte)1, new byte[]{'b','e', 'l', 'a'}); + _testSize(hdr2); + hdr2=new EncryptHeader((byte)2, "Hello world".getBytes()); + _testSize(hdr2); }
tests/junit/org/jgroups/protocols/S3_PINGTest.java+4 −4 modified@@ -1,15 +1,15 @@ package org.jgroups.protocols; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - import org.jgroups.Global; import org.jgroups.protocols.S3_PING.Utils; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + @Test(groups={Global.STACK_INDEPENDENT}) public class S3_PINGTest { private S3_PING ping;
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
34
- www.oracle.com/technetwork/security-advisory/cpuapr2019-5072813.html (nvd: Patch, Third Party Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1435.html (nvd: Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1439.html (nvd: Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-2035.html (nvd: Vendor Advisory, WEB)
- www.securitytracker.com/id/1036165 (nvd: Broken Link, Third Party Advisory, VDB Entry)
- access.redhat.com/errata/RHSA-2016:1345 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1346 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1347 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1374 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1376 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1389 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1432 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1433 (nvd: Vendor Advisory, WEB)
- access.redhat.com/errata/RHSA-2016:1434 (nvd: Vendor Advisory, WEB)
- github.com/advisories/GHSA-rc7h-x6cq-988q (ghsa: ADVISORY)
- issues.jboss.org/browse/JGRP-2021 (nvd: Issue Tracking, Vendor Advisory, WEB)
- nvd.nist.gov/vuln/detail/CVE-2016-2141 (ghsa: ADVISORY)
- rhn.redhat.com/errata/RHSA-2016-1328.html (nvd: Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1329.html (nvd: Broken Link, Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1330.html (nvd: Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1331.html (nvd: Broken Link, Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1332.html (nvd: Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1333.html (nvd: Broken Link, Vendor Advisory, WEB)
- rhn.redhat.com/errata/RHSA-2016-1334.html (nvd: Vendor Advisory, WEB)
- www.securityfocus.com/bid/91481 (nvd: VDB Entry)
- github.com/belaban/JGroups/commit/eeaf5241cce464ef21a2dfc4938729ade9ebef36 (ghsa: WEB)
- issues.redhat.com/browse/JGRP-2055 (ghsa: WEB)
- issues.redhat.com/browse/JGRP-2074 (ghsa: WEB)
- lists.apache.org/thread.html/ra18cac97416abc2958db0b107877c31da28d884fa6e70fd89c87384a@%3Cdev.geode.apache.org%3E (ghsa: WEB)
- lists.apache.org/thread.html/rb37cc937d4fc026fb56de4b4ec0d054aa4083c1a4edd0d8360c068a0@%3Cdev.geode.apache.org%3E (ghsa: WEB)
- web.archive.org/web/20161013163606/http://www.securityfocus.com/bid/91481 (ghsa: WEB)
- web.archive.org/web/20201207092245/http://www.securitytracker.com/id/1036165 (ghsa: WEB)
- lists.apache.org/thread.html/ra18cac97416abc2958db0b107877c31da28d884fa6e70fd89c87384a%40%3Cdev.geode.apache.org%3E (nvd)
- lists.apache.org/thread.html/rb37cc937d4fc026fb56de4b4ec0d054aa4083c1a4edd0d8360c068a0%40%3Cdev.geode.apache.org%3E (nvd)
News mentions
0
No linked articles in our index yet.