/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents an exclusive lock on a database to avoid multiple brokers running
 * against the same logical database.
 *
 * @org.apache.xbean.XBean element="shared-file-locker"
 *
 */
public class SharedFileLocker extends AbstractLocker {

    /** Directory used for the lock file when none has been configured. */
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    private static final Logger LOG = LoggerFactory.getLogger(SharedFileLocker.class);

    /**
     * The lock file currently owned by this locker, or {@code null} when the
     * locker is not started (or a start attempt failed).
     */
    private LockFile lockFile;

    /** Directory in which the file named {@code lock} is created. */
    protected File directory = DEFAULT_DIRECTORY;

    /**
     * Acquires the lock on {@code <directory>/lock}.
     * <p>
     * When {@code failIfLocked} is set, a single attempt is made and any failure
     * is propagated. Otherwise the attempt is repeated every
     * {@code lockAcquireSleepInterval} milliseconds until the lock is obtained
     * or this service is stopped/stopping.
     * <p>
     * Does nothing if a lock file is already registered. If the attempt fails
     * for any reason the lock file is released and forgotten, so that a later
     * call performs a real acquisition instead of returning immediately.
     *
     * @throws IOException if the lock could not be obtained, or if waiting for
     *                     it was aborted because the service is shutting down
     * @throws Exception   any other failure reported while locking
     */
    @Override
    public void doStart() throws Exception {
        if (lockFile != null) {
            return;
        }

        final File lockFileName = new File(directory, "lock");
        // Work on a local reference: the field can be reset by doStop() while we
        // are still waiting here. NOTE(review): presumably doStop() is invoked from
        // another thread during shutdown -- confirm against the service base class.
        final LockFile candidate = new LockFile(lockFileName, false);
        // keepAlive() reads the field, so it has to be published before locking.
        lockFile = candidate;

        boolean acquired = false;
        try {
            if (failIfLocked) {
                candidate.lock();
            } else {
                awaitLock(candidate, lockFileName);
            }
            acquired = true;
        } finally {
            if (!acquired) {
                discard(candidate);
            }
        }
    }

    /**
     * Repeatedly tries to lock {@code candidate} until it succeeds or the
     * service is stopped/stopping.
     *
     * @param candidate    the lock file to acquire
     * @param lockFileName the path of the lock file, used for log messages only
     * @throws IOException if the loop ended without a live lock being held
     */
    private void awaitLock(LockFile candidate, File lockFileName) throws IOException {
        // Print a warning only once
        boolean warned = false;
        boolean locked = false;

        while (!isStopped() && !isStopping()) {
            try {
                candidate.lock();
                if (warned) {
                    // ensure lockHolder has released; wait for one keepAlive iteration
                    pause(lockable != null ? lockable.getLockKeepAlivePeriod() : 0L);
                }
                locked = keepAlive();
                break;
            } catch (IOException e) {
                if (!warned) {
                    LOG.info("Database "
                            + lockFileName
                            + " is locked by another server. This broker is now in slave mode waiting a lock to be acquired");
                    warned = true;
                }

                LOG.debug("Database "
                        + lockFileName
                        + " is locked... waiting "
                        + (lockAcquireSleepInterval / 1000)
                        + " seconds for the database to be unlocked. Reason: "
                        + e);
                pause(lockAcquireSleepInterval);
            }
        }

        if (!locked) {
            throw new IOException("attempt to obtain lock aborted due to shutdown");
        }
    }

    /**
     * Sleeps for the given number of milliseconds, returning early if the
     * thread is interrupted.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException ignored) {
            // Intentionally not re-asserting the interrupt flag: the caller is a
            // retry loop whose only exit conditions are isStopped()/isStopping(),
            // and a set flag would make every following sleep return at once,
            // turning the wait into a busy spin.
        }
    }

    /**
     * Releases {@code candidate} after an unsuccessful start and clears the
     * field if it still refers to it. Never throws, so that the failure that
     * triggered the clean-up is the one the caller sees.
     *
     * @param candidate the lock file created by the failed start attempt
     */
    private void discard(LockFile candidate) {
        try {
            // doStop() likewise calls unlock() on a lock file whose lock() may
            // have failed, so this is expected to be harmless when nothing is held.
            candidate.unlock();
        } catch (Exception e) {
            LOG.debug("Failed to release lock file after unsuccessful start", e);
        } finally {
            if (lockFile == candidate) {
                lockFile = null;
            }
        }
    }

    /**
     * Reports whether this locker still holds a live lock.
     *
     * @return {@code false} when no lock file is registered, otherwise the
     *         result of {@link LockFile#keepAlive()}
     */
    @Override
    public boolean keepAlive() {
        // Single read of the field: it may be reset to null by doStop() between
        // a null check and the dereference.
        final LockFile current = lockFile;
        final boolean result = current != null && current.keepAlive();
        if (LOG.isTraceEnabled()) {
            LOG.trace("keepAlive result: " + result + (name != null ? ", name: " + name : ""));
        }
        return result;
    }

    /**
     * Releases the lock file, if one is registered, and forgets it.
     *
     * @param stopper not used by this implementation
     * @throws Exception if unlocking fails; the lock file stays registered in that case
     */
    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        final LockFile held = lockFile;
        if (held != null) {
            held.unlock();
            lockFile = null;
        }
    }

    /**
     * @return the directory in which the lock file is created
     */
    public File getDirectory() {
        return directory;
    }

    /**
     * @param directory the directory in which the lock file is created
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }

    /**
     * Takes the lock directory from the given persistence adapter and, if this
     * locker has no name yet, names it after that directory.
     *
     * @param persistenceAdapter the adapter whose directory is to be locked
     * @throws IOException declared by the overridden method; not thrown by the
     *                     statements visible here
     */
    @Override
    public void configure(PersistenceAdapter persistenceAdapter) throws IOException {
        this.setDirectory(persistenceAdapter.getDirectory());
        if (name == null) {
            name = getDirectory().toString();
        }
    }
}