001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.TimeUnit;
026
027import javax.jms.JMSException;
028
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatch;
038import org.apache.activemq.command.MessageDispatchNotification;
039import org.apache.activemq.command.MessageId;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.Response;
042import org.apache.activemq.thread.Scheduler;
043import org.apache.activemq.transaction.Synchronization;
044import org.apache.activemq.transport.TransmitCallback;
045import org.apache.activemq.usage.SystemUsage;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A subscription that honors the pre-fetch option of the ConsumerInfo.
051 */
052public abstract class PrefetchSubscription extends AbstractSubscription {
053
054    private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
055    protected final Scheduler scheduler;
056
057    protected PendingMessageCursor pending;
058    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
059    protected boolean usePrefetchExtension = true;
060    private int maxProducersToAudit=32;
061    private int maxAuditDepth=2048;
062    protected final SystemUsage usageManager;
063    protected final Object pendingLock = new Object();
064    protected final Object dispatchLock = new Object();
065    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
066
067    public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException {
068        super(broker,context, info);
069        this.usageManager=usageManager;
070        pending = cursor;
071        try {
072            pending.start();
073        } catch (Exception e) {
074            throw new JMSException(e.getMessage());
075        }
076        this.scheduler = broker.getScheduler();
077    }
078
079    public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
080        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
081    }
082
083    /**
084     * Allows a message to be pulled on demand by a client
085     */
086    @Override
087    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
088        // The slave should not deliver pull messages.
089        // TODO: when the slave becomes a master, He should send a NULL message to all the
090        // consumers to 'wake them up' in case they were waiting for a message.
091        if (getPrefetchSize() == 0) {
092            prefetchExtension.set(pull.getQuantity());
093            final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount();
094
095            // Have the destination push us some messages.
096            for (Destination dest : destinations) {
097                dest.iterate();
098            }
099            dispatchPending();
100
101            synchronized(this) {
102                // If there was nothing dispatched.. we may need to setup a timeout.
103                if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
104                    // immediate timeout used by receiveNoWait()
105                    if (pull.getTimeout() == -1) {
106                        // Null message indicates the pull is done or did not have pending.
107                        prefetchExtension.set(1);
108                        add(QueueMessageReference.NULL_MESSAGE);
109                        dispatchPending();
110                    }
111                    if (pull.getTimeout() > 0) {
112                        scheduler.executeAfterDelay(new Runnable() {
113                            @Override
114                            public void run() {
115                                pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
116                            }
117                        }, pull.getTimeout());
118                    }
119                }
120            }
121        }
122        return null;
123    }
124
125    /**
126     * Occurs when a pull times out. If nothing has been dispatched since the
127     * timeout was setup, then send the NULL message.
128     */
129    final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
130        synchronized (pendingLock) {
131            if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) {
132                try {
133                    prefetchExtension.set(1);
134                    add(QueueMessageReference.NULL_MESSAGE);
135                    dispatchPending();
136                } catch (Exception e) {
137                    context.getConnection().serviceException(e);
138                } finally {
139                    prefetchExtension.set(0);
140                }
141            }
142        }
143    }
144
145    @Override
146    public void add(MessageReference node) throws Exception {
147        synchronized (pendingLock) {
148            // The destination may have just been removed...
149            if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
150                // perhaps we should inform the caller that we are no longer valid to dispatch to?
151                return;
152            }
153
154            // Don't increment for the pullTimeout control message.
155            if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
156                getSubscriptionStatistics().getEnqueues().increment();
157            }
158            pending.addMessageLast(node);
159        }
160        dispatchPending();
161    }
162
163    @Override
164    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
165        synchronized(pendingLock) {
166            try {
167                pending.reset();
168                while (pending.hasNext()) {
169                    MessageReference node = pending.next();
170                    node.decrementReferenceCount();
171                    if (node.getMessageId().equals(mdn.getMessageId())) {
172                        // Synchronize between dispatched list and removal of messages from pending list
173                        // related to remove subscription action
174                        synchronized(dispatchLock) {
175                            pending.remove();
176                            createMessageDispatch(node, node.getMessage());
177                            dispatched.add(node);
178                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
179                            onDispatch(node, node.getMessage());
180                        }
181                        return;
182                    }
183                }
184            } finally {
185                pending.release();
186            }
187        }
188        throw new JMSException(
189                "Slave broker out of sync with master: Dispatched message ("
190                        + mdn.getMessageId() + ") was not in the pending list for "
191                        + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
192    }
193
194    @Override
195    public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
196        // Handle the standard acknowledgment case.
197        boolean callDispatchMatched = false;
198        Destination destination = null;
199
200        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
201            // suppress unexpected ack exception in this expected case
202            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack);
203            return;
204        }
205
206        LOG.trace("ack: {}", ack);
207
208        synchronized(dispatchLock) {
209            if (ack.isStandardAck()) {
210                // First check if the ack matches the dispatched. When using failover this might
211                // not be the case. We don't ever want to ack the wrong messages.
212                assertAckMatchesDispatched(ack);
213
214                // Acknowledge all dispatched messages up till the message id of
215                // the acknowledgment.
216                boolean inAckRange = false;
217                List<MessageReference> removeList = new ArrayList<MessageReference>();
218                for (final MessageReference node : dispatched) {
219                    MessageId messageId = node.getMessageId();
220                    if (ack.getFirstMessageId() == null
221                            || ack.getFirstMessageId().equals(messageId)) {
222                        inAckRange = true;
223                    }
224                    if (inAckRange) {
225                        // Don't remove the nodes until we are committed.
226                        if (!context.isInTransaction()) {
227                            getSubscriptionStatistics().getDequeues().increment();
228                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
229                            removeList.add(node);
230                        } else {
231                            registerRemoveSync(context, node);
232                        }
233                        acknowledge(context, ack, node);
234                        if (ack.getLastMessageId().equals(messageId)) {
235                            destination = (Destination) node.getRegionDestination();
236                            callDispatchMatched = true;
237                            break;
238                        }
239                    }
240                }
241                for (final MessageReference node : removeList) {
242                    dispatched.remove(node);
243                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
244                }
245                // this only happens after a reconnect - get an ack which is not
246                // valid
247                if (!callDispatchMatched) {
248                    LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack);
249                }
250            } else if (ack.isIndividualAck()) {
251                // Message was delivered and acknowledge - but only delete the
252                // individual message
253                for (final MessageReference node : dispatched) {
254                    MessageId messageId = node.getMessageId();
255                    if (ack.getLastMessageId().equals(messageId)) {
256                        // Don't remove the nodes until we are committed - immediateAck option
257                        if (!context.isInTransaction()) {
258                            getSubscriptionStatistics().getDequeues().increment();
259                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
260                            dispatched.remove(node);
261                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
262                        } else {
263                            registerRemoveSync(context, node);
264                        }
265
266                        if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
267                            // allow transaction batch to exceed prefetch
268                            while (true) {
269                                int currentExtension = prefetchExtension.get();
270                                int newExtension = Math.max(currentExtension, currentExtension + 1);
271                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
272                                    break;
273                                }
274                            }
275                        }
276
277                        acknowledge(context, ack, node);
278                        destination = (Destination) node.getRegionDestination();
279                        callDispatchMatched = true;
280                        break;
281                    }
282                }
283            }else if (ack.isDeliveredAck()) {
284                // Message was delivered but not acknowledged: update pre-fetch
285                // counters.
286                int index = 0;
287                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
288                    final MessageReference node = iter.next();
289                    Destination nodeDest = (Destination) node.getRegionDestination();
290                    if (ack.getLastMessageId().equals(node.getMessageId())) {
291                        if (usePrefetchExtension && getPrefetchSize() != 0) {
292                            // allow  batch to exceed prefetch
293                            while (true) {
294                                int currentExtension = prefetchExtension.get();
295                                int newExtension = Math.max(currentExtension, index + 1);
296                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
297                                    break;
298                                }
299                            }
300                        }
301                        destination = nodeDest;
302                        callDispatchMatched = true;
303                        break;
304                    }
305                }
306                if (!callDispatchMatched) {
307                    throw new JMSException(
308                            "Could not correlate acknowledgment with dispatched message: "
309                                    + ack);
310                }
311            } else if (ack.isExpiredAck()) {
312                // Message was expired
313                int index = 0;
314                boolean inAckRange = false;
315                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
316                    final MessageReference node = iter.next();
317                    Destination nodeDest = (Destination) node.getRegionDestination();
318                    MessageId messageId = node.getMessageId();
319                    if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
320                        inAckRange = true;
321                    }
322                    if (inAckRange) {
323                        Destination regionDestination = nodeDest;
324                        if (broker.isExpired(node)) {
325                            regionDestination.messageExpired(context, this, node);
326                        }
327                        iter.remove();
328                        nodeDest.getDestinationStatistics().getInflight().decrement();
329
330                        if (ack.getLastMessageId().equals(messageId)) {
331                            if (usePrefetchExtension && getPrefetchSize() != 0) {
332                                // allow  batch to exceed prefetch
333                                while (true) {
334                                    int currentExtension = prefetchExtension.get();
335                                    int newExtension = Math.max(currentExtension, index + 1);
336                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
337                                        break;
338                                    }
339                                }
340                            }
341
342                            destination = (Destination) node.getRegionDestination();
343                            callDispatchMatched = true;
344                            break;
345                        }
346                    }
347                }
348                if (!callDispatchMatched) {
349                    throw new JMSException(
350                            "Could not correlate expiration acknowledgment with dispatched message: "
351                                    + ack);
352                }
353            } else if (ack.isRedeliveredAck()) {
354                // Message was re-delivered but it was not yet considered to be
355                // a DLQ message.
356                boolean inAckRange = false;
357                for (final MessageReference node : dispatched) {
358                    MessageId messageId = node.getMessageId();
359                    if (ack.getFirstMessageId() == null
360                            || ack.getFirstMessageId().equals(messageId)) {
361                        inAckRange = true;
362                    }
363                    if (inAckRange) {
364                        if (ack.getLastMessageId().equals(messageId)) {
365                            destination = (Destination) node.getRegionDestination();
366                            callDispatchMatched = true;
367                            break;
368                        }
369                    }
370                }
371                if (!callDispatchMatched) {
372                    throw new JMSException(
373                            "Could not correlate acknowledgment with dispatched message: "
374                                    + ack);
375                }
376            } else if (ack.isPoisonAck()) {
377                // TODO: what if the message is already in a DLQ???
378                // Handle the poison ACK case: we need to send the message to a
379                // DLQ
380                if (ack.isInTransaction()) {
381                    throw new JMSException("Poison ack cannot be transacted: "
382                            + ack);
383                }
384                int index = 0;
385                boolean inAckRange = false;
386                List<MessageReference> removeList = new ArrayList<MessageReference>();
387                for (final MessageReference node : dispatched) {
388                    MessageId messageId = node.getMessageId();
389                    if (ack.getFirstMessageId() == null
390                            || ack.getFirstMessageId().equals(messageId)) {
391                        inAckRange = true;
392                    }
393                    if (inAckRange) {
394                        sendToDLQ(context, node, ack.getPoisonCause());
395                        Destination nodeDest = (Destination) node.getRegionDestination();
396                        nodeDest.getDestinationStatistics()
397                        .getInflight().decrement();
398                        removeList.add(node);
399                        getSubscriptionStatistics().getDequeues().increment();
400                        index++;
401                        acknowledge(context, ack, node);
402                        if (ack.getLastMessageId().equals(messageId)) {
403                            while (true) {
404                                int currentExtension = prefetchExtension.get();
405                                int newExtension = Math.max(0, currentExtension - (index + 1));
406                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
407                                    break;
408                                }
409                            }
410                            destination = nodeDest;
411                            callDispatchMatched = true;
412                            break;
413                        }
414                    }
415                }
416                for (final MessageReference node : removeList) {
417                    dispatched.remove(node);
418                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
419                }
420                if (!callDispatchMatched) {
421                    throw new JMSException(
422                            "Could not correlate acknowledgment with dispatched message: "
423                                    + ack);
424                }
425            }
426        }
427        if (callDispatchMatched && destination != null) {
428            destination.wakeup();
429            dispatchPending();
430
431            if (pending.isEmpty()) {
432                wakeupDestinationsForDispatch();
433            }
434        } else {
435            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
436        }
437    }
438
439    private void registerRemoveSync(ConnectionContext context, final MessageReference node) {
440        // setup a Synchronization to remove nodes from the
441        // dispatched list.
442        context.getTransaction().addSynchronization(
443                new Synchronization() {
444
445                    @Override
446                    public void beforeEnd() {
447                        if (usePrefetchExtension && getPrefetchSize() != 0) {
448                            while (true) {
449                                int currentExtension = prefetchExtension.get();
450                                int newExtension = Math.max(0, currentExtension - 1);
451                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
452                                    break;
453                                }
454                            }
455                        }
456                    }
457
458                    @Override
459                    public void afterCommit()
460                            throws Exception {
461                        Destination nodeDest = (Destination) node.getRegionDestination();
462                        synchronized(dispatchLock) {
463                            getSubscriptionStatistics().getDequeues().increment();
464                            dispatched.remove(node);
465                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
466                            nodeDest.getDestinationStatistics().getInflight().decrement();
467                        }
468                        nodeDest.wakeup();
469                        dispatchPending();
470                    }
471
472                    @Override
473                    public void afterRollback() throws Exception {
474                        synchronized(dispatchLock) {
475                            // poisionAck will decrement - otherwise still inflight on client
476                        }
477                    }
478                });
479    }
480
481    /**
482     * Checks an ack versus the contents of the dispatched list.
483     *  called with dispatchLock held
484     * @param ack
485     * @throws JMSException if it does not match
486     */
487    protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
488        MessageId firstAckedMsg = ack.getFirstMessageId();
489        MessageId lastAckedMsg = ack.getLastMessageId();
490        int checkCount = 0;
491        boolean checkFoundStart = false;
492        boolean checkFoundEnd = false;
493        for (MessageReference node : dispatched) {
494
495            if (firstAckedMsg == null) {
496                checkFoundStart = true;
497            } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
498                checkFoundStart = true;
499            }
500
501            if (checkFoundStart) {
502                checkCount++;
503            }
504
505            if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
506                checkFoundEnd = true;
507                break;
508            }
509        }
510        if (!checkFoundStart && firstAckedMsg != null)
511            throw new JMSException("Unmatched acknowledge: " + ack
512                    + "; Could not find Message-ID " + firstAckedMsg
513                    + " in dispatched-list (start of ack)");
514        if (!checkFoundEnd && lastAckedMsg != null)
515            throw new JMSException("Unmatched acknowledge: " + ack
516                    + "; Could not find Message-ID " + lastAckedMsg
517                    + " in dispatched-list (end of ack)");
518        if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
519            throw new JMSException("Unmatched acknowledge: " + ack
520                    + "; Expected message count (" + ack.getMessageCount()
521                    + ") differs from count in dispatched-list (" + checkCount
522                    + ")");
523        }
524    }
525
526    /**
527     *
528     * @param context
529     * @param node
530     * @param poisonCause
531     * @throws IOException
532     * @throws Exception
533     */
534    protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
535        broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
536    }
537
538    @Override
539    public int getInFlightSize() {
540        return dispatched.size();
541    }
542
543    /**
544     * Used to determine if the broker can dispatch to the consumer.
545     *
546     * @return true if the subscription is full
547     */
548    @Override
549    public boolean isFull() {
550        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
551    }
552
553    /**
554     * @return true when 60% or more room is left for dispatching messages
555     */
556    @Override
557    public boolean isLowWaterMark() {
558        return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
559    }
560
561    /**
562     * @return true when 10% or less room is left for dispatching messages
563     */
564    @Override
565    public boolean isHighWaterMark() {
566        return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
567    }
568
569    @Override
570    public int countBeforeFull() {
571        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
572    }
573
574    @Override
575    public int getPendingQueueSize() {
576        return pending.size();
577    }
578
579    @Override
580    public long getPendingMessageSize() {
581        synchronized (pendingLock) {
582            return pending.messageSize();
583        }
584    }
585
586    @Override
587    public int getDispatchedQueueSize() {
588        return dispatched.size();
589    }
590
591    @Override
592    public long getDequeueCounter() {
593        return getSubscriptionStatistics().getDequeues().getCount();
594    }
595
596    @Override
597    public long getDispatchedCounter() {
598        return getSubscriptionStatistics().getDispatched().getCount();
599    }
600
601    @Override
602    public long getEnqueueCounter() {
603        return getSubscriptionStatistics().getEnqueues().getCount();
604    }
605
606    @Override
607    public boolean isRecoveryRequired() {
608        return pending.isRecoveryRequired();
609    }
610
611    public PendingMessageCursor getPending() {
612        return this.pending;
613    }
614
615    public void setPending(PendingMessageCursor pending) {
616        this.pending = pending;
617        if (this.pending!=null) {
618            this.pending.setSystemUsage(usageManager);
619            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
620        }
621    }
622
623    @Override
624    public void add(ConnectionContext context, Destination destination) throws Exception {
625        synchronized(pendingLock) {
626            super.add(context, destination);
627            pending.add(context, destination);
628        }
629    }
630
631    @Override
632    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
633        return remove(context, destination, dispatched);
634    }
635
636    public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
637        LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
638        synchronized(pendingLock) {
639            super.remove(context, destination);
640            // Here is a potential problem concerning Inflight stat:
641            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
642            // Except if each commit or rollback callback action comes before remove of subscriber.
643            redispatch.addAll(pending.remove(context, destination));
644
645            if (dispatched == null) {
646                return redispatch;
647            }
648
649            // Synchronized to DispatchLock if necessary
650            if (dispatched == this.dispatched) {
651                synchronized(dispatchLock) {
652                    addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
653                }
654            } else {
655                addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
656            }
657        }
658
659        return redispatch;
660    }
661
662    private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
663        ArrayList<MessageReference> references = new ArrayList<MessageReference>();
664        for (MessageReference r : dispatched) {
665            if (r.getRegionDestination() == destination) {
666                references.add(r);
667                getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
668            }
669        }
670        redispatch.addAll(0, references);
671        destination.getDestinationStatistics().getInflight().subtract(references.size());
672        dispatched.removeAll(references);
673    }
674
675    // made public so it can be used in MQTTProtocolConverter
676    public void dispatchPending() throws IOException {
677        List<Destination> slowConsumerTargets = null;
678
679        synchronized(pendingLock) {
680            try {
681                int numberToDispatch = countBeforeFull();
682                if (numberToDispatch > 0) {
683                    setSlowConsumer(false);
684                    setPendingBatchSize(pending, numberToDispatch);
685                    int count = 0;
686                    pending.reset();
687                    while (pending.hasNext() && !isFull() && count < numberToDispatch) {
688                        MessageReference node = pending.next();
689                        if (node == null) {
690                            break;
691                        }
692
693                        // Synchronize between dispatched list and remove of message from pending list
694                        // related to remove subscription action
695                        synchronized(dispatchLock) {
696                            pending.remove();
697                            if (!isDropped(node) && canDispatch(node)) {
698
699                                // Message may have been sitting in the pending
700                                // list a while waiting for the consumer to ak the message.
701                                if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
702                                    //increment number to dispatch
703                                    numberToDispatch++;
704                                    if (broker.isExpired(node)) {
705                                        ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
706                                    }
707
708                                    if (!isBrowser()) {
709                                        node.decrementReferenceCount();
710                                        continue;
711                                    }
712                                }
713                                dispatch(node);
714                                count++;
715                            }
716                        }
717                        // decrement after dispatch has taken ownership to avoid usage jitter
718                        node.decrementReferenceCount();
719                    }
720                } else if (!isSlowConsumer()) {
721                    setSlowConsumer(true);
722                    slowConsumerTargets = destinations;
723                }
724            } finally {
725                pending.release();
726            }
727        }
728
729        if (slowConsumerTargets != null) {
730            for (Destination dest : slowConsumerTargets) {
731                dest.slowConsumer(context, this);
732            }
733        }
734    }
735
736    protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
737        pending.setMaxBatchSize(numberToDispatch);
738    }
739
740    // called with dispatchLock held
741    protected boolean dispatch(final MessageReference node) throws IOException {
742        final Message message = node.getMessage();
743        if (message == null) {
744            return false;
745        }
746
747        okForAckAsDispatchDone.countDown();
748
749        MessageDispatch md = createMessageDispatch(node, message);
750        if (node != QueueMessageReference.NULL_MESSAGE) {
751            getSubscriptionStatistics().getDispatched().increment();
752            dispatched.add(node);
753            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
754        }
755        if (getPrefetchSize() == 0) {
756            while (true) {
757                int currentExtension = prefetchExtension.get();
758                int newExtension = Math.max(0, currentExtension - 1);
759                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
760                    break;
761                }
762            }
763        }
764        if (info.isDispatchAsync()) {
765            md.setTransmitCallback(new TransmitCallback() {
766
767                @Override
768                public void onSuccess() {
769                    // Since the message gets queued up in async dispatch, we don't want to
770                    // decrease the reference count until it gets put on the wire.
771                    onDispatch(node, message);
772                }
773
774                @Override
775                public void onFailure() {
776                    Destination nodeDest = (Destination) node.getRegionDestination();
777                    if (nodeDest != null) {
778                        if (node != QueueMessageReference.NULL_MESSAGE) {
779                            nodeDest.getDestinationStatistics().getDispatched().increment();
780                            nodeDest.getDestinationStatistics().getInflight().increment();
781                            LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
782                        }
783                    }
784                    if (node instanceof QueueMessageReference) {
785                        ((QueueMessageReference) node).unlock();
786                    }
787                }
788            });
789            context.getConnection().dispatchAsync(md);
790        } else {
791            context.getConnection().dispatchSync(md);
792            onDispatch(node, message);
793        }
794        return true;
795    }
796
797    protected void onDispatch(final MessageReference node, final Message message) {
798        Destination nodeDest = (Destination) node.getRegionDestination();
799        if (nodeDest != null) {
800            if (node != QueueMessageReference.NULL_MESSAGE) {
801                nodeDest.getDestinationStatistics().getDispatched().increment();
802                nodeDest.getDestinationStatistics().getInflight().increment();
803                LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
804            }
805        }
806
807        if (info.isDispatchAsync()) {
808            try {
809                dispatchPending();
810            } catch (IOException e) {
811                context.getConnection().serviceExceptionAsync(e);
812            }
813        }
814    }
815
816    /**
817     * inform the MessageConsumer on the client to change it's prefetch
818     *
819     * @param newPrefetch
820     */
821    @Override
822    public void updateConsumerPrefetch(int newPrefetch) {
823        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
824            ConsumerControl cc = new ConsumerControl();
825            cc.setConsumerId(info.getConsumerId());
826            cc.setPrefetch(newPrefetch);
827            context.getConnection().dispatchAsync(cc);
828        }
829    }
830
831    /**
832     * @param node
833     * @param message
834     * @return MessageDispatch
835     */
836    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
837        MessageDispatch md = new MessageDispatch();
838        md.setConsumerId(info.getConsumerId());
839
840        if (node == QueueMessageReference.NULL_MESSAGE) {
841            md.setMessage(null);
842            md.setDestination(null);
843        } else {
844            Destination regionDestination = (Destination) node.getRegionDestination();
845            md.setDestination(regionDestination.getActiveMQDestination());
846            md.setMessage(message);
847            md.setRedeliveryCounter(node.getRedeliveryCounter());
848        }
849
850        return md;
851    }
852
853    /**
854     * Use when a matched message is about to be dispatched to the client.
855     *
856     * @param node
857     * @return false if the message should not be dispatched to the client
858     *         (another sub may have already dispatched it for example).
859     * @throws IOException
860     */
861    protected abstract boolean canDispatch(MessageReference node) throws IOException;
862
863    protected abstract boolean isDropped(MessageReference node);
864
865    /**
866     * Used during acknowledgment to remove the message.
867     *
868     * @throws IOException
869     */
870    protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
871
872
873    public int getMaxProducersToAudit() {
874        return maxProducersToAudit;
875    }
876
877    public void setMaxProducersToAudit(int maxProducersToAudit) {
878        this.maxProducersToAudit = maxProducersToAudit;
879        if (this.pending != null) {
880            this.pending.setMaxProducersToAudit(maxProducersToAudit);
881        }
882    }
883
884    public int getMaxAuditDepth() {
885        return maxAuditDepth;
886    }
887
888    public void setMaxAuditDepth(int maxAuditDepth) {
889        this.maxAuditDepth = maxAuditDepth;
890        if (this.pending != null) {
891            this.pending.setMaxAuditDepth(maxAuditDepth);
892        }
893    }
894
895    public boolean isUsePrefetchExtension() {
896        return usePrefetchExtension;
897    }
898
899    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
900        this.usePrefetchExtension = usePrefetchExtension;
901    }
902
903    @Override
904    public void setPrefetchSize(int prefetchSize) {
905        this.info.setPrefetchSize(prefetchSize);
906        try {
907            this.dispatchPending();
908        } catch (Exception e) {
909            LOG.trace("Caught exception during dispatch after prefetch change.", e);
910        }
911    }
912}