/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;


import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.JMSException;
import javax.transaction.xa.XAException;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.transaction.LocalTransaction;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.WrappedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This broker filter handles the transaction related operations in the Broker
 * interface.
 * <p>
 * XA transactions are kept in a map owned by this filter; every access to that
 * map is made while holding the map's monitor. Non-XA transactions are kept in
 * the map exposed by the {@link ConnectionContext} they were started on.
 */
public class TransactionBroker extends BrokerFilter {

    private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);

    // The store used to create, persist and recover transactions.
    private final TransactionStore transactionStore;
    // The XA transactions known to this broker. Also used as the lock guarding itself,
    // so the reference must never change.
    private final Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();

    /**
     * @param next the next broker in the filter chain
     * @param transactionStore the store handed to every transaction created by this broker
     */
    public TransactionBroker(Broker next, TransactionStore transactionStore) {
        super(next);
        this.transactionStore = transactionStore;
    }

    // ////////////////////////////////////////////////////////////////////////////
    //
    // Life cycle Methods
    //
    // ////////////////////////////////////////////////////////////////////////////

    /**
     * Recovers any prepared transactions.
     * <p>
     * Starts the transaction store, asks it to replay its recoverable XA
     * transactions and then starts the next broker. Each replayed transaction is
     * re-created through {@link #beginTransaction}, gets one completion
     * synchronization per affected destination, is moved to
     * {@link Transaction#PREPARED_STATE} and is offered for MBean registration.
     *
     * @throws Exception if the store or the next broker fails to start; a failure
     *         inside the recovery callback is rethrown as the exception produced by
     *         {@code IOExceptionSupport.create} with a "Recovery Failed: " message
     */
    public void start() throws Exception {
        transactionStore.start();
        try {
            final ConnectionContext context = new ConnectionContext();
            context.setBroker(this);
            context.setInRecoveryMode(true);
            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
            context.setProducerFlowControl(false);
            // NOTE(review): the two exchanges below are configured but not referenced again
            // in this method; kept as-is until it is confirmed they can be dropped.
            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setMutable(true);
            producerExchange.setConnectionContext(context);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
            consumerExchange.setConnectionContext(context);
            transactionStore.recover(new TransactionRecoveryListener() {
                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                    try {
                        beginTransaction(context, xid);
                        XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
                        for (int i = 0; i < addedMessages.length; i++) {
                            forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
                        }
                        for (int i = 0; i < aks.length; i++) {
                            forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
                        }
                        transaction.setState(Transaction.PREPARED_STATE);
                        registerMBean(transaction);
                        LOG.debug("recovered prepared transaction: {}", transaction.getTransactionId());
                    } catch (Throwable e) {
                        // the listener signature declares no checked exceptions, so tunnel the
                        // failure out and unwrap it in the enclosing catch block
                        throw new WrappedException(e);
                    }
                }
            });
        } catch (WrappedException e) {
            Throwable cause = e.getCause();
            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
        }
        next.start();
    }

    /**
     * Registers a recovered transaction MBean, but only when the broker service's
     * region broker is a {@link ManagedRegionBroker}.
     */
    private void registerMBean(XATransaction transaction) {
        if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker) {
            ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
            managedRegionBroker.registerRecoveredTransactionMBean(transaction);
        }
    }

    /**
     * Resolves the destination of a recovered command via {@code addDestination}
     * and attaches a completion synchronization for it to the transaction.
     *
     * @param ack the recovered command; despite the name it may be a message or an ack
     */
    private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
                                                    ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
        Destination destination = addDestination(context, amqDestination, false);
        registerSync(destination, transaction, ack);
    }

    /**
     * Adds a {@link PreparedDestinationCompletion} for the destination to the
     * transaction, or bumps the operation count of the matching one already present.
     */
    private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
        Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
        // ensure one per destination in the list
        Synchronization existing = transaction.findMatching(sync);
        if (existing != null) {
            ((PreparedDestinationCompletion) existing).incrementOpCount();
        } else {
            transaction.addSynchronization(sync);
        }
    }

    /**
     * Synchronization attached to recovered transactions. On completion it clears
     * the destination's pending messages and, on commit, adjusts the destination
     * statistics by the number of recovered operations it stands for.
     * <p>
     * Two instances are equal when they refer to equal destinations and agree on
     * whether they represent sends or acks.
     */
    static class PreparedDestinationCompletion extends Synchronization {
        final Destination destination;
        final boolean messageSend;
        // number of recovered operations collapsed into this synchronization
        int opCount = 1;

        public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
            this.destination = destination;
            // rollback relevant to acks, commit to sends
            this.messageSend = messageSend;
        }

        public void incrementOpCount() {
            opCount++;
        }

        @Override
        public int hashCode() {
            // built from exactly the fields equals() compares, so that equal
            // instances always report the same hash code
            return 31 * destination.hashCode() + Boolean.valueOf(messageSend).hashCode();
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof PreparedDestinationCompletion)) {
                return false;
            }
            PreparedDestinationCompletion that = (PreparedDestinationCompletion) other;
            return destination.equals(that.destination) && messageSend == that.messageSend;
        }

        /**
         * For acks only: clears the destination's pending messages.
         */
        @Override
        public void afterRollback() throws Exception {
            if (!messageSend) {
                destination.clearPendingMessages();
                LOG.debug("cleared pending from afterRollback: {}", destination);
            }
        }

        /**
         * For sends: clears the destination's pending messages and adds the
         * operation count to the enqueue and message counters. For acks: adds the
         * operation count to the dequeue counter and subtracts it from the message
         * counter.
         */
        @Override
        public void afterCommit() throws Exception {
            if (messageSend) {
                destination.clearPendingMessages();
                destination.getDestinationStatistics().getEnqueues().add(opCount);
                destination.getDestinationStatistics().getMessages().add(opCount);
                LOG.debug("cleared pending from afterCommit: {}", destination);
            } else {
                destination.getDestinationStatistics().getDequeues().add(opCount);
                destination.getDestinationStatistics().getMessages().subtract(opCount);
            }
        }
    }

    /**
     * Stops the transaction store and then the next broker.
     *
     * @throws Exception if either stop call fails
     */
    public void stop() throws Exception {
        try {
            transactionStore.stop();
        } finally {
            // keep shutting the chain down even when the store fails to stop
            next.stop();
        }
    }

    // ////////////////////////////////////////////////////////////////////////////
    //
    // BrokerFilter overrides
    //
    // ////////////////////////////////////////////////////////////////////////////

    /**
     * Returns the ids of all XA transactions that report themselves as prepared.
     *
     * @param context not used by this implementation
     * @return an {@code XATransactionId[]}, empty when nothing is prepared
     */
    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
        List<TransactionId> txs = new ArrayList<TransactionId>();
        synchronized (xaTransactions) {
            for (Transaction tx : xaTransactions.values()) {
                if (tx.isPrepared()) {
                    LOG.debug("prepared transaction: {}", tx.getTransactionId());
                    txs.add(tx.getTransactionId());
                }
            }
        }
        XATransactionId[] rc = new XATransactionId[txs.size()];
        txs.toArray(rc);
        LOG.debug("prepared transaction list size: {}", rc.length);
        return rc;
    }

    /**
     * Creates and registers a transaction for the given id.
     * <p>
     * An XA id that is already registered is silently accepted; a non-XA id that
     * is already present on the connection context is rejected.
     *
     * @throws JMSException if a non-XA transaction with this id already exists
     */
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        if (xid.isXATransaction()) {
            synchronized (xaTransactions) {
                // the transaction may have already been started.
                if (xaTransactions.get(xid) != null) {
                    return;
                }
                XATransaction transaction = new XATransaction(transactionStore, (XATransactionId) xid, this, context.getConnectionId());
                xaTransactions.put(xid, transaction);
            }
        } else {
            Map<TransactionId, Transaction> transactionMap = context.getTransactions();
            if (transactionMap.get(xid) != null) {
                throw new JMSException("Transaction '" + xid + "' has already been started.");
            }
            transactionMap.put(xid, new LocalTransaction(transactionStore, (LocalTransactionId) xid, context));
        }
    }

    /**
     * Looks the transaction up and delegates to {@link Transaction#prepare()}.
     *
     * @return whatever the transaction's {@code prepare()} returns
     */
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        return getTransaction(context, xid, false).prepare();
    }

    /**
     * Looks the transaction up and delegates to {@code Transaction.commit(onePhase)}.
     */
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        getTransaction(context, xid, true).commit(onePhase);
    }

    /**
     * Looks the transaction up and rolls it back.
     */
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        getTransaction(context, xid, true).rollback();
    }

    /**
     * Looks the transaction up and rolls it back; in this implementation forgetting
     * a transaction is handled exactly like {@link #rollbackTransaction}.
     */
    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        getTransaction(context, xid, true).rollback();
    }

    /**
     * Passes the ack down the chain with the ack's transaction (or {@code null}
     * when the ack is not transacted) installed on the connection context.
     */
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        // This method may be invoked recursively.
        // Track original tx so that it can be restored.
        final ConnectionContext context = consumerExchange.getConnectionContext();
        final Transaction originalTx = context.getTransaction();
        Transaction transaction = null;
        if (ack.isInTransaction()) {
            transaction = getTransaction(context, ack.getTransactionId(), false);
        }
        context.setTransaction(transaction);
        try {
            next.acknowledge(consumerExchange, ack);
        } finally {
            context.setTransaction(originalTx);
        }
    }

    /**
     * Passes the message down the chain with the message's transaction (or
     * {@code null} when it carries no transaction id) installed on the connection
     * context.
     */
    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
        // This method may be invoked recursively.
        // Track original tx so that it can be restored.
        final ConnectionContext context = producerExchange.getConnectionContext();
        final Transaction originalTx = context.getTransaction();
        Transaction transaction = null;
        if (message.getTransactionId() != null) {
            transaction = getTransaction(context, message.getTransactionId(), false);
        }
        context.setTransaction(transaction);
        try {
            next.send(producerExchange, message);
        } finally {
            context.setTransaction(originalTx);
        }
    }

    /**
     * Rolls back the transactions left behind by a closing connection and then
     * forwards the call down the chain.
     * <p>
     * Every transaction in the context's map is rolled back and removed. XA
     * transactions created by the same connection id are rolled back unless they
     * are prepared. Rollback failures are logged and do not stop the clean-up.
     */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
            try {
                Transaction transaction = iter.next();
                transaction.rollback();
            } catch (Exception e) {
                LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
            }
            iter.remove();
        }

        synchronized (xaTransactions) {
            // first find all txs that belongs to the connection
            List<XATransaction> txs = new ArrayList<XATransaction>();
            for (XATransaction tx : xaTransactions.values()) {
                if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
                    txs.add(tx);
                }
            }

            // then remove them
            // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
            for (XATransaction tx : txs) {
                try {
                    tx.rollback();
                } catch (Exception e) {
                    LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
                }
            }
        }
        next.removeConnection(context, info, error);
    }

    // ////////////////////////////////////////////////////////////////////////////
    //
    // Implementation help methods.
    //
    // ////////////////////////////////////////////////////////////////////////////

    /**
     * Finds a started transaction by id.
     *
     * @param context source of the transaction map consulted for non-XA ids
     * @param xid the transaction id to resolve
     * @param mightBePrepared not consulted by this implementation
     * @return the matching transaction, never {@code null}
     * @throws XAException with {@link XAException#XAER_NOTA} if an XA id is unknown
     * @throws JMSException if a non-XA id is unknown
     */
    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
        if (xid.isXATransaction()) {
            Transaction xaTransaction;
            synchronized (xaTransactions) {
                xaTransaction = xaTransactions.get(xid);
            }
            if (xaTransaction == null) {
                throw XATransaction.newXAException("Transaction '" + xid + "' has not been started.", XAException.XAER_NOTA);
            }
            return xaTransaction;
        }

        Transaction transaction = context.getTransactions().get(xid);
        if (transaction == null) {
            throw new JMSException("Transaction '" + xid + "' has not been started.");
        }
        return transaction;
    }

    /**
     * Drops the XA transaction with the given id from this broker's map, if present.
     */
    public void removeTransaction(XATransactionId xid) {
        synchronized (xaTransactions) {
            xaTransactions.remove(xid);
        }
    }

}