001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.util.Iterator;
020import java.util.Set;
021import java.util.concurrent.CopyOnWriteArraySet;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.apache.activemq.Service;
025import org.apache.activemq.ThreadPriorities;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * Used to provide information on the status of the Connection
031 * 
032 * 
033 */
034public class TransportStatusDetector implements Service, Runnable {
035    private static final Logger LOG = LoggerFactory.getLogger(TransportStatusDetector.class);
036    private TransportConnector connector;
037    private Set<TransportConnection> collectionCandidates = new CopyOnWriteArraySet<TransportConnection>();
038    private AtomicBoolean started = new AtomicBoolean(false);
039    private Thread runner;
040    private int sweepInterval = 5000;
041
042    TransportStatusDetector(TransportConnector connector) {
043        this.connector = connector;
044    }
045
046    /**
047     * @return Returns the sweepInterval.
048     */
049    public int getSweepInterval() {
050        return sweepInterval;
051    }
052
053    /**
054     * The sweepInterval to set.
055     * 
056     * @param sweepInterval
057     */
058    public void setSweepInterval(int sweepInterval) {
059        this.sweepInterval = sweepInterval;
060    }
061
062    protected void doCollection() {
063        for (Iterator<TransportConnection> i = collectionCandidates.iterator(); i.hasNext();) {
064            TransportConnection tc = i.next();
065            if (tc.isMarkedCandidate()) {
066                if (tc.isBlockedCandidate()) {
067                    collectionCandidates.remove(tc);
068                    doCollection(tc);
069                } else {
070                    tc.doMark();
071                }
072            } else {
073                collectionCandidates.remove(tc);
074            }
075        }
076    }
077
078    protected void doSweep() {
079        for (Iterator i = connector.getConnections().iterator(); i.hasNext();) {
080            TransportConnection connection = (TransportConnection)i.next();
081            if (connection.isMarkedCandidate()) {
082                connection.doMark();
083                collectionCandidates.add(connection);
084            }
085        }
086    }
087
088    protected void doCollection(TransportConnection tc) {
089        LOG.warn("found a blocked client - stopping: {}", tc);
090        try {
091            tc.stop();
092        } catch (Exception e) {
093            LOG.error("Error stopping {}", tc, e);
094        }
095    }
096
097    public void run() {
098        while (started.get()) {
099            try {
100                doCollection();
101                doSweep();
102                Thread.sleep(sweepInterval);
103            } catch (Throwable e) {
104                LOG.error("failed to complete a sweep for blocked clients", e);
105            }
106        }
107    }
108
109    public void start() throws Exception {
110        if (started.compareAndSet(false, true)) {
111            runner = new Thread(this, "ActiveMQ Transport Status Monitor: " + connector);
112            runner.setDaemon(true);
113            runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
114            runner.start();
115        }
116    }
117
118    public void stop() throws Exception {
119        started.set(false);
120        if (runner != null) {
121            runner.join(getSweepInterval() * 5);
122        }
123    }
124}