001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.ArrayList;
020import java.util.List;
021import org.apache.activemq.broker.Broker;
022import org.apache.activemq.broker.ConnectionContext;
023import org.apache.activemq.broker.region.MessageReference;
024import org.apache.activemq.broker.region.SubscriptionRecovery;
025import org.apache.activemq.broker.region.Topic;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.filter.DestinationFilter;
029
030/**
031 * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
032 * count of last messages.
033 * 
034 * @org.apache.xbean.XBean
035 * 
036 */
037public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
038    private volatile MessageReference messages[];
039    private int maximumSize = 100;
040    private int tail;
041
042    public SubscriptionRecoveryPolicy copy() {
043        FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy();
044        rc.setMaximumSize(maximumSize);
045        return rc;
046    }
047
048    public synchronized boolean add(ConnectionContext context, MessageReference node) throws Exception {
049        messages[tail++] = node;
050        if (tail >= messages.length) {
051            tail = 0;
052        }
053        return true;
054    }
055
056    public synchronized void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
057        // Re-dispatch the last message seen.
058        int t = tail;
059        // The buffer may not have rolled over yet..., start from the front
060        if (messages[t] == null) {
061            t = 0;
062        }
063        // Well the buffer is really empty then.
064        if (messages[t] == null) {
065            return;
066        }
067        // Keep dispatching until t hit's tail again.
068        do {
069            MessageReference node = messages[t];
070            sub.addRecoveredMessage(context, node);
071            t++;
072            if (t >= messages.length) {
073                t = 0;
074            }
075        } while (t != tail);
076    }
077
078    public void start() throws Exception {
079        messages = new MessageReference[maximumSize];
080    }
081
082    public void stop() throws Exception {
083        messages = null;
084    }
085
086    public int getMaximumSize() {
087        return maximumSize;
088    }
089
090    /**
091     * Sets the maximum number of messages that this destination will hold
092     * around in RAM
093     */
094    public void setMaximumSize(int maximumSize) {
095        this.maximumSize = maximumSize;
096    }
097
098    public synchronized Message[] browse(ActiveMQDestination destination) throws Exception {
099        List<Message> result = new ArrayList<Message>();
100        DestinationFilter filter = DestinationFilter.parseFilter(destination);
101        int t = tail;
102        if (messages[t] == null) {
103            t = 0;
104        }
105        if (messages[t] != null) {
106            do {
107                MessageReference ref = messages[t];
108                Message message = ref.getMessage();
109                if (filter.matches(message.getDestination())) {
110                    result.add(message);
111                }
112                t++;
113                if (t >= messages.length) {
114                    t = 0;
115                }
116            } while (t != tail);
117        }
118        return result.toArray(new Message[result.size()]);
119    }
120
121    public void setBroker(Broker broker) {        
122    }
123
124}