001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store; 018 019import java.io.IOException; 020 021import org.apache.activemq.broker.ConnectionContext; 022import org.apache.activemq.command.ActiveMQDestination; 023import org.apache.activemq.command.Message; 024import org.apache.activemq.command.MessageAck; 025import org.apache.activemq.command.MessageId; 026import org.apache.activemq.usage.MemoryUsage; 027 028abstract public class AbstractMessageStore implements MessageStore { 029 public static final ListenableFuture<Object> FUTURE; 030 protected final ActiveMQDestination destination; 031 protected boolean prioritizedMessages; 032 protected IndexListener indexListener; 033 protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics(); 034 035 public AbstractMessageStore(ActiveMQDestination destination) { 036 this.destination = destination; 037 } 038 039 @Override 040 public void dispose(ConnectionContext context) { 041 } 042 043 @Override 044 public void start() throws Exception { 045 recoverMessageStoreStatistics(); 046 } 047 048 @Override 049 public void stop() throws Exception { 050 } 051 052 @Override 053 public ActiveMQDestination getDestination() { 054 return destination; 055 } 056 057 @Override 058 public void setMemoryUsage(MemoryUsage memoryUsage) { 059 } 060 061 @Override 062 public void setBatch(MessageId messageId) throws IOException, Exception { 063 } 064 065 /** 066 * flag to indicate if the store is empty 067 * 068 * @return true if the message count is 0 069 * @throws Exception 070 */ 071 @Override 072 public boolean isEmpty() throws Exception { 073 return getMessageCount() == 0; 074 } 075 076 @Override 077 public void setPrioritizedMessages(boolean prioritizedMessages) { 078 this.prioritizedMessages = prioritizedMessages; 079 } 080 081 @Override 082 public boolean isPrioritizedMessages() { 083 return this.prioritizedMessages; 084 } 085 086 @Override 087 public void addMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException{ 088 addMessage(context, message); 089 } 090 091 @Override 092 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { 093 addMessage(context, message); 094 return FUTURE; 095 } 096 097 @Override 098 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException { 099 addMessage(context, message, canOptimizeHint); 100 return FUTURE; 101 } 102 103 @Override 104 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException { 105 addMessage(context, message, canOptimizeHint); 106 return FUTURE; 107 } 108 109 @Override 110 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { 111 addMessage(context, message); 112 return new InlineListenableFuture(); 113 } 114 115 @Override 116 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 117 removeMessage(context, ack); 118 } 119 120 @Override 121 public void updateMessage(Message message) throws IOException { 122 throw new IOException("update is not supported by: " + this); 123 } 124 125 @Override 126 public void registerIndexListener(IndexListener indexListener) { 127 this.indexListener = indexListener; 128 } 129 130 public IndexListener getIndexListener() { 131 return indexListener; 132 } 133 134 static { 135 FUTURE = new InlineListenableFuture(); 136 } 137 138 @Override 139 public int getMessageCount() throws IOException { 140 return (int) getMessageStoreStatistics().getMessageCount().getCount(); 141 } 142 143 @Override 144 public long getMessageSize() throws IOException { 145 return getMessageStoreStatistics().getMessageSize().getTotalSize(); 146 } 147 148 @Override 149 public MessageStoreStatistics getMessageStoreStatistics() { 150 return messageStoreStatistics; 151 } 152 153 protected void recoverMessageStoreStatistics() throws IOException { 154 155 } 156}