001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.proxy;
018
019import org.apache.activemq.Service;
020import org.apache.activemq.transport.CompositeTransport;
021import org.apache.activemq.transport.Transport;
022import org.apache.activemq.transport.TransportAcceptListener;
023import org.apache.activemq.transport.TransportFactory;
024import org.apache.activemq.transport.TransportFilter;
025import org.apache.activemq.transport.TransportServer;
026import org.apache.activemq.util.ServiceStopper;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import java.io.IOException;
031import java.net.URI;
032import java.net.URISyntaxException;
033import java.util.Iterator;
034import java.util.concurrent.CopyOnWriteArrayList;
035
036/**
037 * @org.apache.xbean.XBean
038 */
039public class ProxyConnector implements Service {
040
041    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnector.class);
042    private TransportServer server;
043    private URI bind;
044    private URI remote;
045    private URI localUri;
046    private String name;
047
048    /**
049     * Should we proxy commands to the local broker using VM transport as well?
050     */
051    private boolean proxyToLocalBroker = true;
052
053    private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
054
055    @Override
056    public void start() throws Exception {
057
058        this.getServer().setAcceptListener(new TransportAcceptListener() {
059            @Override
060            public void onAccept(Transport localTransport) {
061                ProxyConnection connection = null;
062                try {
063                    Transport remoteTransport = createRemoteTransport(localTransport);
064                    connection = new ProxyConnection(localTransport, remoteTransport);
065                    connection.start();
066                    connections.add(connection);
067                } catch (Exception e) {
068                    onAcceptError(e);
069                    try {
070                        if (connection != null) {
071                            connection.stop();
072                        }
073                    } catch (Exception eoc) {
074                        LOG.error("Could not close broken connection: ", eoc);
075                    }
076                }
077            }
078
079            @Override
080            public void onAcceptError(Exception error) {
081                LOG.error("Could not accept connection: ", error);
082            }
083        });
084        getServer().start();
085        LOG.info("Proxy Connector {} started", getName());
086    }
087
088    @Override
089    public void stop() throws Exception {
090        ServiceStopper ss = new ServiceStopper();
091        if (this.server != null) {
092            ss.stop(this.server);
093        }
094
095        for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
096            LOG.info("Connector stopped: Stopping proxy.");
097            ss.stop(iter.next());
098        }
099        connections.clear();
100        ss.throwFirstException();
101        LOG.info("Proxy Connector {} stopped", getName());
102    }
103
104    // Properties
105    // -------------------------------------------------------------------------
106
107    public URI getLocalUri() {
108        return localUri;
109    }
110
111    public void setLocalUri(URI localURI) {
112        this.localUri = localURI;
113    }
114
115    public URI getBind() {
116        return bind;
117    }
118
119    public void setBind(URI bind) {
120        this.bind = bind;
121    }
122
123    public URI getRemote() {
124        return remote;
125    }
126
127    public void setRemote(URI remote) {
128        this.remote = remote;
129    }
130
131    public TransportServer getServer() throws IOException, URISyntaxException {
132        if (server == null) {
133            server = createServer();
134        }
135        return server;
136    }
137
138    public void setServer(TransportServer server) {
139        this.server = server;
140    }
141
142    protected TransportServer createServer() throws IOException, URISyntaxException {
143        if (bind == null) {
144            throw new IllegalArgumentException("You must specify either a server or the bind property");
145        }
146        return TransportFactory.bind(bind);
147    }
148
149    private Transport createRemoteTransport(final Transport local) throws Exception {
150        Transport transport = TransportFactory.compositeConnect(remote);
151        CompositeTransport ct = transport.narrow(CompositeTransport.class);
152        if (ct != null && localUri != null && proxyToLocalBroker) {
153            ct.add(false, new URI[] { localUri });
154        }
155
156        // Add a transport filter so that we can track the transport life cycle
157        transport = new TransportFilter(transport) {
158            @Override
159            public void stop() throws Exception {
160                LOG.info("Stopping proxy.");
161                super.stop();
162                ProxyConnection dummy = new ProxyConnection(local, this);
163                LOG.debug("Removing proxyConnection {}", dummy.toString());
164                connections.remove(dummy);
165            }
166        };
167        return transport;
168    }
169
170    public String getName() {
171        if (name == null) {
172            if (server != null) {
173                name = server.getConnectURI().toString();
174            } else {
175                name = "proxy";
176            }
177        }
178        return name;
179    }
180
181    public void setName(String name) {
182        this.name = name;
183    }
184
185    public boolean isProxyToLocalBroker() {
186        return proxyToLocalBroker;
187    }
188
189    public void setProxyToLocalBroker(boolean proxyToLocalBroker) {
190        this.proxyToLocalBroker = proxyToLocalBroker;
191    }
192
193    protected Integer getConnectionCount() {
194        return connections.size();
195    }
196}