001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.broker.BrokerService; 030import org.apache.activemq.broker.SslContext; 031import org.apache.activemq.command.DiscoveryEvent; 032import org.apache.activemq.transport.Transport; 033import org.apache.activemq.transport.TransportFactory; 034import org.apache.activemq.transport.discovery.DiscoveryAgent; 035import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 036import org.apache.activemq.transport.discovery.DiscoveryListener; 037import org.apache.activemq.util.IntrospectionSupport; 038import org.apache.activemq.util.ServiceStopper; 039import org.apache.activemq.util.ServiceSupport; 040import org.apache.activemq.util.URISupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * A network connector which uses a discovery agent to detect the remote brokers 046 * available and setup a connection to each available remote broker 047 * 048 * @org.apache.xbean.XBean element="networkConnector" 049 * 050 */ 051public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 052 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class); 053 054 private DiscoveryAgent discoveryAgent; 055 private Map<String, String> parameters; 056 private final ConcurrentMap<URI, DiscoveryEvent> activeEvents = new ConcurrentHashMap<URI, DiscoveryEvent>(); 057 private URI discoveryUri; 058 public DiscoveryNetworkConnector() { 059 } 060 061 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { 062 setUri(discoveryURI); 063 } 064 065 public void setUri(URI discoveryURI) throws IOException { 066 this.discoveryUri = discoveryURI; 067 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); 068 try { 069 parameters = URISupport.parseParameters(discoveryURI); 070 // allow discovery agent to grab it's parameters 071 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); 072 } catch (URISyntaxException e) { 073 LOG.warn("failed to parse query parameters from discoveryURI: {}", discoveryURI, e); 074 } 075 } 076 077 public URI getUri() { 078 return discoveryUri; 079 } 080 081 @Override 082 public void onServiceAdd(DiscoveryEvent event) { 083 // Ignore events once we start stopping. 084 if (serviceSupport.isStopped() || serviceSupport.isStopping()) { 085 return; 086 } 087 String url = event.getServiceName(); 088 if (url != null) { 089 URI uri; 090 try { 091 uri = new URI(url); 092 } catch (URISyntaxException e) { 093 LOG.warn("Could not connect to remote URI: {} due to bad URI syntax: ", url, e); 094 return; 095 } 096 097 if (localURI.equals(uri)) { 098 LOG.debug("not connecting loopback: {}", uri); 099 return; 100 } 101 102 if (connectionFilter != null && !connectionFilter.connectTo(uri)) { 103 LOG.debug("connectionFilter disallows connection to: {}", uri); 104 return; 105 } 106 107 // Should we try to connect to that URI? 108 if (activeEvents.putIfAbsent(uri, event) != null) { 109 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: {}", uri); 110 return; 111 } 112 113 URI connectUri = uri; 114 try { 115 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); 116 } catch (URISyntaxException e) { 117 LOG.warn("could not apply query parameters: {} to: {}", new Object[]{ parameters, connectUri }, e); 118 } 119 120 LOG.info("Establishing network connection from {} to {}", localURI, connectUri); 121 122 Transport remoteTransport; 123 Transport localTransport; 124 try { 125 // Allows the transport to access the broker's ssl configuration. 126 SslContext.setCurrentSslContext(getBrokerService().getSslContext()); 127 try { 128 remoteTransport = TransportFactory.connect(connectUri); 129 } catch (Exception e) { 130 LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage()); 131 LOG.debug("Connection failure exception: ", e); 132 try { 133 discoveryAgent.serviceFailed(event); 134 } catch (IOException e1) { 135 LOG.debug("Failure while handling create remote transport failure event: {}", e1.getMessage(), e1); 136 } 137 return; 138 } 139 try { 140 localTransport = createLocalTransport(); 141 } catch (Exception e) { 142 ServiceSupport.dispose(remoteTransport); 143 LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage()); 144 LOG.debug("Connection failure exception: ", e); 145 146 try { 147 discoveryAgent.serviceFailed(event); 148 } catch (IOException e1) { 149 LOG.debug("Failure while handling create local transport failure event: {}", e1.getMessage(), e1); 150 } 151 return; 152 } 153 } finally { 154 SslContext.setCurrentSslContext(null); 155 } 156 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); 157 try { 158 synchronized (bridges) { 159 bridges.put(uri, bridge); 160 } 161 bridge.start(); 162 } catch (Exception e) { 163 ServiceSupport.dispose(localTransport); 164 ServiceSupport.dispose(remoteTransport); 165 LOG.warn("Could not start network bridge between: {} and: {} due to: {}", new Object[]{ localURI, uri, e.getMessage() }); 166 LOG.debug("Start failure exception: ", e); 167 try { 168 // Will remove bridge and active event. 169 discoveryAgent.serviceFailed(event); 170 } catch (IOException e1) { 171 LOG.debug("Discovery agent failure while handling failure event: {}", e1.getMessage(), e1); 172 } 173 } 174 } 175 } 176 177 @Override 178 public void onServiceRemove(DiscoveryEvent event) { 179 String url = event.getServiceName(); 180 if (url != null) { 181 URI uri; 182 try { 183 uri = new URI(url); 184 } catch (URISyntaxException e) { 185 LOG.warn("Could not connect to remote URI: {} due to bad URI syntax: ", url, e); 186 return; 187 } 188 189 // Only remove bridge if this is the active discovery event for the URL. 190 if (activeEvents.remove(uri, event)) { 191 synchronized (bridges) { 192 bridges.remove(uri); 193 } 194 } 195 } 196 } 197 198 public DiscoveryAgent getDiscoveryAgent() { 199 return discoveryAgent; 200 } 201 202 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 203 this.discoveryAgent = discoveryAgent; 204 if (discoveryAgent != null) { 205 this.discoveryAgent.setDiscoveryListener(this); 206 } 207 } 208 209 @Override 210 protected void handleStart() throws Exception { 211 if (discoveryAgent == null) { 212 throw new IllegalStateException("You must configure the 'discoveryAgent' property"); 213 } 214 this.discoveryAgent.start(); 215 super.handleStart(); 216 } 217 218 @Override 219 protected void handleStop(ServiceStopper stopper) throws Exception { 220 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) { 221 NetworkBridge bridge = i.next(); 222 try { 223 bridge.stop(); 224 } catch (Exception e) { 225 stopper.onException(this, e); 226 } 227 } 228 bridges.clear(); 229 activeEvents.clear(); 230 try { 231 this.discoveryAgent.stop(); 232 } catch (Exception e) { 233 stopper.onException(this, e); 234 } 235 236 super.handleStop(stopper); 237 } 238 239 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { 240 class DiscoverNetworkBridgeListener extends MBeanNetworkListener { 241 242 public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) { 243 super(brokerService, DiscoveryNetworkConnector.this, connectorName); 244 } 245 246 @Override 247 public void bridgeFailed() { 248 if (!serviceSupport.isStopped()) { 249 try { 250 discoveryAgent.serviceFailed(event); 251 } catch (IOException e) { 252 } 253 } 254 255 } 256 } 257 NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName()); 258 259 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); 260 result.setBrokerService(getBrokerService()); 261 return configureBridge(result); 262 } 263 264 @Override 265 public String toString() { 266 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); 267 } 268}