001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import org.apache.activemq.broker.region.Destination;
020import org.apache.activemq.broker.region.DurableTopicSubscription;
021import org.apache.activemq.broker.region.Subscription;
022import org.apache.activemq.command.ActiveMQDestination;
023import org.apache.activemq.command.ActiveMQQueue;
024import org.apache.activemq.command.ActiveMQTopic;
025import org.apache.activemq.command.Message;
026
027/**
028 * A {@link DeadLetterStrategy} where each destination has its own individual
029 * DLQ using the subject naming hierarchy.
030 *
031 * @org.apache.xbean.XBean
032 *
033 */
034public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
035
036    private String topicPrefix = "ActiveMQ.DLQ.Topic.";
037    private String queuePrefix = "ActiveMQ.DLQ.Queue.";
038    private String topicSuffix;
039    private String queueSuffix;
040    private boolean useQueueForQueueMessages = true;
041    private boolean useQueueForTopicMessages = true;
042    private boolean destinationPerDurableSubscriber;
043
044    public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
045        if (message.getDestination().isQueue()) {
046            return createDestination(message, queuePrefix, queueSuffix, useQueueForQueueMessages, subscription);
047        } else {
048            return createDestination(message, topicPrefix, topicSuffix, useQueueForTopicMessages, subscription);
049        }
050    }
051
052    // Properties
053    // -------------------------------------------------------------------------
054
055    public String getQueuePrefix() {
056        return queuePrefix;
057    }
058
059    /**
060     * Sets the prefix to use for all dead letter queues for queue messages
061     */
062    public void setQueuePrefix(String queuePrefix) {
063        this.queuePrefix = queuePrefix;
064    }
065
066    public String getTopicPrefix() {
067        return topicPrefix;
068    }
069
070    /**
071     * Sets the prefix to use for all dead letter queues for topic messages
072     */
073    public void setTopicPrefix(String topicPrefix) {
074        this.topicPrefix = topicPrefix;
075    }
076
077    public String getQueueSuffix() {
078        return queueSuffix;
079    }
080
081    /**
082     * Sets the suffix to use for all dead letter queues for queue messages
083     */
084    public void setQueueSuffix(String queueSuffix) {
085        this.queueSuffix = queueSuffix;
086    }
087
088    public String getTopicSuffix() {
089        return topicSuffix;
090    }
091
092    /**
093     * Sets the suffix to use for all dead letter queues for topic messages
094     */
095    public void setTopicSuffix(String topicSuffix) {
096        this.topicSuffix = topicSuffix;
097    }
098
099    public boolean isUseQueueForQueueMessages() {
100        return useQueueForQueueMessages;
101    }
102
103    /**
104     * Sets whether a queue or topic should be used for queue messages sent to a
105     * DLQ. The default is to use a Queue
106     */
107    public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
108        this.useQueueForQueueMessages = useQueueForQueueMessages;
109    }
110
111    public boolean isUseQueueForTopicMessages() {
112        return useQueueForTopicMessages;
113    }
114
115    /**
116     * Sets whether a queue or topic should be used for topic messages sent to a
117     * DLQ. The default is to use a Queue
118     */
119    public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
120        this.useQueueForTopicMessages = useQueueForTopicMessages;
121    }
122
123    public boolean isDestinationPerDurableSubscriber() {
124        return destinationPerDurableSubscriber;
125    }
126
127    /**
128     * sets whether durable topic subscriptions are to get individual dead letter destinations.
129     * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
130     * The default is false.
131     * @param destinationPerDurableSubscriber
132     */
133    public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
134        this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
135    }
136
137    // Implementation methods
138    // -------------------------------------------------------------------------
139    protected ActiveMQDestination createDestination(Message message,
140                                                    String prefix,
141                                                    String suffix,
142                                                    boolean useQueue,
143                                                    Subscription subscription ) {
144        String name = null;
145
146        Destination regionDestination = (Destination) message.getRegionDestination();
147        if (regionDestination != null
148                && regionDestination.getActiveMQDestination() != null
149                && regionDestination.getActiveMQDestination().getPhysicalName() != null
150                && !regionDestination.getActiveMQDestination().getPhysicalName().isEmpty()){
151            name = prefix + regionDestination.getActiveMQDestination().getPhysicalName();
152        } else {
153            name = prefix + message.getDestination().getPhysicalName();
154        }
155
156        if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
157            name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
158        }
159
160        if (suffix != null && !suffix.isEmpty()) {
161            name += suffix;
162        }
163
164        if (useQueue) {
165            return new ActiveMQQueue(name);
166        } else {
167            return new ActiveMQTopic(name);
168        }
169    }
170
171}