/******************************************************************************* * Copyright (c) 2009, 2019 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0 * and the Eclipse Distribution License is available at * https://www.eclipse.org/org/documents/edl-v10.php * * Contributors: * James Sutton - initial API and implementation and/or initial documentation */ package org.eclipse.paho.mqttv5.client; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.util.Hashtable; import java.util.Properties; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import javax.net.SocketFactory; import org.eclipse.paho.mqttv5.client.internal.ClientComms; import org.eclipse.paho.mqttv5.client.internal.ConnectActionListener; import org.eclipse.paho.mqttv5.client.internal.DisconnectedMessageBuffer; import org.eclipse.paho.mqttv5.client.internal.MqttConnectionState; import org.eclipse.paho.mqttv5.client.internal.MqttSessionState; import org.eclipse.paho.mqttv5.client.internal.NetworkModule; import org.eclipse.paho.mqttv5.client.internal.NetworkModuleService; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.client.persist.MqttDefaultFilePersistence; import org.eclipse.paho.mqttv5.client.util.Debug; import org.eclipse.paho.mqttv5.client.logging.Logger; import org.eclipse.paho.mqttv5.client.logging.LoggerFactory; import org.eclipse.paho.mqttv5.common.ExceptionHelper; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttPersistenceException; import 
org.eclipse.paho.mqttv5.common.MqttSecurityException; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttAuth; import org.eclipse.paho.mqttv5.common.packet.MqttDataTypes; import org.eclipse.paho.mqttv5.common.packet.MqttDisconnect; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttPublish; import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.eclipse.paho.mqttv5.common.packet.MqttSubscribe; import org.eclipse.paho.mqttv5.common.packet.MqttUnsubscribe; import org.eclipse.paho.mqttv5.common.util.MqttTopicValidator; /** * Lightweight client for talking to an MQTT server using non-blocking methods * that allow an operation to run in the background. * *
* This class implements the non-blocking {@link IMqttAsyncClient} client * interface allowing applications to initiate MQTT actions and then carry on * working while the MQTT action completes on a background thread. This * implementation is compatible with all Java SE runtimes from 1.7 and up. *
** An application can connect to an MQTT server using: *
** To enable messages to be delivered even across network and client restarts * messages need to be safely stored until the message has been delivered at the * requested quality of service. A pluggable persistence mechanism is provided * to store the messages. *
** By default {@link MqttDefaultFilePersistence} is used to store messages to a * file. If persistence is set to null then messages are stored in memory and * hence can be lost if the client, Java runtime or device shuts down. *
** If connecting with {@link MqttConnectionOptions#setCleanStart(boolean)} set * to true it is safe to use memory persistence as all state is cleared when a * client disconnects. If connecting with cleanStart set to false in order to * provide reliable message delivery then a persistent message store such as the * default one should be used. *
** The message store interface is pluggable. Different stores can be used by * implementing the {@link MqttClientPersistence} interface and passing it to * the clients constructor. *
* * TODO - Class docs taken from IMqttAsyncClient, review for v5 Enables an * application to communicate with an MQTT server using non-blocking methods. ** It provides applications a simple programming interface to all features of * the MQTT version 3.1 specification including: *
** There are two styles of MQTT client, this one and {@link IMqttClient}. *
** An application is not restricted to using one style if an IMqttAsyncClient * based client is used as both blocking and non-blocking methods can be used in * the same application. If an IMqttClient based client is used then only * blocking methods are available to the application. For more details on the * blocking client see {@link IMqttClient} *
* ** There are two forms of non-blocking method: *
* IMqttToken token = asyncClient.method(parms) **
* In this form the method returns a token that can be used to track the * progress of the action (method). The token provides a waitForCompletion() * method that once invoked will block until the action completes. Once * completed there are methods on the token that can be used to check if the * action completed successfully or not. For example to wait until a connect * completes: *
* ** IMqttToken conToken; * conToken = asyncClient.client.connect(conToken); * ... do some work... * conToken.waitForCompletion(); **
* To turn a method into a blocking invocation the following form can be used: *
* ** IMqttToken token; * token = asyncClient.method(parms).waitForCompletion(); ** *
* IMqttToken token method(parms, Object userContext, IMqttActionListener callback) **
* In this form a callback is registered with the method. The callback will be * notified when the action succeeds or fails. The callback is invoked on the * thread managed by the MQTT client so it is important that processing is * minimised in the callback. If not the operation of the MQTT client will be * inhibited. For example to be notified (called back) when a connect completes: *
* ** IMqttToken conToken; * conToken = asyncClient.connect("some context",new MqttAsyncActionListener() { * public void onSuccess(IMqttToken asyncActionToken) { * log("Connected"); * } * * public void onFailure(IMqttToken asyncActionToken, Throwable exception) { * log ("connect failed" +exception); * } * }); **
* An optional context object can be passed into the method which will then be * made available in the callback. The context is stored by the MQTT client in * the token which is then returned to the invoker. The token is provided to the * callback methods where the context can then be accessed. *
** To understand when the delivery of a message is complete either of the two * methods above can be used to either wait on or be notified when the publish * completes. An alternative is to use the * {@link MqttCallback#deliveryComplete(IMqttToken)} method which will * also be notified when a message has been delivered to the requested quality * of service. *
* * @see IMqttAsyncClient */ public class MqttAsyncClient implements MqttClientInterface, IMqttAsyncClient { private static final String CLASS_NAME = MqttAsyncClient.class.getName(); private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME); private static final long QUIESCE_TIMEOUT = 30000; // ms private static final long DISCONNECT_TIMEOUT = 10000; // ms private static final char MIN_HIGH_SURROGATE = '\uD800'; private static final char MAX_HIGH_SURROGATE = '\uDBFF'; // private String clientId; private String serverURI; protected ClientComms comms; private Hashtable* The address of a server can be specified on the constructor. Alternatively a * list containing one or more servers can be specified using the * {@link MqttConnectionOptions#setServerURIs(String[]) setServerURIs} method on * MqttConnectOptions. * *
* The serverURI
parameter is typically used with the
* clientId
parameter to form a key. The key is used to store and
* reference messages while they are being delivered. Hence the serverURI
* specified on the constructor must still be specified even if a list of
* servers is specified on an MqttConnectOptions object. The serverURI on the
* constructor must remain the same across restarts of the client for delivery
* of messages to be maintained from a given client to a given server or set of
* servers.
*
*
* The address of the server to connect to is specified as a URI. Two types of
* connection are supported tcp://
for a TCP connection and
* ssl://
for a TCP connection secured by SSL/TLS. For example:
*
tcp://localhost:1883
ssl://localhost:8883
* If the port is not specified, it will default to 1883 for
* tcp://
URIs, and 8883 for ssl://
URIs.
*
* A client identifier clientId
must be specified and be less than
* 65535 characters. It must be unique across all clients connecting to the same
* server. The clientId is used by the server to store data related to the
* client, hence it is important that the clientId remain the same when
* connecting to a server if durable subscriptions or reliable messaging are
* required.
*
* As the client identifier is used by the server to identify a client when it * reconnects, the client must use the same identifier between connections if * durable subscriptions or reliable delivery of messages is required. *
** In Java SE, SSL can be configured in one of several ways, which the client * will use in the following order: *
*SSLSocketFactory
-
* applications can use
* {@link MqttConnectionOptions#setSocketFactory(SocketFactory)} to supply a
* factory with the appropriate SSL settings.* In Java ME, the platform settings are used for SSL connections. *
* <p>
     * An instance of the default persistence mechanism
     * {@link MqttDefaultFilePersistence} is used by the client. To specify a
     * different persistence mechanism or to turn off persistence, use the
     * {@link #MqttAsyncClient(String, String, MqttClientPersistence)} constructor.
     *
     * @param serverURI
     *            the address of the server to connect to, specified as a URI. Can
     *            be overridden using
     *            {@link MqttConnectionOptions#setServerURIs(String[])}
     * @param clientId
     *            a client identifier that is unique on the server being connected
     *            to
     * @throws IllegalArgumentException
     *             if the URI does not start with "tcp://", "ssl://" or "local://".
     * @throws IllegalArgumentException
     *             if the clientId is null or is greater than 65535 characters in
     *             length
     * @throws MqttException
     *             if any other problem was encountered
     */
    public MqttAsyncClient(String serverURI, String clientId) throws MqttException {
        // Delegates to the three-argument constructor, supplying a freshly created
        // MqttDefaultFilePersistence as the persistence store.
        this(serverURI, clientId, new MqttDefaultFilePersistence());
    }

    /**
     * Create an MqttAsyncClient that is used to communicate with an MQTT server.
     *
* The address of a server can be specified on the constructor. Alternatively a * list containing one or more servers can be specified using the * {@link MqttConnectionOptions#setServerURIs(String[]) setServerURIs} method on * MqttConnectOptions. * *
* The serverURI
parameter is typically used with the
* clientId
parameter to form a key. The key is used to store and
* reference messages while they are being delivered. Hence the serverURI
* specified on the constructor must still be specified even if a list of
* servers is specified on an MqttConnectOptions object. The serverURI on the
* constructor must remain the same across restarts of the client for delivery
* of messages to be maintained from a given client to a given server or set of
* servers.
*
*
* The address of the server to connect to is specified as a URI. Two types of
* connection are supported tcp://
for a TCP connection and
* ssl://
for a TCP connection secured by SSL/TLS. For example:
*
tcp://localhost:1883
ssl://localhost:8883
* If the port is not specified, it will default to 1883 for
* tcp://
URIs, and 8883 for ssl://
URIs.
*
* A client identifier clientId
must be specified and be less than
* 65535 characters. It must be unique across all clients connecting to the same
* server. The clientId is used by the server to store data related to the
* client, hence it is important that the clientId remain the same when
* connecting to a server if durable subscriptions or reliable messaging are
* required.
*
* As the client identifier is used by the server to identify a client when it * reconnects, the client must use the same identifier between connections if * durable subscriptions or reliable delivery of messages is required. *
** In Java SE, SSL can be configured in one of several ways, which the client * will use in the following order: *
*SSLSocketFactory
-
* applications can use
* {@link MqttConnectionOptions#setSocketFactory(SocketFactory)} to supply a
* factory with the appropriate SSL settings.* In Java ME, the platform settings are used for SSL connections. *
** A persistence mechanism is used to enable reliable messaging. For messages * sent at qualities of service (QoS) 1 or 2 to be reliably delivered, messages * must be stored (on both the client and server) until the delivery of the * message is complete. If messages are not safely stored when being delivered * then a failure in the client or server can result in lost messages. A * pluggable persistence mechanism is supported via the * {@link MqttClientPersistence} interface. An implementer of this interface * that safely stores messages must be specified in order for delivery of * messages to be reliable. In addition * {@link MqttConnectionOptions#setCleanStart(boolean)} must be set to false. In * the event that only QoS 0 messages are sent or received or cleanStart is set * to true then a safe store is not needed. *
*
* An implementation of file-based persistence is provided in class
* {@link MqttDefaultFilePersistence} which will work in all Java SE based
* systems. If no persistence is needed, the persistence parameter can be
* explicitly set to null
.
*
* The address of a server can be specified on the constructor. Alternatively a * list containing one or more servers can be specified using the * {@link MqttConnectionOptions#setServerURIs(String[]) setServerURIs} method on * MqttConnectOptions. * *
* The serverURI
parameter is typically used with the
* clientId
parameter to form a key. The key is used to store and
* reference messages while they are being delivered. Hence the serverURI
* specified on the constructor must still be specified even if a list of
* servers is specified on an MqttConnectOptions object. The serverURI on the
* constructor must remain the same across restarts of the client for delivery
* of messages to be maintained from a given client to a given server or set of
* servers.
*
*
* The address of the server to connect to is specified as a URI. Two types of
* connection are supported tcp://
for a TCP connection and
* ssl://
for a TCP connection secured by SSL/TLS. For example:
*
tcp://localhost:1883
ssl://localhost:8883
* If the port is not specified, it will default to 1883 for
* tcp://
URIs, and 8883 for ssl://
URIs.
*
* A client identifier clientId
must be specified and be less than
* 65535 characters. It must be unique across all clients connecting to the same
* server. The clientId is used by the server to store data related to the
* client, hence it is important that the clientId remain the same when
* connecting to a server if durable subscriptions or reliable messaging are
* required.
*
* As the client identifier is used by the server to identify a client when it * reconnects, the client must use the same identifier between connections if * durable subscriptions or reliable delivery of messages is required. *
** In Java SE, SSL can be configured in one of several ways, which the client * will use in the following order: *
*SSLSocketFactory
-
* applications can use
* {@link MqttConnectionOptions#setSocketFactory(SocketFactory)} to supply a
* factory with the appropriate SSL settings.* In Java ME, the platform settings are used for SSL connections. *
** A persistence mechanism is used to enable reliable messaging. For messages * sent at qualities of service (QoS) 1 or 2 to be reliably delivered, messages * must be stored (on both the client and server) until the delivery of the * message is complete. If messages are not safely stored when being delivered * then a failure in the client or server can result in lost messages. A * pluggable persistence mechanism is supported via the * {@link MqttClientPersistence} interface. An implementer of this interface * that safely stores messages must be specified in order for delivery of * messages to be reliable. In addition * {@link MqttConnectionOptions#setCleanStart(boolean)} must be set to false. In * the event that only QoS 0 messages are sent or received or cleanStart is set * to true then a safe store is not needed. *
*
* An implementation of file-based persistence is provided in class
* {@link MqttDefaultFilePersistence} which will work in all Java SE based
* systems. If no persistence is needed, the persistence parameter can be
* explicitly set to null
.
*
* There are two alternative methods that should be used in preference to this * one when publishing a message: *
** When you build an application, the design of the topic tree should take into * account the following principles of topic name syntax and semantics: *
* ** The following principles apply to the construction and content of a topic * tree: *
* *By default, client sends * PingReq to server to keep the connection to server. For some platforms which * cannot use this mechanism, such as Android, developer needs to handle the * ping request manually with this method.
* * @throws MqttException for other errors encountered while publishing the * message. */ /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#checkPing(java.lang.Object, * org.eclipse.paho.mqttv5.client.MqttActionListener) */ @Override public IMqttToken checkPing(Object userContext, MqttActionListener callback) throws MqttException { final String methodName = "ping"; MqttToken token; // @TRACE 117=> log.fine(CLASS_NAME, methodName, "117"); token = comms.checkForActivity(callback); // @TRACE 118=< log.fine(CLASS_NAME, methodName, "118"); return token; } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(java.lang.String, * int, java.lang.Object, org.eclipse.paho.mqttv5.client.MqttActionListener) */ @Override public IMqttToken subscribe(String topicFilter, int qos, Object userContext, MqttActionListener callback) throws MqttException { return this.subscribe(new MqttSubscription[] { new MqttSubscription(topicFilter, qos) }, userContext, callback, new MqttProperties()); } @Override public IMqttToken subscribe(String[] topicFilters, int[] qoss, Object userContext, MqttActionListener callback) throws MqttException { MqttSubscription[] subs = new MqttSubscription[topicFilters.length]; for (int i = 0; i < topicFilters.length; ++ i) { subs[i] = new MqttSubscription(topicFilters[i], qoss[i]); } return this.subscribe(subs, userContext, callback, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(java.lang.String, * int) */ @Override public IMqttToken subscribe(String topicFilter, int qos) throws MqttException { return this.subscribe(new MqttSubscription[] { new MqttSubscription(topicFilter, qos) }, null, null, new MqttProperties()); } @Override public IMqttToken subscribe(String[] topicFilters, int[] qoss) throws MqttException { return this.subscribe(topicFilters, qoss, null, null); } /* * (non-Javadoc) * * @see * 
org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(java.lang.String, * int) */ @Override public IMqttToken subscribe(MqttSubscription subscription) throws MqttException { return this.subscribe(new MqttSubscription[] { subscription }, null, null, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. * mqttv5.common.MqttSubscription[]) */ @Override public IMqttToken subscribe(MqttSubscription[] subscriptions) throws MqttException { return this.subscribe(subscriptions, null, null, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. * mqttv5.common.MqttSubscription[], java.lang.Object, * org.eclipse.paho.mqttv5.client.MqttActionListener, * org.eclipse.paho.mqttv5.common.packet.MqttProperties) */ @Override public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback, MqttProperties subscriptionProperties) throws MqttException { // remove any message handlers for individual topics and validate Topics for (MqttSubscription subscription : subscriptions) { this.comms.removeMessageListener(subscription.getTopic()); // Check if the topic filter is valid before subscribing MqttTopicValidator.validate(subscription.getTopic(), this.mqttConnection.isWildcardSubscriptionsAvailable(), this.mqttConnection.isSharedSubscriptionsAvailable()); } return this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties); } private IMqttToken subscribeBase(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback, MqttProperties subscriptionProperties) throws MqttException { final String methodName = "subscribe"; // Only Generate Log string if we are logging at FINE level if (log.isLoggable(Logger.FINE)) { StringBuffer subs = new StringBuffer(); for (int i = 0; i < subscriptions.length; i++) { if (i > 0) { subs.append(", "); } 
subs.append(subscriptions[i].toString()); } // @TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2} log.fine(CLASS_NAME, methodName, "106", new Object[] { subs.toString(), userContext, callback }); } MqttToken token = new MqttToken(getClientId()); token.setActionCallback(callback); token.setUserContext(userContext); // TODO - Somehow refactor this.... // token.internalTok.setTopics(topicFilters); // TODO - Build up MQTT Subscriptions properly here MqttSubscribe register = new MqttSubscribe(subscriptions, subscriptionProperties); token.setRequestMessage(register); comms.sendNoWait(register, token); // @TRACE 109=< log.fine(CLASS_NAME, methodName, "109"); return token; } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. * mqttv5.common.MqttSubscription, java.lang.Object, * org.eclipse.paho.mqttv5.client.MqttActionListener, * org.eclipse.paho.mqttv5.client.IMqttMessageListener, * org.eclipse.paho.mqttv5.common.packet.MqttProperties) */ @Override public IMqttToken subscribe(MqttSubscription mqttSubscription, Object userContext, MqttActionListener callback, IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException { return this.subscribe(new MqttSubscription[] { mqttSubscription }, userContext, callback, messageListener, subscriptionProperties); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. * mqttv5.common.MqttSubscription, * org.eclipse.paho.mqttv5.client.IMqttMessageListener) */ @Override public IMqttToken subscribe(MqttSubscription subscription, IMqttMessageListener messageListener) throws MqttException { return this.subscribe(new MqttSubscription[] { subscription }, null, null, messageListener, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. 
* mqttv5.common.MqttSubscription[], * org.eclipse.paho.mqttv5.client.IMqttMessageListener) */ @Override public IMqttToken subscribe(MqttSubscription[] subscriptions, IMqttMessageListener messageListener) throws MqttException { return this.subscribe(subscriptions, null, null, messageListener, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. * mqttv5.common.MqttSubscription[], java.lang.Object, * org.eclipse.paho.mqttv5.client.MqttActionListener, * org.eclipse.paho.mqttv5.client.IMqttMessageListener[], * org.eclipse.paho.mqttv5.common.packet.MqttProperties) */ @Override public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback, IMqttMessageListener[] messageListeners, MqttProperties subscriptionProperties) throws MqttException { // add message handlers to the list for this client for (int i = 0; i < subscriptions.length; ++i) { MqttTopicValidator.validate(subscriptions[i].getTopic(), this.mqttConnection.isWildcardSubscriptionsAvailable(), this.mqttConnection.isSharedSubscriptionsAvailable()); if (messageListeners == null || messageListeners[i] == null) { this.comms.removeMessageListener(subscriptions[i].getTopic()); } else { this.comms.setMessageListener(null, subscriptions[i].getTopic(), messageListeners[i]); } } IMqttToken token = null; try { token = this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties); } catch(Exception e) { // if the subscribe fails, then we have to remove the message handlers for (MqttSubscription subscription : subscriptions) { this.comms.removeMessageListener(subscription.getTopic()); } throw e; } return token; } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#subscribe(org.eclipse.paho. 
* mqttv5.common.MqttSubscription[], java.lang.Object, * org.eclipse.paho.mqttv5.client.MqttActionListener, * org.eclipse.paho.mqttv5.client.IMqttMessageListener, * org.eclipse.paho.mqttv5.common.packet.MqttProperties) */ @Override public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback, IMqttMessageListener messageListener, MqttProperties subscriptionProperties) throws MqttException { int subId = 0; try { subId = subscriptionProperties.getSubscriptionIdentifiers().get(0); } catch (IndexOutOfBoundsException e) { log.fine(CLASS_NAME, "subscribe", "No sub subscription property(s)"); } // Automatic Subscription Identifier Assignment is enabled if (connOpts.useSubscriptionIdentifiers() && this.mqttConnection.isSubscriptionIdentifiersAvailable()) { // Application is overriding the subscription Identifier if (subId != 0) { // Check that we are not already using this ID, else throw Illegal Argument // Exception if (this.comms.doesSubscriptionIdentifierExist(subId)) { throw new IllegalArgumentException( String.format("The Subscription Identifier %s already exists.", subId)); } } else { // Automatically assign new ID and link to callback. 
subId = this.mqttSession.getNextSubscriptionIdentifier(); } } // add message handlers to the list for this client for (MqttSubscription subscription : subscriptions) { MqttTopicValidator.validate(subscription.getTopic(), this.mqttConnection.isWildcardSubscriptionsAvailable(), this.mqttConnection.isSharedSubscriptionsAvailable()); if (messageListener == null) { this.comms.removeMessageListener(subscription.getTopic()); } else { this.comms.setMessageListener(subId, subscription.getTopic(), messageListener); } } IMqttToken token = null; try { token = this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties); } catch(Exception e) { // if the subscribe fails, then we have to remove the message handlers for (MqttSubscription subscription : subscriptions) { this.comms.removeMessageListener(subscription.getTopic()); } throw e; } return token; } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#unsubscribe(java.lang.String, * java.lang.Object, org.eclipse.paho.mqttv5.client.MqttActionListener) */ @Override public IMqttToken unsubscribe(String topicFilter, Object userContext, MqttActionListener callback) throws MqttException { return unsubscribe(new String[] { topicFilter }, userContext, callback, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#unsubscribe(java.lang.String) */ @Override public IMqttToken unsubscribe(String topicFilter) throws MqttException { return unsubscribe(new String[] { topicFilter }, null, null, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#unsubscribe(java.lang.String[ * ]) */ @Override public IMqttToken unsubscribe(String[] topicFilters) throws MqttException { return unsubscribe(topicFilters, null, null, new MqttProperties()); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#unsubscribe(java.lang.String[ * ], java.lang.Object, 
org.eclipse.paho.mqttv5.client.MqttActionListener, * org.eclipse.paho.mqttv5.common.packet.MqttProperties) */ @Override public IMqttToken unsubscribe(String[] topicFilters, Object userContext, MqttActionListener callback, MqttProperties unsubscribeProperties) throws MqttException { final String methodName = "unsubscribe"; // Only Generate Log string if we are logging at FINE level if (log.isLoggable(Logger.FINE)) { String subs = ""; for (int i = 0; i < topicFilters.length; i++) { if (i > 0) { subs += ", "; } subs += topicFilters[i]; } // @TRACE 107=Unsubscribe topic={0} userContext={1} callback={2} log.fine(CLASS_NAME, methodName, "107", new Object[] { subs, userContext, callback }); } for (String topicFilter : topicFilters) { // Check if the topic filter is valid before unsubscribing // Although we already checked when subscribing, but invalid // topic filter is meanless for unsubscribing, just prohibit it // to reduce unnecessary control packet send to broker. MqttTopicValidator.validate(topicFilter, true/* allow wildcards */, this.mqttConnection.isSharedSubscriptionsAvailable()); } // remove message handlers from the list for this client for (String topicFilter : topicFilters) { this.comms.removeMessageListener(topicFilter); } MqttToken token = new MqttToken(getClientId()); token.setActionCallback(callback); token.setUserContext(userContext); token.internalTok.setTopics(topicFilters); MqttUnsubscribe unregister = new MqttUnsubscribe(topicFilters, unsubscribeProperties); token.setRequestMessage(unregister); comms.sendNoWait(unregister, token); // @TRACE 110=< log.fine(CLASS_NAME, methodName, "110"); return token; } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#setCallback(org.eclipse.paho. 
* mqttv5.client.MqttCallback)
     *
     * Stores the callback on this client and passes it on to the comms layer.
     */
    @Override
    public void setCallback(MqttCallback callback) {
        this.mqttCallback = callback;
        comms.setCallback(callback);
    }

    /*
     * (non-Javadoc)
     *
     * @see org.eclipse.paho.mqttv5.client.IMqttAsyncClient#setManualAcks(boolean)
     *
     * Forwards the manual-acknowledgement flag to the comms layer.
     */
    @Override
    public void setManualAcks(boolean manualAcks) {
        comms.setManualAcks(manualAcks);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#messageArrivedComplete(int,
     * int)
     *
     * Forwards the message id and QoS to the comms layer.
     */
    @Override
    public void messageArrivedComplete(int messageId, int qos) throws MqttException {
        comms.messageArrivedComplete(messageId, qos);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#getPendingTokens()
     *
     * Returns whatever the comms layer reports as pending tokens.
     */
    @Override
    public IMqttToken[] getPendingTokens() {
        return comms.getPendingTokens();
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#publish(java.lang.String,
     * byte[], int, boolean, java.lang.Object,
     * org.eclipse.paho.mqttv5.client.MqttActionListener)
     *
     * Wraps the payload in a new MqttMessage with empty properties and the given
     * QoS and retained flag, then delegates to the MqttMessage-based publish.
     */
    @Override
    public IMqttToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext,
            MqttActionListener callback) throws MqttException, MqttPersistenceException {
        MqttMessage message = new MqttMessage(payload);
        message.setProperties(new MqttProperties());
        message.setQos(qos);
        message.setRetained(retained);
        return this.publish(topic, message, userContext, callback);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#publish(java.lang.String,
     * byte[], int, boolean)
     *
     * Same as the six-argument form with no user context and no action listener.
     */
    @Override
    public IMqttToken publish(String topic, byte[] payload, int qos, boolean retained)
            throws MqttException, MqttPersistenceException {
        return this.publish(topic, payload, qos, retained, null, null);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#publish(java.lang.String,
     * org.eclipse.paho.mqttv5.common.MqttMessage)
     *
     * Same as the four-argument MqttMessage form with no user context and no
     * action listener.
     */
    @Override
    public IMqttToken publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException {
        return this.publish(topic, message, null, null);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#publish(java.lang.String,
     * org.eclipse.paho.mqttv5.common.MqttMessage, java.lang.Object,
     * org.eclipse.paho.mqttv5.client.MqttActionListener)
     *
     * Validates the topic (wildcards not allowed), builds a delivery token that
     * carries the callback, user context, message and topic, wraps the message in
     * an MqttPublish using the message's own properties, and hands both to
     * comms.sendNoWait. Returns the token.
     */
    @Override
    public IMqttToken publish(String topic, MqttMessage message, Object userContext, MqttActionListener callback)
            throws MqttException, MqttPersistenceException {
        final String methodName = "publish";
        // NOTE(review): the trace template below names a message argument, but the
        // call passes only topic, userContext and callback - confirm against the
        // trace catalogue.
        // @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
        log.fine(CLASS_NAME, methodName, "111", new Object[] { topic, userContext, callback });

        // Checks if a topic is valid when publishing a message.
        MqttTopicValidator.validate(topic, false/* wildcards NOT allowed */, true);

        MqttToken token = new MqttToken(getClientId());
        token.internalTok.setDeliveryToken(true);
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.setMessage(message);
        token.internalTok.setTopics(new String[] { topic });

        // message is dereferenced here, so a null message fails at this point.
        MqttPublish pubMsg = new MqttPublish(topic, message, message.getProperties());
        token.setRequestMessage(pubMsg);
        comms.sendNoWait(pubMsg, token);

        // @TRACE 112=<
        log.fine(CLASS_NAME, methodName, "112");

        return token;
    }

    /*
     * (non-Javadoc)
     *
     * @see org.eclipse.paho.mqttv5.client.IMqttAsyncClient#reconnect()
     *
     * Refuses to proceed (by throwing MqttException) when the comms layer reports
     * connected, connecting, disconnecting or closed; otherwise stops any running
     * reconnect cycle and makes one reconnect attempt.
     */
    @Override
    public void reconnect() throws MqttException {
        final String methodName = "reconnect";
        // @Trace 500=Attempting to reconnect client: {0}
        log.fine(CLASS_NAME, methodName, "500", new Object[] { this.mqttSession.getClientId() });
        // Some checks to make sure that we're not attempting to reconnect an
        // already connected client
        if (comms.isConnected()) {
            throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_CONNECTED);
        }
        if (comms.isConnecting()) {
            throw new MqttException(MqttClientException.REASON_CODE_CONNECT_IN_PROGRESS);
        }
        if (comms.isDisconnecting()) {
            throw new MqttException(MqttClientException.REASON_CODE_CLIENT_DISCONNECTING);
        }
        if (comms.isClosed()) {
            throw new MqttException(MqttClientException.REASON_CODE_CLIENT_CLOSED);
        }
        // We don't want to spam the server
        stopReconnectCycle();

        attemptReconnect();
    }

    /**
     * Attempts to reconnect the client to the server. If successful it will make
     * sure that there are no further reconnects scheduled. However if the connect
     * fails, the delay will double up to 128 seconds and will re-schedule the
     * reconnect for after the delay.
     *
     * Any thrown exceptions are logged but not acted upon as it is assumed that
     * they are being thrown due to the server being offline and so reconnect
     * attempts will continue.
     */
    private void attemptReconnect() {
        final String methodName = "attemptReconnect";
        // @Trace 500=Attempting to reconnect client: {0}
        log.fine(CLASS_NAME, methodName, "500", new Object[] { this.mqttSession.getClientId() });
        try {
            // Reuses the stored connection options and user context; the outcome is
            // reported to a new MqttReconnectActionListener.
            connect(this.connOpts, this.userContext, new MqttReconnectActionListener(methodName));
        } catch (MqttSecurityException ex) {
            // @TRACE 804=exception
            log.fine(CLASS_NAME, methodName, "804", null, ex);
        } catch (MqttException ex) {
            // @TRACE 804=exception
            log.fine(CLASS_NAME, methodName, "804", null, ex);
        }
    }

    /**
     * Creates a new reconnect {@link Timer} named after the client id and
     * schedules a {@code ReconnectTask} to run after {@code reconnectDelay}.
     */
    private void startReconnectCycle() {
        String methodName = "startReconnectCycle";
        // @Trace 503=Start reconnect timer for client: {0}, delay: {1}
        log.fine(CLASS_NAME, methodName, "503",
                new Object[] { this.mqttSession.getClientId(), Long.valueOf(reconnectDelay) });
        // NOTE(review): unlike stopReconnectCycle, this assigns reconnectTimer without
        // holding clientLock and without cancelling a previously created timer -
        // confirm that callers guarantee no timer is already running.
        reconnectTimer = new Timer("MQTT Reconnect: " + this.mqttSession.getClientId());
        reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
    }

    /**
     * Stops the reconnect cycle: while holding {@code clientLock}, and only when
     * automatic reconnect is enabled in the connection options, cancels the
     * reconnect timer if one exists.
     */
    private void stopReconnectCycle() {
        String methodName = "stopReconnectCycle";
        // @Trace 504=Stop reconnect timer for client: {0}
        log.fine(CLASS_NAME, methodName, "504", new Object[] { this.mqttSession.getClientId() });
        synchronized (clientLock) {
            if (this.connOpts.isAutomaticReconnect()) {
                if (reconnectTimer != null) {
                    reconnectTimer.cancel();
                    reconnectTimer =
null; } reconnectDelay = 1000; // Reset Delay Timer } } } private class ReconnectTask extends TimerTask { private static final String methodName = "ReconnectTask.run"; public void run() { // @Trace 506=Triggering Automatic Reconnect attempt. log.fine(CLASS_NAME, methodName, "506"); attemptReconnect(); } } class MqttReconnectCallback implements MqttCallback { final boolean automaticReconnect; MqttReconnectCallback(boolean isAutomaticReconnect) { automaticReconnect = isAutomaticReconnect; } public void messageArrived(String topic, MqttMessage message) throws Exception { } public void deliveryComplete(IMqttToken token) { } public void connectComplete(boolean reconnect, String serverURI) { } public void disconnected(MqttDisconnectResponse disconnectResponse) { if (automaticReconnect) { // Automatic reconnect is set so make sure comms is in resting // state comms.setRestingState(true); reconnecting = true; startReconnectCycle(); } } public void mqttErrorOccurred(MqttException exception) { } public void authPacketArrived(int reasonCode, MqttProperties properties) { } } class MqttReconnectActionListener implements MqttActionListener { final String methodName; MqttReconnectActionListener(String methodName) { this.methodName = methodName; } public void onSuccess(IMqttToken asyncActionToken) { // @Trace 501=Automatic Reconnect Successful: {0} log.fine(CLASS_NAME, methodName, "501", new Object[] { asyncActionToken.getClient().getClientId() }); comms.setRestingState(false); stopReconnectCycle(); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { // @Trace 502=Automatic Reconnect failed, rescheduling: {0} log.fine(CLASS_NAME, methodName, "502", new Object[] { asyncActionToken.getClient().getClientId() }); if (reconnectDelay < connOpts.getMaxReconnectDelay()) { reconnectDelay = reconnectDelay * 2; } rescheduleReconnectCycle(reconnectDelay); } private void rescheduleReconnectCycle(int delay) { String reschedulemethodName = methodName + 
":rescheduleReconnectCycle"; // @Trace 505=Rescheduling reconnect timer for client: {0}, delay: // {1} log.fine(CLASS_NAME, reschedulemethodName, "505", new Object[] { MqttAsyncClient.this.mqttSession.getClientId(), String.valueOf(reconnectDelay) }); synchronized (clientLock) { if (MqttAsyncClient.this.connOpts.isAutomaticReconnect()) { if (reconnectTimer != null) { reconnectTimer.schedule(new ReconnectTask(), delay); } else { // The previous reconnect timer was cancelled reconnectDelay = delay; startReconnectCycle(); } } } } } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#setBufferOpts(org.eclipse. * paho.mqttv5.client.DisconnectedBufferOptions) */ @Override public void setBufferOpts(DisconnectedBufferOptions bufferOpts) { this.comms.setDisconnectedMessageBuffer(new DisconnectedMessageBuffer(bufferOpts)); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#getBufferedMessageCount() */ @Override public int getBufferedMessageCount() { return this.comms.getBufferedMessageCount(); } /* * (non-Javadoc) * * @see org.eclipse.paho.mqttv5.client.IMqttAsyncClient#getBufferedMessage(int) */ @Override public MqttMessage getBufferedMessage(int bufferIndex) { return this.comms.getBufferedMessage(bufferIndex); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#deleteBufferedMessage(int) */ @Override public void deleteBufferedMessage(int bufferIndex) { this.comms.deleteBufferedMessage(bufferIndex); } /* * (non-Javadoc) * * @see * org.eclipse.paho.mqttv5.client.IMqttAsyncClient#getInFlightMessageCount() */ @Override public int getInFlightMessageCount() { return this.comms.getActualInFlight(); } /* * (non-Javadoc) * * @see org.eclipse.paho.mqttv5.client.IMqttAsyncClient#close() */ @Override public void close() throws MqttException { close(false); } /* * (non-Javadoc) * * @see org.eclipse.paho.mqttv5.client.IMqttAsyncClient#close(boolean) */ @Override public void close(boolean force) throws 
MqttException { final String methodName = "close"; // @TRACE 113=< log.fine(CLASS_NAME, methodName, "113"); comms.close(force); // @TRACE 114=> log.fine(CLASS_NAME, methodName, "114"); } /* * (non-Javadoc) * * @see org.eclipse.paho.mqttv5.client.IMqttAsyncClient#getDebug() */ @Override public Debug getDebug() { return new Debug(this.mqttSession.getClientId(), comms); } @Override public IMqttToken authenticate(int reasonCode, Object userContext, MqttProperties properties) throws MqttException { MqttToken token = new MqttToken(getClientId()); token.setUserContext(userContext); MqttAuth auth = new MqttAuth(reasonCode, properties); comms.sendNoWait(auth, token); return null; } }