Java Source Code: org.apache.activemq.advisory.ConsumerEventSource


   1: /**
   2:  *
   3:  * Licensed to the Apache Software Foundation (ASF) under one or more
   4:  * contributor license agreements.  See the NOTICE file distributed with
   5:  * this work for additional information regarding copyright ownership.
   6:  * The ASF licenses this file to You under the Apache License, Version 2.0
   7:  * (the "License"); you may not use this file except in compliance with
   8:  * the License.  You may obtain a copy of the License at
   9:  *
  10:  * http://www.apache.org/licenses/LICENSE-2.0
  11:  *
  12:  * Unless required by applicable law or agreed to in writing, software
  13:  * distributed under the License is distributed on an "AS IS" BASIS,
  14:  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15:  * See the License for the specific language governing permissions and
  16:  * limitations under the License.
  17:  */
  18: package org.apache.activemq.advisory;
  19: 
  20: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
  21: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
  22: 
  23: import org.apache.activemq.Service;
  24: import org.apache.activemq.command.ActiveMQDestination;
  25: import org.apache.activemq.command.ActiveMQMessage;
  26: import org.apache.activemq.command.ActiveMQTopic;
  27: import org.apache.activemq.command.ConsumerId;
  28: import org.apache.activemq.command.ConsumerInfo;
  29: import org.apache.activemq.command.RemoveInfo;
  30: import org.apache.commons.logging.Log;
  31: import org.apache.commons.logging.LogFactory;
  32: 
  33: import javax.jms.Connection;
  34: import javax.jms.Destination;
  35: import javax.jms.JMSException;
  36: import javax.jms.Message;
  37: import javax.jms.MessageConsumer;
  38: import javax.jms.MessageListener;
  39: import javax.jms.Session;
  40: 
  41: /**
  42:  * An object which can be used to listen to the number of active consumers
  43:  * available on a given destination.
  44:  * 
  45:  * @version $Revision: 426366 $
  46:  */
  47:	  public class ConsumerEventSource implements Service, MessageListener {
  48:    private static final Log log = LogFactory.getLog(ConsumerEventSource.class);
  49:
  50:    private final Connection connection;
  51:    private final ActiveMQDestination destination;
  52:    private ConsumerListener listener;
  53:    private AtomicBoolean started = new AtomicBoolean(false);
  54:    private AtomicInteger consumerCount = new AtomicInteger();
  55:    private Session session;
  56:    private MessageConsumer consumer;
  57:
  58:	      public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
  59:        this.connection = connection;
  60:        this.destination = ActiveMQDestination.transform(destination);
  61:    }
  62:
  63:	      public void setConsumerListener(ConsumerListener listener) {
  64:        this.listener = listener;
  65:    }
  66:
  67:	      public void start() throws Exception {
  68:	          if (started.compareAndSet(false, true)) {
  69:            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  70:            ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
  71:            consumer = session.createConsumer(advisoryTopic);
  72:            consumer.setMessageListener(this);
  73:        }
  74:    }
  75:
  76:	      public void stop() throws Exception {
  77:	          if (started.compareAndSet(true, false)) {
  78:	              if (session != null) {
  79:                session.close();
  80:            }
  81:        }
  82:    }
  83:
  84:	      public void onMessage(Message message) {
  85:	          if (message instanceof ActiveMQMessage) {
  86:            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
  87:            Object command = activeMessage.getDataStructure();
  88:            int count = 0;
  89:	              if (command instanceof ConsumerInfo) {
  90:                count = consumerCount.incrementAndGet();
  91:                count = extractConsumerCountFromMessage(message, count);
  92:                fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo) command, count));
  93:            }
  94:	              else if (command instanceof RemoveInfo) {
  95:                RemoveInfo removeInfo = (RemoveInfo) command;
  96:	                  if (removeInfo.isConsumerRemove()) {
  97:                    count = consumerCount.decrementAndGet();
  98:                    count = extractConsumerCountFromMessage(message, count);
  99:                    fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId) removeInfo.getObjectId(), count));
 100:                }
 101:            }
 102:	              else {
 103:                log.warn("Unknown command: " + command);
 104:            }
 105:        }
 106:	          else {
 107:            log.warn("Unknown message type: " + message + ". Message ignored");
 108:        }
 109:    }
 110:
 111:    /**
 112:     * Lets rely by default on the broker telling us what the consumer count is
 113:     * as it can ensure that we are up to date at all times and have not
 114:     * received messages out of order etc.
 115:     */
 116:	      protected int extractConsumerCountFromMessage(Message message, int count) {
 117:	          try {
 118:            Object value = message.getObjectProperty("consumerCount");
 119:	              if (value instanceof Number) {
 120:                Number n = (Number) value;
 121:                return n.intValue();
 122:            }
 123:            log.warn("No consumerCount header available on the message: " + message);
 124:        }
 125:	          catch (Exception e) {
 126:            log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
 127:        }
 128:        return count;
 129:    }
 130:
 131:	      protected void fireConsumerEvent(ConsumerEvent event) {
 132:	          if (listener != null) {
 133:            listener.onConsumerEvent(event);
 134:        }
 135:    }
 136:
 137:}