001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.util.regex.Matcher;
020import java.util.regex.Pattern;
021
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQQueue;
027import org.apache.activemq.command.ActiveMQTopic;
028import org.apache.activemq.filter.DestinationFilter;
029
030/**
031 * Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
032 * Topics</a> using a prefix and postfix. The virtual destination creates a
033 * wildcard that is then used to look up all active queue subscriptions which
034 * match.
035 *
036 * @org.apache.xbean.XBean
037 */
038public class VirtualTopic implements VirtualDestination {
039
040    private String prefix = "Consumer.*.";
041    private String postfix = "";
042    private String name = ">";
043    private boolean selectorAware = false;
044    private boolean local = false;
045    private boolean concurrentSend = false;
046    private boolean transactedSend = false;
047
048    @Override
049    public ActiveMQDestination getVirtualDestination() {
050        return new ActiveMQTopic(getName());
051    }
052
053    @Override
054    public Destination intercept(Destination destination) {
055        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, this) :
056                new VirtualTopicInterceptor(destination, this);
057    }
058
059    @Override
060    public ActiveMQDestination getMappedDestinations() {
061        return new ActiveMQQueue(prefix + name + postfix);
062    }
063
064    @Override
065    public Destination interceptMappedDestination(Destination destination) {
066        // do a reverse map from destination to get actual virtual destination
067        final String physicalName = destination.getActiveMQDestination().getPhysicalName();
068        final Pattern pattern = Pattern.compile(getRegex(prefix) + "(.*)" + getRegex(postfix));
069        final Matcher matcher = pattern.matcher(physicalName);
070        if (matcher.matches()) {
071            final String virtualName = matcher.group(1);
072            return new MappedQueueFilter(new ActiveMQTopic(virtualName), destination);
073        }
074        return destination;
075    }
076
077    private String getRegex(String part) {
078        StringBuilder builder = new StringBuilder();
079        for (char c : part.toCharArray()) {
080            switch (c) {
081                case '.':
082                    builder.append("\\.");
083                    break;
084                case '*':
085                    builder.append("[^\\.]*");
086                    break;
087                default:
088                    builder.append(c);
089            }
090        }
091        return builder.toString();
092    }
093
094    @Override
095    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
096        if (destination.isQueue() && destination.isPattern()) {
097            DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
098            if (filter.matches(destination)) {
099                broker.addDestination(context, destination, false);
100
101            }
102        }
103    }
104
105    @Override
106    public void remove(Destination destination) {
107    }
108
109    // Properties
110    // -------------------------------------------------------------------------
111
112    public String getPostfix() {
113        return postfix;
114    }
115
116    /**
117     * Sets any postix used to identify the queue consumers
118     */
119    public void setPostfix(String postfix) {
120        this.postfix = postfix;
121    }
122
123    public String getPrefix() {
124        return prefix;
125    }
126
127    /**
128     * Sets the prefix wildcard used to identify the queue consumers for a given
129     * topic
130     */
131    public void setPrefix(String prefix) {
132        this.prefix = prefix;
133    }
134
135    public String getName() {
136        return name;
137    }
138
139    public void setName(String name) {
140        this.name = name;
141    }
142
143    /**
144     * Indicates whether the selectors of consumers are used to determine
145     * dispatch to a virtual destination, when true only messages matching an
146     * existing consumer will be dispatched.
147     *
148     * @param selectorAware
149     *            when true take consumer selectors into consideration
150     */
151    public void setSelectorAware(boolean selectorAware) {
152        this.selectorAware = selectorAware;
153    }
154
155    public boolean isSelectorAware() {
156        return selectorAware;
157    }
158
159    public boolean isLocal() {
160        return local;
161    }
162
163    public void setLocal(boolean local) {
164        this.local = local;
165    }
166
167    @Override
168    public String toString() {
169        return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').
170                                                  append(postfix).append(',').append(selectorAware).
171                                                  append(',').append(local).toString();
172    }
173
174    public boolean isConcurrentSend() {
175        return concurrentSend;
176    }
177
178    /**
179     * When true, dispatch to matching destinations in parallel (in multiple threads)
180     * @param concurrentSend
181     */
182    public void setConcurrentSend(boolean concurrentSend) {
183        this.concurrentSend = concurrentSend;
184    }
185
186    public boolean isTransactedSend() {
187        return transactedSend;
188    }
189
190    /**
191     * When true, dispatch to matching destinations always uses a transaction.
192     * @param transactedSend
193     */
194    public void setTransactedSend(boolean transactedSend) {
195        this.transactedSend = transactedSend;
196    }
197
198    @Override
199    public int hashCode() {
200        final int prime = 31;
201        int result = 1;
202        result = prime * result + (concurrentSend ? 1231 : 1237);
203        result = prime * result + (local ? 1231 : 1237);
204        result = prime * result + ((name == null) ? 0 : name.hashCode());
205        result = prime * result + ((postfix == null) ? 0 : postfix.hashCode());
206        result = prime * result + ((prefix == null) ? 0 : prefix.hashCode());
207        result = prime * result + (selectorAware ? 1231 : 1237);
208        result = prime * result + (transactedSend ? 1231 : 1237);
209        return result;
210    }
211
212    @Override
213    public boolean equals(Object obj) {
214        if (this == obj)
215            return true;
216        if (obj == null)
217            return false;
218        if (getClass() != obj.getClass())
219            return false;
220        VirtualTopic other = (VirtualTopic) obj;
221        if (concurrentSend != other.concurrentSend)
222            return false;
223        if (local != other.local)
224            return false;
225        if (name == null) {
226            if (other.name != null)
227                return false;
228        } else if (!name.equals(other.name))
229            return false;
230        if (postfix == null) {
231            if (other.postfix != null)
232                return false;
233        } else if (!postfix.equals(other.postfix))
234            return false;
235        if (prefix == null) {
236            if (other.prefix != null)
237                return false;
238        } else if (!prefix.equals(other.prefix))
239            return false;
240        if (selectorAware != other.selectorAware)
241            return false;
242        if (transactedSend != other.transactedSend)
243            return false;
244        return true;
245    }
246}