001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicLong;
021
022import org.apache.activemq.Service;
023import org.apache.activemq.command.ActiveMQQueue;
024import org.apache.activemq.command.ActiveMQTopic;
025import org.apache.activemq.command.BrokerId;
026import org.apache.activemq.command.BrokerInfo;
027import org.apache.activemq.command.Command;
028import org.apache.activemq.command.ConnectionId;
029import org.apache.activemq.command.ConnectionInfo;
030import org.apache.activemq.command.ConsumerInfo;
031import org.apache.activemq.command.ExceptionResponse;
032import org.apache.activemq.command.Message;
033import org.apache.activemq.command.MessageAck;
034import org.apache.activemq.command.MessageDispatch;
035import org.apache.activemq.command.ProducerInfo;
036import org.apache.activemq.command.Response;
037import org.apache.activemq.command.SessionInfo;
038import org.apache.activemq.command.ShutdownInfo;
039import org.apache.activemq.transport.DefaultTransportListener;
040import org.apache.activemq.transport.FutureResponse;
041import org.apache.activemq.transport.ResponseCallback;
042import org.apache.activemq.transport.Transport;
043import org.apache.activemq.util.IdGenerator;
044import org.apache.activemq.util.ServiceStopper;
045import org.apache.activemq.util.ServiceSupport;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Forwards all messages from the local broker to the remote broker.
051 *
052 * @org.apache.xbean.XBean
053 *
054 */
055public class ForwardingBridge implements Service {
056
057    private static final IdGenerator ID_GENERATOR = new IdGenerator();
058    private static final Logger LOG = LoggerFactory.getLogger(ForwardingBridge.class);
059
060    final AtomicLong enqueueCounter = new AtomicLong();
061    final AtomicLong dequeueCounter = new AtomicLong();
062    ConnectionInfo connectionInfo;
063    SessionInfo sessionInfo;
064    ProducerInfo producerInfo;
065    ConsumerInfo queueConsumerInfo;
066    ConsumerInfo topicConsumerInfo;
067    BrokerId localBrokerId;
068    BrokerId remoteBrokerId;
069    BrokerInfo localBrokerInfo;
070    BrokerInfo remoteBrokerInfo;
071
072    private final Transport localBroker;
073    private final Transport remoteBroker;
074    private String clientId;
075    private int prefetchSize = 1000;
076    private boolean dispatchAsync;
077    private String destinationFilter = ">";
078    private NetworkBridgeListener bridgeFailedListener;
079    private boolean useCompression = false;
080
081    public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
082        this.localBroker = localBroker;
083        this.remoteBroker = remoteBroker;
084    }
085
086    public void start() throws Exception {
087        LOG.info("Starting a network connection between {} and {} has been established.", localBroker, remoteBroker);
088
089        localBroker.setTransportListener(new DefaultTransportListener() {
090            public void onCommand(Object o) {
091                Command command = (Command)o;
092                serviceLocalCommand(command);
093            }
094
095            public void onException(IOException error) {
096                serviceLocalException(error);
097            }
098        });
099
100        remoteBroker.setTransportListener(new DefaultTransportListener() {
101            public void onCommand(Object o) {
102                Command command = (Command)o;
103                serviceRemoteCommand(command);
104            }
105
106            public void onException(IOException error) {
107                serviceRemoteException(error);
108            }
109        });
110
111        localBroker.start();
112        remoteBroker.start();
113    }
114
115    protected void triggerStartBridge() throws IOException {
116        Thread thead = new Thread() {
117            public void run() {
118                try {
119                    startBridge();
120                } catch (IOException e) {
121                    LOG.error("Failed to start network bridge: ", e);
122                }
123            }
124        };
125        thead.start();
126    }
127
128    /**
129     * @throws IOException
130     */
131    final void startBridge() throws IOException {
132        connectionInfo = new ConnectionInfo();
133        connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
134        connectionInfo.setClientId(clientId);
135        localBroker.oneway(connectionInfo);
136        remoteBroker.oneway(connectionInfo);
137
138        sessionInfo = new SessionInfo(connectionInfo, 1);
139        localBroker.oneway(sessionInfo);
140        remoteBroker.oneway(sessionInfo);
141
142        queueConsumerInfo = new ConsumerInfo(sessionInfo, 1);
143        queueConsumerInfo.setDispatchAsync(dispatchAsync);
144        queueConsumerInfo.setDestination(new ActiveMQQueue(destinationFilter));
145        queueConsumerInfo.setPrefetchSize(prefetchSize);
146        queueConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
147        localBroker.oneway(queueConsumerInfo);
148
149        producerInfo = new ProducerInfo(sessionInfo, 1);
150        producerInfo.setResponseRequired(false);
151        remoteBroker.oneway(producerInfo);
152
153        if (connectionInfo.getClientId() != null) {
154            topicConsumerInfo = new ConsumerInfo(sessionInfo, 2);
155            topicConsumerInfo.setDispatchAsync(dispatchAsync);
156            topicConsumerInfo.setSubscriptionName("topic-bridge");
157            topicConsumerInfo.setRetroactive(true);
158            topicConsumerInfo.setDestination(new ActiveMQTopic(destinationFilter));
159            topicConsumerInfo.setPrefetchSize(prefetchSize);
160            topicConsumerInfo.setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
161            localBroker.oneway(topicConsumerInfo);
162        }
163
164        LOG.info("Network connection between {} and {} has been established.", localBroker, remoteBroker);
165    }
166
167    public void stop() throws Exception {
168        try {
169            if (connectionInfo != null) {
170                localBroker.request(connectionInfo.createRemoveCommand());
171                remoteBroker.request(connectionInfo.createRemoveCommand());
172            }
173            localBroker.setTransportListener(null);
174            remoteBroker.setTransportListener(null);
175            localBroker.oneway(new ShutdownInfo());
176            remoteBroker.oneway(new ShutdownInfo());
177        } finally {
178            ServiceStopper ss = new ServiceStopper();
179            ss.stop(localBroker);
180            ss.stop(remoteBroker);
181            ss.throwFirstException();
182        }
183    }
184
185    public void serviceRemoteException(Throwable error) {
186        LOG.info("Unexpected remote exception: {}", error.getMessage());
187        LOG.debug("Exception trace: ", error);
188    }
189
190    protected void serviceRemoteCommand(Command command) {
191        try {
192            if (command.isBrokerInfo()) {
193                synchronized (this) {
194                    remoteBrokerInfo = (BrokerInfo)command;
195                    remoteBrokerId = remoteBrokerInfo.getBrokerId();
196                    if (localBrokerId != null) {
197                        if (localBrokerId.equals(remoteBrokerId)) {
198                            LOG.info("Disconnecting loop back connection.");
199                            ServiceSupport.dispose(this);
200                        } else {
201                            triggerStartBridge();
202                        }
203                    }
204                }
205            } else {
206                LOG.warn("Unexpected remote command: {}", command);
207            }
208        } catch (IOException e) {
209            serviceLocalException(e);
210        }
211    }
212
213    public void serviceLocalException(Throwable error) {
214        LOG.info("Unexpected local exception: {}", error.getMessage());
215        LOG.debug("Exception trace: ", error);
216        fireBridgeFailed();
217    }
218
219    protected void serviceLocalCommand(Command command) {
220        try {
221            if (command.isMessageDispatch()) {
222
223                enqueueCounter.incrementAndGet();
224
225                final MessageDispatch md = (MessageDispatch)command;
226                Message message = md.getMessage();
227                message.setProducerId(producerInfo.getProducerId());
228
229                if (message.getOriginalTransactionId() == null) {
230                    message.setOriginalTransactionId(message.getTransactionId());
231                }
232                message.setTransactionId(null);
233
234                if (isUseCompression()) {
235                    message.compress();
236                }
237
238                if (!message.isResponseRequired()) {
239                    // If the message was originally sent using async send, we will preserve that
240                    // QOS by bridging it using an async send (small chance of message loss).
241                    remoteBroker.oneway(message);
242                    dequeueCounter.incrementAndGet();
243                    localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
244
245                } else {
246
247                    // The message was not sent using async send, so we should
248                    // only ack the local
249                    // broker when we get confirmation that the remote broker
250                    // has received the message.
251                    ResponseCallback callback = new ResponseCallback() {
252                        public void onCompletion(FutureResponse future) {
253                            try {
254                                Response response = future.getResult();
255                                if (response.isException()) {
256                                    ExceptionResponse er = (ExceptionResponse)response;
257                                    serviceLocalException(er.getException());
258                                } else {
259                                    dequeueCounter.incrementAndGet();
260                                    localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
261                                }
262                            } catch (IOException e) {
263                                serviceLocalException(e);
264                            }
265                        }
266                    };
267
268                    remoteBroker.asyncRequest(message, callback);
269                }
270
271                // Ack on every message since we don't know if the broker is
272                // blocked due to memory
273                // usage and is waiting for an Ack to un-block him.
274
275                // Acking a range is more efficient, but also more prone to
276                // locking up a server
277                // Perhaps doing something like the following should be policy
278                // based.
279                // if(
280                // md.getConsumerId().equals(queueConsumerInfo.getConsumerId())
281                // ) {
282                // queueDispatched++;
283                // if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2)
284                // ) {
285                // localBroker.oneway(new MessageAck(md,
286                // MessageAck.STANDARD_ACK_TYPE, queueDispatched));
287                // queueDispatched=0;
288                // }
289                // } else {
290                // topicDispatched++;
291                // if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2)
292                // ) {
293                // localBroker.oneway(new MessageAck(md,
294                // MessageAck.STANDARD_ACK_TYPE, topicDispatched));
295                // topicDispatched=0;
296                // }
297                // }
298            } else if (command.isBrokerInfo()) {
299                synchronized (this) {
300                    localBrokerInfo = (BrokerInfo)command;
301                    localBrokerId = localBrokerInfo.getBrokerId();
302                    if (remoteBrokerId != null) {
303                        if (remoteBrokerId.equals(localBrokerId)) {
304                            LOG.info("Disconnecting loop back connection.");
305                            ServiceSupport.dispose(this);
306                        } else {
307                            triggerStartBridge();
308                        }
309                    }
310                }
311            } else {
312                LOG.debug("Unexpected local command: {}", command);
313            }
314        } catch (IOException e) {
315            serviceLocalException(e);
316        }
317    }
318
319    public String getClientId() {
320        return clientId;
321    }
322
323    public void setClientId(String clientId) {
324        this.clientId = clientId;
325    }
326
327    public int getPrefetchSize() {
328        return prefetchSize;
329    }
330
331    public void setPrefetchSize(int prefetchSize) {
332        this.prefetchSize = prefetchSize;
333    }
334
335    public boolean isDispatchAsync() {
336        return dispatchAsync;
337    }
338
339    public void setDispatchAsync(boolean dispatchAsync) {
340        this.dispatchAsync = dispatchAsync;
341    }
342
343    public String getDestinationFilter() {
344        return destinationFilter;
345    }
346
347    public void setDestinationFilter(String destinationFilter) {
348        this.destinationFilter = destinationFilter;
349    }
350
351    public void setNetworkBridgeFailedListener(NetworkBridgeListener listener) {
352        this.bridgeFailedListener = listener;
353    }
354
355    private void fireBridgeFailed() {
356        NetworkBridgeListener l = this.bridgeFailedListener;
357        if (l != null) {
358            l.bridgeFailed();
359        }
360    }
361
362    public String getRemoteAddress() {
363        return remoteBroker.getRemoteAddress();
364    }
365
366    public String getLocalAddress() {
367        return localBroker.getRemoteAddress();
368    }
369
370    public String getLocalBrokerName() {
371        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
372    }
373
374    public String getRemoteBrokerName() {
375        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
376    }
377
378    public long getDequeueCounter() {
379        return dequeueCounter.get();
380    }
381
382    public long getEnqueueCounter() {
383        return enqueueCounter.get();
384    }
385
386    /**
387     * @param useCompression
388     *      True if forwarded Messages should have their bodies compressed.
389     */
390    public void setUseCompression(boolean useCompression) {
391        this.useCompression = useCompression;
392    }
393
394    /**
395     * @return the vale of the useCompression setting, true if forwarded messages will be compressed.
396     */
397    public boolean isUseCompression() {
398        return useCompression;
399    }
400}