This commit is contained in:
candi
2025-09-04 04:43:31 +08:00
parent 2723b0ddd3
commit 212c19d40a
136 changed files with 96 additions and 766 deletions

View File

@@ -0,0 +1,978 @@
package org.eclipse.paho.mqttv5.client.internal;
/*******************************************************************************
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
* Ian Craggs - per subscription message handlers (bug 466579)
* Ian Craggs - ack control (bug 472172)
* James Sutton - checkForActivity Token (bug 473928)
* James Sutton - Automatic Reconnect & Offline Buffering.
*/
import java.util.Enumeration;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import org.eclipse.paho.mqttv5.client.BufferedMessage;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttClientInterface;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttPingSender;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.MqttTopic;
import org.eclipse.paho.mqttv5.client.TimerPingSender;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttPersistenceException;
import org.eclipse.paho.mqttv5.common.packet.MqttConnAck;
import org.eclipse.paho.mqttv5.common.packet.MqttConnect;
import org.eclipse.paho.mqttv5.common.packet.MqttDisconnect;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttPublish;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
/**
* Handles client communications with the server. Sends and receives MQTT V5
* messages.
*/
public class ClientComms {
public static String VERSION = "${project.version}";
public static String BUILD_LEVEL = "L${build.level}";
private static final String CLASS_NAME = ClientComms.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private static final byte CONNECTED = 0;
private static final byte CONNECTING = 1;
private static final byte DISCONNECTING = 2;
private static final byte DISCONNECTED = 3;
private static final byte CLOSED = 4;
private MqttClientInterface client;
private int networkModuleIndex;
private NetworkModule[] networkModules;
private CommsReceiver receiver;
private CommsSender sender;
private CommsCallback callback;
private ClientState clientState;
private MqttConnectionOptions conOptions;
private MqttClientPersistence persistence;
private MqttPingSender pingSender;
private CommsTokenStore tokenStore;
private boolean stoppingComms = false;
private byte conState = DISCONNECTED;
private final Object conLock = new Object(); // Used to synchronize connection state
private boolean closePending = false;
private boolean resting = false;
private DisconnectedMessageBuffer disconnectedMessageBuffer;
private ExecutorService executorService;
private MqttConnectionState mqttConnection;
/**
* Creates a new ClientComms object, using the specified module to handle the
* network calls.
*
* @param client
* The {@link MqttClientInterface}
* @param persistence
* the {@link MqttClientPersistence} layer.
* @param pingSender
* the {@link TimerPingSender}
* @param executorService
* the {@link ExecutorService}
* @param mqttSession
* the {@link MqttSessionState}
* @param mqttConnection
* the {@link MqttConnectionState}
* @throws MqttException
* if an exception occurs whilst communicating with the server
*/
public ClientComms(MqttClientInterface client, MqttClientPersistence persistence, MqttPingSender pingSender,
ExecutorService executorService, MqttSessionState mqttSession, MqttConnectionState mqttConnection) throws MqttException {
this.conState = DISCONNECTED;
this.client = client;
this.persistence = persistence;
this.pingSender = pingSender;
this.pingSender.init(this);
this.executorService = executorService;
this.mqttConnection = mqttConnection;
this.tokenStore = new CommsTokenStore(getClient().getClientId());
this.callback = new CommsCallback(this);
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender, this.mqttConnection);
callback.setClientState(clientState);
log.setResourceName(getClient().getClientId());
}
CommsReceiver getReceiver() {
return receiver;
}
/**
* Sends a message to the server. Does not check if connected this validation
* must be done by invoking routines.
*
* @param message
* @param token
* @throws MqttException
*/
void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "internalSend";
// @TRACE 200=internalSend key={0} message={1} token={2}
log.fine(CLASS_NAME, methodName, "200", new Object[] { message.getKey(), message, token });
if (token.getClient() == null) {
// Associate the client with the token - also marks it as in use.
token.internalTok.setClient(getClient());
} else {
// Token is already in use - cannot reuse
// @TRACE 213=fail: token in use: key={0} message={1} token={2}
log.fine(CLASS_NAME, methodName, "213", new Object[] { message.getKey(), message, token });
throw new MqttException(MqttClientException.REASON_CODE_TOKEN_INUSE);
}
try {
// Persist if needed and send the message
this.clientState.send(message, token);
} catch (MqttException e) {
token.internalTok.setClient(null); // undo client setting on error
if (message instanceof MqttPublish) {
this.clientState.undo((MqttPublish) message);
}
throw e;
}
}
/**
* Sends a message to the broker if in connected state, but only waits for the
* message to be stored, before returning.
*
* @param message
* The {@link MqttWireMessage} to send
* @param token
* The {@link MqttToken} to send.
* @throws MqttException
* if an error occurs sending the message
*/
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "sendNoWait";
if (isConnected() || (!isConnected() && message instanceof MqttConnect)
|| (isDisconnecting() && message instanceof MqttDisconnect)) {
if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
// @TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
// message to buffer. message={0}
log.fine(CLASS_NAME, methodName, "507", new Object[] { message.getKey() });
// If the message is a publish, strip the topic alias:
if(message instanceof MqttPublish && message.getProperties().getTopicAlias()!= null) {
MqttProperties messageProps = message.getProperties();
messageProps.setTopicAlias(null);
message.setProperties(messageProps);
}
if (disconnectedMessageBuffer.isPersistBuffer()) {
this.clientState.persistBufferedMessage(message);
}
disconnectedMessageBuffer.putMessage(message, token);
} else {
if (message instanceof MqttPublish) {
// Override the QoS if the server has set a maximum
if (this.mqttConnection.getMaximumQoS() != null
&& ((MqttPublish) message).getMessage().getQos() > this.mqttConnection.getMaximumQoS()) {
MqttMessage mqttMessage = ((MqttPublish) message).getMessage();
mqttMessage.setQos(this.mqttConnection.getMaximumQoS());
((MqttPublish) message).setMessage(mqttMessage);
}
// Override the Retain flag if the server has disabled it
if (this.mqttConnection.isRetainAvailable() != null
&& ((MqttPublish) message).getMessage().isRetained()
&& (this.mqttConnection.isRetainAvailable() == false)) {
MqttMessage mqttMessage = ((MqttPublish) message).getMessage();
mqttMessage.setRetained(false);
((MqttPublish) message).setMessage(mqttMessage);
}
}
this.internalSend(message, token);
}
} else if (disconnectedMessageBuffer != null && isResting()) {
// @TRACE 508=Client Resting, Offline Buffer available. Adding message to
// buffer. message={0}
log.fine(CLASS_NAME, methodName, "508", new Object[] { message.getKey() });
if (disconnectedMessageBuffer.isPersistBuffer()) {
this.clientState.persistBufferedMessage(message);
}
disconnectedMessageBuffer.putMessage(message, token);
} else {
// @TRACE 208=failed: not connected
log.fine(CLASS_NAME, methodName, "208");
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
/**
* Close and tidy up.
*
* Call each main class and let it tidy up e.g. releasing the token store which
* normally survives a disconnect.
*
* @param force
* force disconnection
* @throws MqttException
* if not disconnected
*/
public void close(boolean force) throws MqttException {
final String methodName = "close";
synchronized (conLock) {
if (!isClosed()) {
// Must be disconnected before close can take place or if we are being forced
if (!isDisconnected() || force) {
// @TRACE 224=failed: not disconnected
log.fine(CLASS_NAME, methodName, "224");
if (isConnecting()) {
throw new MqttException(MqttClientException.REASON_CODE_CONNECT_IN_PROGRESS);
} else if (isConnected()) {
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_CONNECTED);
} else if (isDisconnecting()) {
closePending = true;
return;
}
}
conState = CLOSED;
// Don't shut down an externally supplied executor service
//shutdownExecutorService();
// ShutdownConnection has already cleaned most things
clientState.close();
clientState = null;
callback = null;
persistence = null;
sender = null;
pingSender = null;
receiver = null;
networkModules = null;
conOptions = null;
tokenStore = null;
}
}
}
/**
* Sends a connect message and waits for an ACK or NACK. Connecting is a special
* case which will also start up the network connection, receive thread, and
* keep alive thread.
*
* @param options
* The {@link MqttConnectionOptions} for the connection
* @param token
* The {@link MqttToken} to track the connection
* @throws MqttException
* if an error occurs when connecting
*/
public void connect(MqttConnectionOptions options, MqttToken token) throws MqttException {
final String methodName = "connect";
synchronized (conLock) {
if (isDisconnected() && !closePending) {
// @TRACE 214=state=CONNECTING
log.fine(CLASS_NAME, methodName, "214");
conState = CONNECTING;
conOptions = options;
MqttConnect connect = new MqttConnect(client.getClientId(), conOptions.getMqttVersion(),
conOptions.isCleanStart(), conOptions.getKeepAliveInterval(),
conOptions.getConnectionProperties(), conOptions.getWillMessageProperties());
if (conOptions.getWillDestination() != null) {
connect.setWillDestination(conOptions.getWillDestination());
}
if (conOptions.getWillMessage() != null) {
connect.setWillMessage(conOptions.getWillMessage());
}
if (conOptions.getUserName() != null) {
connect.setUserName(conOptions.getUserName());
}
if (conOptions.getPassword() != null) {
connect.setPassword(conOptions.getPassword());
}
/*
* conOptions.getUserName(), conOptions.getPassword(),
* conOptions.getWillMessage(), conOptions.getWillDestination()
*/
this.mqttConnection.setKeepAliveSeconds(conOptions.getKeepAliveInterval());
this.clientState.setCleanStart(conOptions.isCleanStart());
tokenStore.open();
ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
conbg.start();
} else {
// @TRACE 207=connect failed: not disconnected {0}
log.fine(CLASS_NAME, methodName, "207", new Object[] { Byte.valueOf(conState) });
if (isClosed() || closePending) {
throw new MqttException(MqttClientException.REASON_CODE_CLIENT_CLOSED);
} else if (isConnecting()) {
throw new MqttException(MqttClientException.REASON_CODE_CONNECT_IN_PROGRESS);
} else if (isDisconnecting()) {
throw new MqttException(MqttClientException.REASON_CODE_CLIENT_DISCONNECTING);
} else {
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_CONNECTED);
}
}
}
}
public void connectComplete(MqttConnAck cack, MqttException mex) throws MqttException {
final String methodName = "connectComplete";
int rc = cack.getReturnCode();
synchronized (conLock) {
if (rc == 0) {
// We've successfully connected
// @TRACE 215=state=CONNECTED
log.fine(CLASS_NAME, methodName, "215");
conState = CONNECTED;
return;
}
}
// @TRACE 204=connect failed: rc={0}
log.fine(CLASS_NAME, methodName, "204", new Object[] { Integer.valueOf(rc) });
throw mex;
}
/**
* Shuts down the connection to the server. This may have been invoked as a
* result of a user calling disconnect or an abnormal disconnection. The method
* may be invoked multiple times in parallel as each thread when it receives an
* error uses this method to ensure that shutdown completes successfully.
*
* @param token
* the {@link MqttToken} To track closing the connection
* @param reason
* the {@link MqttException} thrown requiring the connection to be
* shut down.
* @param message
* the {@link MqttDisconnect} that triggered the connection to be
* shut down.
*/
public void shutdownConnection(MqttToken token, MqttException reason, MqttDisconnect message) {
final String methodName = "shutdownConnection";
boolean wasConnected;
MqttToken endToken = null; // Token to notify after disconnect completes
// This method could concurrently be invoked from many places only allow it
// to run once.
synchronized (conLock) {
if (stoppingComms || closePending || isClosed()) {
return;
}
stoppingComms = true;
// @TRACE 216=state=DISCONNECTING
log.fine(CLASS_NAME, methodName, "216");
wasConnected = (isConnected() || isDisconnecting());
conState = DISCONNECTING;
}
// Update the token with the reason for shutdown if it
// is not already complete.
if (token != null && !token.isComplete()) {
token.internalTok.setException(reason);
}
// Stop the thread that is used to call the user back
// when actions complete
if (callback != null) {
callback.stop();
}
// Stop the thread that handles inbound work from the network
if (receiver != null) {
receiver.stop();
}
// Stop the network module, send and receive now not possible
try {
if (networkModules != null) {
NetworkModule networkModule = networkModules[networkModuleIndex];
if (networkModule != null) {
networkModule.stop();
}
}
} catch (Exception ioe) {
// Ignore as we are shutting down
}
// Stop any new tokens being saved by app and throwing an exception if they do
tokenStore.quiesce(new MqttException(MqttClientException.REASON_CODE_CLIENT_DISCONNECTING));
// Notify any outstanding tokens with the exception of
// con or discon which may be returned and will be notified at
// the end
endToken = handleOldTokens(token, reason);
try {
// Clean session handling and tidy up
clientState.disconnected(reason);
if (clientState.getCleanStart())
callback.removeMessageListeners();
} catch (Exception ex) {
// Ignore as we are shutting down
}
if (sender != null) {
sender.stop();
}
if (pingSender != null) {
pingSender.stop();
}
try {
if (disconnectedMessageBuffer == null && persistence != null) {
persistence.close();
}
} catch (Exception ex) {
// Ignore as we are shutting down
}
// All disconnect logic has been completed allowing the
// client to be marked as disconnected.
synchronized (conLock) {
// @TRACE 217=state=DISCONNECTED
log.fine(CLASS_NAME, methodName, "217");
conState = DISCONNECTED;
stoppingComms = false;
}
// Internal disconnect processing has completed. If there
// is a disconnect token or a connect in error notify
// it now. This is done at the end to allow a new connect
// to be processed and now throw a currently disconnecting error.
// any outstanding tokens and unblock any waiters
if (endToken != null && callback != null) {
callback.asyncOperationComplete(endToken);
}
if (wasConnected && callback != null) {
// Let the user know client has disconnected either normally or abnormally
callback.connectionLost(reason, message);
}
// While disconnecting, close may have been requested - try it now
synchronized (conLock) {
if (closePending) {
try {
close(true);
} catch (Exception e) { // ignore any errors as closing
}
}
}
}
// Tidy up. There may be tokens outstanding as the client was
// not disconnected/quiseced cleanly! Work out what tokens still
// need to be notified and waiters unblocked. Store the
// disconnect or connect token to notify after disconnect is
// complete.
private MqttToken handleOldTokens(MqttToken token, MqttException reason) {
final String methodName = "handleOldTokens";
// @TRACE 222=>
log.fine(CLASS_NAME, methodName, "222");
MqttToken tokToNotifyLater = null;
try {
// First the token that was related to the disconnect / shutdown may
// not be in the token table - temporarily add it if not
if (token != null) {
if (tokenStore.getToken(token.internalTok.getKey()) == null) {
tokenStore.saveToken(token, token.internalTok.getKey());
}
}
Vector<MqttToken> toksToNot = clientState.resolveOldTokens(reason);
Enumeration<MqttToken> toksToNotE = toksToNot.elements();
while (toksToNotE.hasMoreElements()) {
MqttToken tok = (MqttToken) toksToNotE.nextElement();
if (tok.internalTok.getKey().equals(MqttDisconnect.KEY)
|| tok.internalTok.getKey().equals(MqttConnect.KEY)) {
// Its con or discon so remember and notify @ end of disc routine
tokToNotifyLater = tok;
} else {
// notify waiters and callbacks of outstanding tokens
// that a problem has occurred and disconnect is in
// progress
callback.asyncOperationComplete(tok);
}
}
} catch (Exception ex) {
// Ignore as we are shutting down
}
return tokToNotifyLater;
}
public void disconnect(MqttDisconnect disconnect, long quiesceTimeout, MqttToken token) throws MqttException {
final String methodName = "disconnect";
synchronized (conLock) {
if (isClosed()) {
// @TRACE 223=failed: in closed state
log.fine(CLASS_NAME, methodName, "223");
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_CLOSED);
} else if (isDisconnected()) {
// @TRACE 211=failed: already disconnected
log.fine(CLASS_NAME, methodName, "211");
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED);
} else if (isDisconnecting()) {
// @TRACE 219=failed: already disconnecting
log.fine(CLASS_NAME, methodName, "219");
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_DISCONNECTING);
} else if (Thread.currentThread() == callback.getThread()) {
// @TRACE 210=failed: called on callback thread
log.fine(CLASS_NAME, methodName, "210");
// Not allowed to call disconnect() from the callback, as it will deadlock.
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_DISCONNECT_PROHIBITED);
}
// @TRACE 218=state=DISCONNECTING
log.fine(CLASS_NAME, methodName, "218");
conState = DISCONNECTING;
DisconnectBG discbg = new DisconnectBG(disconnect, quiesceTimeout, token, executorService);
discbg.start();
}
}
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout, int reasonCode,
MqttProperties disconnectProperties) throws MqttException {
disconnectForcibly(quiesceTimeout, disconnectTimeout, true, reasonCode, disconnectProperties);
}
/**
* Disconnect the connection and reset all the states.
*
* @param quiesceTimeout
* How long to wait whilst quiesing before messages are deleted.
* @param disconnectTimeout
* How long to wait whilst disconnecting
* @param sendDisconnectPacket
* If true, will send a disconnect packet
* @param reasonCode
* the disconnection reason code.
* @param disconnectProperties
* the {@link MqttProperties} to send in the Disconnect packet.
* @throws MqttException
* if an error occurs whilst disconnecting
*/
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout, boolean sendDisconnectPacket,
int reasonCode, MqttProperties disconnectProperties) throws MqttException {
conState = DISCONNECTING;
// Allow current inbound and outbound work to complete
if (clientState != null) {
clientState.quiesce(quiesceTimeout);
}
MqttToken token = new MqttToken(client.getClientId());
try {
// Send disconnect packet
if (sendDisconnectPacket) {
internalSend(new MqttDisconnect(reasonCode, disconnectProperties), token);
// Wait util the disconnect packet sent with timeout
token.waitForCompletion(disconnectTimeout);
}
} catch (Exception ex) {
// ignore, probably means we failed to send the disconnect packet.
} finally {
token.internalTok.markComplete(null, null);
shutdownConnection(token, null, null);
}
}
public boolean isConnected() {
synchronized (conLock) {
return conState == CONNECTED;
}
}
public boolean isConnecting() {
synchronized (conLock) {
return conState == CONNECTING;
}
}
public boolean isDisconnected() {
synchronized (conLock) {
return conState == DISCONNECTED;
}
}
public boolean isDisconnecting() {
synchronized (conLock) {
return conState == DISCONNECTING;
}
}
public boolean isClosed() {
synchronized (conLock) {
return conState == CLOSED;
}
}
public boolean isResting() {
synchronized (conLock) {
return resting;
}
}
public void setCallback(MqttCallback mqttCallback) {
this.callback.setCallback(mqttCallback);
}
public void setReconnectCallback(MqttCallback callback) {
this.callback.setReconnectCallback(callback);
}
public void setManualAcks(boolean manualAcks) {
this.callback.setManualAcks(manualAcks);
}
public void messageArrivedComplete(int messageId, int qos) throws MqttException {
this.callback.messageArrivedComplete(messageId, qos);
}
public void setMessageListener(Integer subscriptionId, String topicFilter, IMqttMessageListener messageListener) {
this.callback.setMessageListener(subscriptionId, topicFilter, messageListener);
}
public void removeMessageListener(String topicFilter) {
this.callback.removeMessageListener(topicFilter);
}
protected MqttTopic getTopic(String topic) {
return new MqttTopic(topic, this);
}
public void setNetworkModuleIndex(int index) {
this.networkModuleIndex = index;
}
public int getNetworkModuleIndex() {
return networkModuleIndex;
}
public NetworkModule[] getNetworkModules() {
return networkModules;
}
public void setNetworkModules(NetworkModule[] networkModules) {
this.networkModules = networkModules;
}
public MqttToken[] getPendingTokens() {
return tokenStore.getOutstandingDelTokens();
}
protected void deliveryComplete(MqttPublish msg) throws MqttPersistenceException {
this.clientState.deliveryComplete(msg);
}
protected void deliveryComplete(int messageId) throws MqttPersistenceException {
this.clientState.deliveryComplete(messageId);
}
public MqttClientInterface getClient() {
return client;
}
public long getKeepAlive() {
return this.mqttConnection.getKeepAlive();
}
public MqttState getClientState() {
return clientState;
}
public MqttConnectionOptions getConOptions() {
return conOptions;
}
public Properties getDebug() {
Properties props = new Properties();
props.put("conState", Integer.valueOf(conState));
props.put("serverURI", getClient().getServerURI());
props.put("callback", callback);
props.put("stoppingComms", Boolean.valueOf(stoppingComms));
return props;
}
// Kick off the connect processing in the background so that it does not block.
// For instance
// the socket could take time to create.
private class ConnectBG implements Runnable {
ClientComms clientComms = null;
MqttToken conToken;
MqttConnect conPacket;
private String threadName;
ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket, ExecutorService executorService) {
clientComms = cc;
conToken = cToken;
conPacket = cPacket;
threadName = "MQTT Con: " + getClient().getClientId();
}
void start() {
if (executorService == null) {
new Thread(this).start();
} else {
executorService.execute(this);
}
}
public void run() {
Thread.currentThread().setName(threadName);
final String methodName = "connectBG:run";
MqttException mqttEx = null;
// @TRACE 220=>
log.fine(CLASS_NAME, methodName, "220");
try {
// Reset an exception on existing delivery tokens.
// This will have been set if disconnect occurred before delivery was
// fully processed.
MqttToken[] toks = tokenStore.getOutstandingDelTokens();
for (MqttToken tok : toks) {
tok.internalTok.setException(null);
}
// Save the connect token in tokenStore as failure can occur before send
tokenStore.saveToken(conToken, conPacket);
// Connect to the server at the network level e.g. TCP socket and then
// start the background processing threads before sending the connect
// packet.
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: " + getClient().getClientId(), executorService);
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: " + getClient().getClientId(), executorService);
callback.start("MQTT Call: " + getClient().getClientId(), executorService);
internalSend(conPacket, conToken);
} catch (MqttException ex) {
// @TRACE 212=connect failed: unexpected exception
log.fine(CLASS_NAME, methodName, "212", null, ex);
mqttEx = ex;
} catch (Exception ex) {
// @TRACE 209=connect failed: unexpected exception
log.fine(CLASS_NAME, methodName, "209", null, ex);
mqttEx = ExceptionHelper.createMqttException(ex);
}
if (mqttEx != null) {
shutdownConnection(conToken, mqttEx, null);
}
}
}
// Kick off the disconnect processing in the background so that it does not
// block. For instance
// the quiesce
private class DisconnectBG implements Runnable {
MqttDisconnect disconnect;
long quiesceTimeout;
MqttToken token;
private String threadName;
DisconnectBG(MqttDisconnect disconnect, long quiesceTimeout, MqttToken token, ExecutorService executorService) {
this.disconnect = disconnect;
this.quiesceTimeout = quiesceTimeout;
this.token = token;
}
void start() {
threadName = "MQTT Disc: "+getClient().getClientId();
if (executorService == null) {
new Thread(this).start();
} else {
executorService.execute(this);
}
}
public void run() {
Thread.currentThread().setName(threadName);
final String methodName = "disconnectBG:run";
// @TRACE 221=>
log.fine(CLASS_NAME, methodName, "221");
// Allow current inbound and outbound work to complete
clientState.quiesce(quiesceTimeout);
try {
internalSend(disconnect, token);
// do not wait if the sender process is not running
if (sender != null && sender.isRunning()) {
token.internalTok.waitUntilSent();
}
}
catch (MqttException ex) {
}
finally {
token.internalTok.markComplete(null, null);
if (sender == null || !sender.isRunning()) {
// if the sender process is not running
token.internalTok.notifyComplete();
}
shutdownConnection(token, null, null);
}
}
}
/*
* Check and send a ping if needed and check for ping timeout. Need to send a
* ping if nothing has been sent or received in the last keepalive interval.
*/
public MqttToken checkForActivity() {
return this.checkForActivity(null);
}
/*
* Check and send a ping if needed and check for ping timeout. Need to send a
* ping if nothing has been sent or received in the last keepalive interval.
* Passes an IMqttActionListener to ClientState.checkForActivity so that the
* callbacks are attached as soon as the token is created (Bug 473928)
*/
public MqttToken checkForActivity(MqttActionListener pingCallback) {
MqttToken token = null;
try {
token = clientState.checkForActivity(pingCallback);
} catch (MqttException e) {
handleRunException(e);
} catch (Exception e) {
handleRunException(e);
}
return token;
}
private void handleRunException(Exception ex) {
final String methodName = "handleRunException";
// @TRACE 804=exception
log.fine(CLASS_NAME, methodName, "804", null, ex);
MqttException mex;
if (!(ex instanceof MqttException)) {
mex = new MqttException(MqttClientException.REASON_CODE_CONNECTION_LOST, ex);
} else {
mex = (MqttException) ex;
}
shutdownConnection(null, mex, null);
}
/**
* When Automatic reconnect is enabled, we want ClientComs to enter the
* 'resting' state if disconnected. This will allow us to publish messages
*
* @param resting
* if true, resting is enabled
*/
public void setRestingState(boolean resting) {
this.resting = resting;
}
public void setDisconnectedMessageBuffer(DisconnectedMessageBuffer disconnectedMessageBuffer) {
this.disconnectedMessageBuffer = disconnectedMessageBuffer;
}
public int getBufferedMessageCount() {
return this.disconnectedMessageBuffer.getMessageCount();
}
public MqttMessage getBufferedMessage(int bufferIndex) {
MqttPublish send = (MqttPublish) this.disconnectedMessageBuffer.getMessage(bufferIndex).getMessage();
return send.getMessage();
}
public void deleteBufferedMessage(int bufferIndex) {
this.disconnectedMessageBuffer.deleteMessage(bufferIndex);
}
/**
* When the client automatically reconnects, we want to send all messages from
* the buffer first before allowing the user to send any messages
*/
public void notifyReconnect() {
final String methodName = "notifyReconnect";
if (disconnectedMessageBuffer != null) {
// @TRACE 509=Client Reconnected, Offline Buffer Available. Sending Buffered
// Messages.
log.fine(CLASS_NAME, methodName, "509");
disconnectedMessageBuffer.setPublishCallback(new ReconnectDisconnectedBufferCallback(methodName));
if (executorService == null) {
new Thread(disconnectedMessageBuffer).start();
} else {
executorService.execute(disconnectedMessageBuffer);
}
}
}
class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback {
final String methodName;
ReconnectDisconnectedBufferCallback(String methodName) {
this.methodName = methodName;
}
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
if (isConnected()) {
// @TRACE 510=Publising Buffered message message={0}
log.fine(CLASS_NAME, methodName, "510", new Object[] { bufferedMessage.getMessage().getKey() });
internalSend(bufferedMessage.getMessage(), bufferedMessage.getToken());
// Delete from persistence if in there
clientState.unPersistBufferedMessage(bufferedMessage.getMessage());
} else {
// @TRACE 208=failed: not connected
log.fine(CLASS_NAME, methodName, "208");
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
}
public int getActualInFlight() {
return this.clientState.getActualInFlight();
}
public boolean doesSubscriptionIdentifierExist(int subscriptionIdentifier) {
return this.callback.doesSubscriptionIdentifierExist(subscriptionIdentifier);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,670 @@
/*******************************************************************************
* Copyright (c) 2009, 2019 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
* Ian Craggs - per subscription message handlers (bug 466579)
* Ian Craggs - ack control (bug 472172)
* James Sutton - Automatic Reconnect & Offline Buffering
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttAuth;
import org.eclipse.paho.mqttv5.common.packet.MqttDisconnect;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttPubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttPubComp;
import org.eclipse.paho.mqttv5.common.packet.MqttPublish;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.eclipse.paho.mqttv5.common.util.MqttTopicValidator;
/**
* Bridge between Receiver and the external API. This class gets called by
* Receiver, and then converts the comms-centric MQTT message objects into ones
* understood by the external API.
*/
public class CommsCallback implements Runnable {
private static final String CLASS_NAME = CommsCallback.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private static final int INBOUND_QUEUE_SIZE = 10;
private MqttCallback mqttCallback;
private MqttCallback reconnectInternalCallback;
private HashMap<Integer, IMqttMessageListener> callbackMap; // Map of message handler callbacks to internal IDs
private HashMap<String, Integer> callbackTopicMap; // Map of Topic Strings to internal callback Ids
private HashMap<Integer, Integer> subscriptionIdMap; // Map of Subscription Ids to callback Ids
private AtomicInteger messageHandlerId = new AtomicInteger(0);
private ClientComms clientComms;
private ArrayList<MqttPublish> messageQueue;
private ArrayList<MqttToken> completeQueue;
private enum State {STOPPED, RUNNING, QUIESCING}
private State current_state = State.STOPPED;
private State target_state = State.STOPPED;
private final Object lifecycle = new Object();
private Thread callbackThread;
private String threadName;
private Future<?> callbackFuture;
private final Object workAvailable = new Object();
private final Object spaceAvailable = new Object();
private ClientState clientState;
private boolean manualAcks = false;
CommsCallback(ClientComms clientComms) {
this.clientComms = clientComms;
this.messageQueue = new ArrayList<>(INBOUND_QUEUE_SIZE);
this.completeQueue = new ArrayList<>(INBOUND_QUEUE_SIZE);
this.callbackMap = new HashMap<>();
this.callbackTopicMap = new HashMap<>();
this.subscriptionIdMap = new HashMap<>();
log.setResourceName(clientComms.getClient().getClientId());
}
public void setClientState(ClientState clientState) {
this.clientState = clientState;
}
/**
* Starts up the Callback thread.
*
* @param threadName
* The name of the thread
* @param executorService
* the {@link ExecutorService}
*/
public void start(String threadName, ExecutorService executorService) {
this.threadName = threadName;
synchronized (lifecycle) {
if (current_state == State.STOPPED) {
// Preparatory work before starting the background thread.
// For safety ensure any old events are cleared.
synchronized (workAvailable) {
messageQueue.clear();
completeQueue.clear();
}
target_state = State.RUNNING;
if (executorService == null) {
new Thread(this).start();
} else {
callbackFuture = executorService.submit(this);
}
}
}
while (!isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
}
}
/**
* Stops the callback thread. This call will block until stop has completed.
*/
public void stop() {
final String methodName = "stop";
synchronized (lifecycle) {
if (callbackFuture != null) {
callbackFuture.cancel(true);
}
}
if (isRunning()) {
// @TRACE 700=stopping
log.fine(CLASS_NAME, methodName, "700");
synchronized (lifecycle) {
target_state = State.STOPPED;
}
if (!Thread.currentThread().equals(callbackThread)) {
synchronized (workAvailable) {
// @TRACE 701=notify workAvailable and wait for run
// to finish
log.fine(CLASS_NAME, methodName, "701");
workAvailable.notifyAll();
}
// Wait for the thread to finish.
while (isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
clientState.notifyQueueLock();
}
}
callbackThread = null;
// @TRACE 703=stopped
log.fine(CLASS_NAME, methodName, "703");
}
}
public void setCallback(MqttCallback mqttCallback) {
this.mqttCallback = mqttCallback;
}
public void setReconnectCallback(MqttCallback callback) {
this.reconnectInternalCallback = callback;
}
public void setManualAcks(boolean manualAcks) {
this.manualAcks = manualAcks;
}
public void run() {
final String methodName = "run";
callbackThread = Thread.currentThread();
callbackThread.setName(threadName);
synchronized (lifecycle) {
current_state = State.RUNNING;
}
while (isRunning()) {
try {
// If no work is currently available, then wait until there is some...
try {
synchronized (workAvailable) {
if (isRunning() && messageQueue.isEmpty()
&& completeQueue.isEmpty()) {
// @TRACE 704=wait for workAvailable
log.fine(CLASS_NAME, methodName, "704");
workAvailable.wait();
}
}
} catch (InterruptedException e) {
}
if (isRunning()) {
// Check for deliveryComplete callbacks...
MqttToken token = null;
synchronized (workAvailable) {
if (!completeQueue.isEmpty()) {
// First call the delivery arrived callback if needed
token = completeQueue.get(0);
completeQueue.remove(0);
}
}
if (null != token) {
handleActionComplete(token);
}
// Check for messageArrived callbacks...
MqttPublish message = null;
synchronized (workAvailable) {
if (!messageQueue.isEmpty()) {
// Note, there is a window on connect where a publish
// could arrive before we've
// finished the connect logic.
message = messageQueue.get(0);
messageQueue.remove(0);
}
}
if (null != message) {
handleMessage(message);
}
}
if (isQuiescing()) {
clientState.checkQuiesceLock();
}
} catch (Throwable ex) {
// Users code could throw an Error or Exception e.g. in the case
// of class NoClassDefFoundError
// @TRACE 714=callback threw exception
log.fine(CLASS_NAME, methodName, "714", null, ex);
clientComms.shutdownConnection(null, new MqttException(ex), null);
} finally {
synchronized (spaceAvailable) {
// Notify the spaceAvailable lock, to say that there's now
// some space on the queue...
// @TRACE 706=notify spaceAvailable
log.fine(CLASS_NAME, methodName, "706");
spaceAvailable.notifyAll();
}
}
}
synchronized (lifecycle) {
current_state = State.STOPPED;
}
callbackThread = null;
}
private void handleActionComplete(MqttToken token) throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
// @TRACE 705=callback and notify for key={0}
log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() });
if (token.isComplete()) {
// Finish by doing any post processing such as delete
// from persistent store but only do so if the action
// is complete
clientState.notifyComplete(token);
}
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if (mqttCallback != null && token.internalTok.isDeliveryToken() == true && token.isComplete()) {
try {
mqttCallback.deliveryComplete(token);
} catch (Throwable ex) {
// Just log the fact that an exception was thrown
// @TRACE 726=Ignoring Exception thrown from deliveryComplete {0}
log.fine(CLASS_NAME, methodName, "726", new Object[] { ex });
}
}
// Now call async action completion callbacks
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if (token.isComplete()) {
if (token.internalTok.isDeliveryToken() == true || token.getActionCallback() instanceof MqttActionListener) {
token.internalTok.setNotified(true);
}
}
}
}
/**
* This method is called when the connection to the server is lost. If there is
* no cause then it was a clean disconnect. The connectionLost callback will be
* invoked if registered and run on the thread that requested shutdown e.g.
* receiver or sender thread. If the request was a user initiated disconnect
* then the disconnect token will be notified.
*
* @param cause
* the reason behind the loss of connection.
* @param message
* The {@link MqttDisconnect} packet sent by the server
*/
public void connectionLost(MqttException cause, MqttDisconnect message) {
final String methodName = "connectionLost";
// If there was a problem and a client callback has been set inform
// the connection lost listener of the problem.
try {
if (mqttCallback != null && message != null) {
// @TRACE 722=Server initiated disconnect, connection closed. Disconnect={0}
log.fine(CLASS_NAME, methodName, "722", new Object[] { message.toString() });
MqttDisconnectResponse disconnectResponse = new MqttDisconnectResponse(message.getReturnCode(),
message.getProperties().getReasonString(),
(ArrayList<UserProperty>) message.getProperties().getUserProperties(),
message.getProperties().getServerReference());
mqttCallback.disconnected(disconnectResponse);
} else if (mqttCallback != null && cause != null) {
// @TRACE 708=call connectionLost
log.fine(CLASS_NAME, methodName, "708", new Object[] { cause });
MqttDisconnectResponse disconnectResponse = new MqttDisconnectResponse(cause);
mqttCallback.disconnected(disconnectResponse);
}
if (reconnectInternalCallback != null && cause != null) {
MqttDisconnectResponse disconnectResponse = new MqttDisconnectResponse(cause);
reconnectInternalCallback.disconnected(disconnectResponse);
}
} catch (Throwable t) {
// Just log the fact that an exception was thrown
// @TRACE 720=Ignoring Exception thrown from connectionLost {0}
log.fine(CLASS_NAME, methodName, "720", new Object[] { t });
}
}
/**
* An action has completed - if a completion listener has been set on the token
* then invoke it with the outcome of the action.
*
* @param token
* The {@link MqttToken} that has completed
*/
public void fireActionEvent(MqttToken token) {
final String methodName = "fireActionEvent";
if (token != null) {
MqttActionListener asyncCB = token.getActionCallback();
if (asyncCB != null) {
if (token.getException() == null) {
// @TRACE 716=call onSuccess key={0}
log.fine(CLASS_NAME, methodName, "716", new Object[] { token.internalTok.getKey() });
asyncCB.onSuccess(token);
} else {
// @TRACE 717=call onFailure key {0}
log.fine(CLASS_NAME, methodName, "716", new Object[] { token.internalTok.getKey() });
asyncCB.onFailure(token, token.getException());
}
}
}
}
/**
* This method is called when a message arrives on a topic. Messages are only
* added to the queue for inbound messages if the client is not quiescing.
*
* @param sendMessage
* the MQTT SEND message.
*/
public void messageArrived(MqttPublish sendMessage) {
final String methodName = "messageArrived";
if (mqttCallback != null || callbackMap.size() > 0) {
// If we already have enough messages queued up in memory, wait
// until some more queue space becomes available. This helps
// the client protect itself from getting flooded by messages
// from the server.
synchronized (spaceAvailable) {
while (isRunning() && !isQuiescing() && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
try {
// @TRACE 709=wait for spaceAvailable
log.fine(CLASS_NAME, methodName, "709");
spaceAvailable.wait(200);
} catch (InterruptedException ex) {
}
}
}
if (!isQuiescing()) {
// Notify the CommsCallback thread that there's work to do...
synchronized (workAvailable) {
messageQueue.add(sendMessage);
// @TRACE 710=new msg avail, notify workAvailable
log.fine(CLASS_NAME, methodName, "710");
workAvailable.notifyAll();
}
}
}
}
/**
* This method is called when an Auth Message is received.
*
* @param authMessage
* The {@link MqttAuth} message.
*/
public void authMessageReceived(MqttAuth authMessage) {
String methodName = "authMessageReceived";
if (mqttCallback != null) {
try {
mqttCallback.authPacketArrived(authMessage.getReturnCode(), authMessage.getProperties());
} catch (Throwable ex) {
// Just log the fact that an exception was thrown
// @TRACE 727=Ignoring Exception thrown from authPacketArrived {0}
log.fine(CLASS_NAME, methodName, "727", new Object[] { ex });
}
}
}
/**
* This method is called when a non-critical MQTT error has occurred in the
* client that the application should choose how to deal with.
*
* @param exception
* The exception that was thrown containing the cause for
* disconnection.
*/
public void mqttErrorOccurred(MqttException exception) {
final String methodName = "mqttErrorOccurred";
log.warning(CLASS_NAME, methodName, "721", new Object[] { exception.getMessage() });
if (mqttCallback != null) {
try {
mqttCallback.mqttErrorOccurred(exception);
} catch (Exception ex) {
// Just log the fact that an exception was thrown
// @TRACE 724=Ignoring Exception thrown from mqttErrorOccurred: {0}
log.fine(CLASS_NAME, methodName, "724", new Object[] { ex });
}
}
}
/**
* Let the call back thread quiesce. Prevent new inbound messages being added to
* the process queue and let existing work quiesce. (until the thread is told to
* shutdown).
*/
public void quiesce() {
final String methodName = "quiesce";
synchronized (lifecycle) {
if (current_state == State.RUNNING)
current_state = State.QUIESCING;
}
synchronized (spaceAvailable) {
// @TRACE 711=quiesce notify spaceAvailable
log.fine(CLASS_NAME, methodName, "711");
// Unblock anything waiting for space...
spaceAvailable.notifyAll();
}
}
boolean areQueuesEmpty() {
synchronized (workAvailable) {
return completeQueue.isEmpty() && messageQueue.isEmpty();
}
}
public boolean isQuiesced() {
return (isQuiescing() && areQueuesEmpty());
}
private void handleMessage(MqttPublish publishMessage) throws Exception {
final String methodName = "handleMessage";
// If quisecing process any pending messages.
String destName = publishMessage.getTopicName();
// @TRACE 713=call messageArrived key={0} topic={1}
log.fine(CLASS_NAME, methodName, "713", new Object[] { Integer.valueOf(publishMessage.getMessageId()), destName });
deliverMessage(destName, publishMessage.getMessageId(), publishMessage.getMessage());
// If we are not in manual ACK mode:
if (!this.manualAcks && publishMessage.getMessage().getQos() == 1) {
this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS,
publishMessage.getMessageId(), new MqttProperties()),
new MqttToken(clientComms.getClient().getClientId()));
}
}
public void messageArrivedComplete(int messageId, int qos) throws MqttException {
if (qos == 1) {
this.clientComms.internalSend(
new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, messageId, new MqttProperties()),
new MqttToken(clientComms.getClient().getClientId()));
} else if (qos == 2) {
this.clientComms.deliveryComplete(messageId);
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, messageId, new MqttProperties());
// @TRACE 723=Creating MqttPubComp due to manual ACK: {0}
log.info(CLASS_NAME, "messageArrivedComplete", "723", new Object[] { pubComp.toString() });
this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));
}
}
public void asyncOperationComplete(MqttToken token) {
final String methodName = "asyncOperationComplete";
if (isRunning()) {
// invoke callbacks on callback thread
synchronized (workAvailable) {
completeQueue.add(token);
// @TRACE 715=new workAvailable. key={0}
log.fine(CLASS_NAME, methodName, "715", new Object[] { token.internalTok.getKey() });
workAvailable.notifyAll();
}
} else {
// invoke async callback on invokers thread
try {
handleActionComplete(token);
} catch (MqttException ex) {
// Users code could throw an Error or Exception e.g. in the case
// of class NoClassDefFoundError
// @TRACE 719=callback threw ex:
log.fine(CLASS_NAME, methodName, "719", null, ex);
// Shutdown likely already in progress but no harm to confirm
clientComms.shutdownConnection(null, new MqttException(ex), null);
}
}
}
/**
* Returns the thread used by this callback.
*
* @return The {@link Thread}
*/
protected Thread getThread() {
return callbackThread;
}
public void setMessageListener(Integer subscriptionId, String topicFilter, IMqttMessageListener messageListener) {
int internalId = messageHandlerId.incrementAndGet();
this.callbackMap.put(internalId, messageListener);
this.callbackTopicMap.put(topicFilter, internalId);
if (subscriptionId != null) {
this.subscriptionIdMap.put(subscriptionId, internalId);
}
}
/**
* Removes a Message Listener by Topic. If the Topic is null or incorrect, this
* function will return without making any changes. It will also attempt to find
* any subscription IDs linked to the same message listener and will remove them
* too.
*
* @param topicFilter
* the topic filter that identifies the Message listener to remove.
*/
public void removeMessageListener(String topicFilter) {
Integer callbackId = this.callbackTopicMap.get(topicFilter);
this.callbackMap.remove(callbackId);
this.callbackTopicMap.remove(topicFilter);
// Reverse lookup the subscription ID if it exists to remove that as well
for (Map.Entry<Integer, Integer> entry : this.subscriptionIdMap.entrySet()) {
if (entry.getValue().equals(callbackId)) {
this.subscriptionIdMap.remove(entry.getKey());
}
}
}
/**
* Removes a Message Listener by subscription ID. If the Subscription Identifier
* is null or incorrect, this function will return without making any changes.
* It will also attempt to find any Topic Strings linked to the same message
* listener and will remove them too.
*
* @param subscriptionId
* the subscription ID that identifies the Message listener to
* remove.
*/
public void removeMessageListener(Integer subscriptionId) {
Integer callbackId = this.subscriptionIdMap.get(subscriptionId);
this.subscriptionIdMap.remove(callbackId);
this.callbackMap.remove(callbackId);
// Reverse lookup the topic if it exists to remove that as well
for (Map.Entry<String, Integer> entry : this.callbackTopicMap.entrySet()) {
if (entry.getValue().equals(callbackId)) {
this.callbackTopicMap.remove(entry.getKey());
}
}
}
public void removeMessageListeners() {
this.callbackMap.clear();
this.subscriptionIdMap.clear();
this.callbackTopicMap.clear();
}
protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception {
boolean delivered = false;
String methodName = "deliverMessage";
if (aMessage.getProperties().getSubscriptionIdentifiers().isEmpty()) {
// No Subscription IDs, use topic filter matching
for (Map.Entry<String, Integer> entry : this.callbackTopicMap.entrySet()) {
if (MqttTopicValidator.isMatched(entry.getKey(), topicName)) {
aMessage.setId(messageId);
this.callbackMap.get(entry.getValue()).messageArrived(topicName, aMessage);
delivered = true;
}
}
} else {
// We have Subscription IDs
for (Integer subId : aMessage.getProperties().getSubscriptionIdentifiers()) {
if (this.subscriptionIdMap.containsKey(subId)) {
Integer callbackId = this.subscriptionIdMap.get(subId);
aMessage.setId(messageId);
this.callbackMap.get(callbackId).messageArrived(topicName, aMessage);
delivered = true;
}
}
}
/*
* if the message hasn't been delivered to a per subscription handler, give it
* to the default handler
*/
if (mqttCallback != null && !delivered) {
aMessage.setId(messageId);
try {
mqttCallback.messageArrived(topicName, aMessage);
} catch (Exception ex) {
// Just log the fact that an exception was thrown
// @TRACE 725=Ignoring Exception thrown from messageArrived: {0}
log.fine(CLASS_NAME, methodName, "725", new Object[] { ex });
}
delivered = true;
}
return delivered;
}
public boolean doesSubscriptionIdentifierExist(int subscriptionIdentifier) {
return (this.subscriptionIdMap.containsKey(subscriptionIdentifier));
}
public boolean isRunning() {
boolean result;
synchronized (lifecycle) {
result = ((current_state == State.RUNNING || current_state == State.QUIESCING)
&& target_state == State.RUNNING);
}
return result;
}
public boolean isQuiescing() {
boolean result;
synchronized (lifecycle) {
result = (current_state == State.QUIESCING);
}
return result;
}
}

View File

@@ -0,0 +1,241 @@
/*******************************************************************************
* Copyright (c) 2009, 2019 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.client.wire.MqttInputStream;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttAck;
import org.eclipse.paho.mqttv5.common.packet.MqttDisconnect;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
/**
* Receives MQTT packets from the server.
*/
public class CommsReceiver implements Runnable {
private static final String CLASS_NAME = CommsReceiver.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private enum State {STOPPED, RUNNING, STARTING, RECEIVING}
private State current_state = State.STOPPED;
private State target_state = State.STOPPED;
private final Object lifecycle = new Object();
private String threadName;
private Future<?> receiverFuture;
private ClientState clientState = null;
private ClientComms clientComms = null;
private MqttInputStream in;
private CommsTokenStore tokenStore = null;
private Thread recThread = null;
public CommsReceiver(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, InputStream in) {
this.in = new MqttInputStream(clientState, in, clientComms.getClient().getClientId());
this.clientComms = clientComms;
this.clientState = clientState;
this.tokenStore = tokenStore;
log.setResourceName(clientComms.getClient().getClientId());
}
/**
* Starts up the Receiver's thread.
*
* @param threadName
* the thread name.
* @param executorService
* used to execute the thread
*/
public void start(String threadName, ExecutorService executorService) {
this.threadName = threadName;
final String methodName = "start";
// @TRACE 855=starting
log.fine(CLASS_NAME, methodName, "855");
synchronized (lifecycle) {
if (current_state == State.STOPPED && target_state == State.STOPPED) {
target_state = State.RUNNING;
if (executorService == null) {
new Thread(this).start();
} else {
receiverFuture = executorService.submit(this);
}
}
}
while (!isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
}
}
/**
* Stops the Receiver's thread. This call will block.
*/
public void stop() {
final String methodName = "stop";
synchronized (lifecycle) {
if (receiverFuture != null) {
receiverFuture.cancel(true);
}
//@TRACE 850=stopping
log.fine(CLASS_NAME,methodName, "850");
if (isRunning()) {
target_state = State.STOPPED;
}
}
while (isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
}
//@TRACE 851=stopped
log.fine(CLASS_NAME,methodName,"851");
}
/**
* Run loop to receive messages from the server.
*/
public void run() {
recThread = Thread.currentThread();
recThread.setName(threadName);
final String methodName = "run";
MqttToken token = null;
synchronized (lifecycle) {
current_state = State.RUNNING;
}
try {
State my_target;
synchronized (lifecycle) {
my_target = target_state;
}
while (my_target == State.RUNNING && (in != null)) {
try {
//@TRACE 852=network read message
log.fine(CLASS_NAME,methodName,"852");
if (in.available() > 0) {
synchronized (lifecycle) {
current_state = State.RECEIVING;
}
}
MqttWireMessage message = in.readMqttWireMessage();
synchronized (lifecycle) {
current_state = State.RUNNING;
}
// instanceof checks if message is null
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token != null) {
synchronized (token) {
// Ensure the notify processing is done under a lock on the token
// This ensures that the send processing can complete before the
// receive processing starts! ( request and ack and ack processing
// can occur before request processing is complete if not!
clientState.notifyReceivedAck((MqttAck) message);
}
} else {
// This is an ack for a message we no longer have a ticket for.
log.fine(CLASS_NAME, methodName, "857");
clientState.handleOrphanedAcks((MqttAck) message);
}
} else if (message != null && message instanceof MqttDisconnect) {
// This is a Disconnect Message
clientComms.shutdownConnection(null, new MqttException(MqttClientException.REASON_CODE_SERVER_DISCONNECTED, (MqttDisconnect) message), (MqttDisconnect) message);
} else {
if (message != null) {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
else {
if (!clientComms.isConnected() && !clientComms.isConnecting()) {
throw new IOException("Connection is lost.");
}
}
}
}
catch (MqttException ex) {
// @TRACE 856=Stopping, MQttException
log.fine(CLASS_NAME, methodName, "856", null, ex);
synchronized (lifecycle) {
target_state = State.STOPPED;
}
// Token maybe null but that is handled in shutdown
clientComms.shutdownConnection(token, ex, null);
}
catch (IOException ioe) {
// @TRACE 853=Stopping due to IOException
log.fine(CLASS_NAME, methodName, "853");
if (target_state != State.STOPPED) {
synchronized (lifecycle) {
target_state = State.STOPPED;
}
// An EOFException could be raised if the broker processes the
// DISCONNECT and ends the socket before we complete. As such,
// only shutdown the connection if we're not already shutting down.
if (!clientComms.isDisconnecting()) {
clientComms.shutdownConnection(token,
new MqttException(MqttClientException.REASON_CODE_CONNECTION_LOST, ioe), null);
}
}
}
finally {
synchronized (lifecycle) {
current_state = State.RUNNING;
}
}
synchronized (lifecycle) {
my_target = target_state;
}
} // end while
} finally {
synchronized (lifecycle) {
current_state = State.STOPPED;
}
} // end try
recThread = null;
//@TRACE 854=<
log.fine(CLASS_NAME,methodName,"854");
}
public boolean isRunning() {
boolean result;
synchronized (lifecycle) {
result = ((current_state == State.RUNNING || current_state == State.RECEIVING)
&& target_state == State.RUNNING);
}
return result;
}
/**
* Returns the receiving state.
*
* @return true if the receiver is receiving data, false otherwise.
*/
public boolean isReceiving() {
boolean result;
synchronized (lifecycle) {
result = (current_state == State.RECEIVING);
}
return result;
}
}

View File

@@ -0,0 +1,210 @@
/*******************************************************************************
* Copyright (c) 2009, 2019 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.client.wire.MqttOutputStream;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttAck;
import org.eclipse.paho.mqttv5.common.packet.MqttDisconnect;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
public class CommsSender implements Runnable {
private static final String CLASS_NAME = CommsSender.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
//Sends MQTT packets to the server on its own thread
private enum State {STOPPED, RUNNING, STARTING}
private State current_state = State.STOPPED;
private State target_state = State.STOPPED;
private final Object lifecycle = new Object();
private Thread sendThread = null;
private String threadName;
private Future<?> senderFuture;
private ClientState clientState = null;
private MqttOutputStream out;
private ClientComms clientComms = null;
private CommsTokenStore tokenStore = null;
public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, OutputStream out) {
this.out = new MqttOutputStream(clientState, out, clientComms.getClient().getClientId());
this.clientComms = clientComms;
this.clientState = clientState;
this.tokenStore = tokenStore;
log.setResourceName(clientComms.getClient().getClientId());
}
/**
* Starts up the Sender thread.
* @param threadName the threadname
* @param executorService used to execute the thread
*/
public void start(String threadName, ExecutorService executorService) {
this.threadName = threadName;
synchronized (lifecycle) {
if (current_state == State.STOPPED && target_state == State.STOPPED) {
target_state = State.RUNNING;
if (executorService == null) {
new Thread(this).start();
} else {
senderFuture = executorService.submit(this);
}
}
}
while (!isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
}
}
/**
* Stops the Sender's thread. This call will block.
*/
public void stop() {
final String methodName = "stop";
if (!isRunning()) {
return;
}
synchronized (lifecycle) {
if (senderFuture != null) {
senderFuture.cancel(true);
}
//@TRACE 800=stopping sender
log.fine(CLASS_NAME,methodName,"800");
if (isRunning()) {
target_state = State.STOPPED;
clientState.notifyQueueLock();
}
}
while (isRunning()) {
try { Thread.sleep(100); } catch (Exception e) { }
clientState.notifyQueueLock();
}
//@TRACE 801=stopped
log.fine(CLASS_NAME,methodName,"801");
}
public void run() {
sendThread = Thread.currentThread();
sendThread.setName(threadName);
final String methodName = "run";
MqttWireMessage message = null;
synchronized (lifecycle) {
current_state = State.RUNNING;
}
try {
State my_target;
synchronized (lifecycle) {
my_target = target_state;
}
while (my_target == State.RUNNING && (out != null)) {
try {
message = clientState.get();
if (message != null) {
//@TRACE 802=network send key={0} msg={1}
log.fine(CLASS_NAME,methodName,"802", new Object[] {message.getKey(),message});
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
// While quiescing the tokenstore can be cleared so need
// to check for null for the case where clear occurs
// while trying to send a message.
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
// The flush has been seen to fail on disconnect of a SSL socket
// as disconnect is in progress this should not be treated as an error
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
}
} else { // null message
//@TRACE 803=get message returned null, stopping}
log.fine(CLASS_NAME,methodName,"803");
synchronized (lifecycle) {
target_state = State.STOPPED;
}
}
} catch (MqttException me) {
handleRunException(message, me);
} catch (Exception ex) {
handleRunException(message, ex);
}
synchronized (lifecycle) {
my_target = target_state;
}
} // end while
} finally {
synchronized (lifecycle) {
current_state = State.STOPPED;
sendThread = null;
}
}
//@TRACE 805=<
log.fine(CLASS_NAME, methodName,"805");
}
private void handleRunException(MqttWireMessage message, Exception ex) {
final String methodName = "handleRunException";
//@TRACE 804=exception
log.severe(CLASS_NAME,methodName,"804",null, ex);
MqttException mex;
if ( !(ex instanceof MqttException)) {
mex = new MqttException(MqttClientException.REASON_CODE_CONNECTION_LOST, ex);
} else {
mex = (MqttException)ex;
}
synchronized (lifecycle) {
target_state = State.STOPPED;
}
clientComms.shutdownConnection(null, mex, null);
}
public boolean isRunning() {
boolean result;
synchronized (lifecycle) {
result = (current_state == State.RUNNING && target_state == State.RUNNING);
}
return result;
}
}

View File

@@ -0,0 +1,254 @@
/*******************************************************************************
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttPublish;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
/**
* Provides a "token" based system for storing and tracking actions across
* multiple threads.
* When a message is sent, a token is associated with the message
* and saved using the {@link CommsTokenStore#saveToken(MqttToken, MqttWireMessage)} method. Anyone interested
* in tacking the state can call one of the wait methods on the token or using
* the asynchronous listener callback method on the operation.
* The {@link CommsReceiver} class, on another thread, reads responses back from
* the network. It uses the response to find the relevant token, which it can then
* notify.
*
* Note:
* Ping, connect and disconnect do not have a unique message id as
* only one outstanding request of each type is allowed to be outstanding
*/
public class CommsTokenStore {
private static final String CLASS_NAME = CommsTokenStore.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
// Maps message-specific data (usually message IDs) to tokens
private final Hashtable<String, MqttToken> tokens;
private String logContext;
private MqttException closedResponse = null;
public CommsTokenStore(String logContext) {
final String methodName = "<Init>";
log.setResourceName(logContext);
this.tokens = new Hashtable<String, MqttToken>();
this.logContext = logContext;
//@TRACE 308=<>
log.fine(CLASS_NAME,methodName,"308");//,new Object[]{message});
}
/**
* Based on the message type that has just been received return the associated
* token from the token store or null if one does not exist.
* @param message whose token is to be returned
* @return token for the requested message
*/
public MqttToken getToken(MqttWireMessage message) {
String key = message.getKey();
return (MqttToken)tokens.get(key);
}
public MqttToken getToken(String key) {
return (MqttToken)tokens.get(key);
}
public MqttToken removeToken(MqttWireMessage message) {
if (message != null) {
return removeToken(message.getKey());
}
return null;
}
public MqttToken removeToken(String key) {
final String methodName = "removeToken";
//@TRACE 306=key={0}
log.fine(CLASS_NAME,methodName,"306",new Object[]{key});
if ( null != key ){
return (MqttToken) tokens.remove(key);
}
return null;
}
/**
* Restores a token after a client restart. This method could be called
* for a SEND of CONFIRM, but either way, the original SEND is what's
* needed to re-build the token.
* @param message The {@link MqttPublish} message to restore
* @return {@link MqttToken}
*/
protected MqttToken restoreToken(MqttPublish message) {
final String methodName = "restoreToken";
MqttToken token;
synchronized(tokens) {
String key = Integer.valueOf(message.getMessageId()).toString();
if (this.tokens.containsKey(key)) {
token = this.tokens.get(key);
//@TRACE 302=existing key={0} message={1} token={2}
log.fine(CLASS_NAME,methodName, "302",new Object[]{key, message,token});
} else {
token = new MqttToken(logContext);
token.internalTok.setDeliveryToken(true);
token.internalTok.setKey(key);
this.tokens.put(key, token);
//@TRACE 303=creating new token key={0} message={1} token={2}
log.fine(CLASS_NAME,methodName,"303",new Object[]{key, message, token});
}
}
return token;
}
// For outbound messages store the token in the token store
// For pubrel use the existing publish token
protected void saveToken(MqttToken token, MqttWireMessage message) throws MqttException {
final String methodName = "saveToken";
synchronized(tokens) {
if (closedResponse == null) {
String key = message.getKey();
//@TRACE 300=key={0} message={1}
log.fine(CLASS_NAME,methodName,"300",new Object[]{key, message});
saveToken(token,key);
} else {
throw closedResponse;
}
}
}
protected void saveToken(MqttToken token, String key) {
final String methodName = "saveToken";
synchronized(tokens) {
//@TRACE 307=key={0} token={1}
log.fine(CLASS_NAME,methodName,"307",new Object[]{key,token.toString()});
token.internalTok.setKey(key);
this.tokens.put(key, token);
}
}
protected void quiesce(MqttException quiesceResponse) {
final String methodName = "quiesce";
synchronized(tokens) {
//@TRACE 309=resp={0}
log.fine(CLASS_NAME,methodName,"309",new Object[]{quiesceResponse});
closedResponse = quiesceResponse;
}
}
public void open() {
final String methodName = "open";
synchronized(tokens) {
//@TRACE 310=>
log.fine(CLASS_NAME,methodName,"310");
closedResponse = null;
}
}
public MqttToken[] getOutstandingDelTokens() {
final String methodName = "getOutstandingDelTokens";
synchronized(tokens) {
//@TRACE 311=>
log.fine(CLASS_NAME,methodName,"311");
Vector<MqttToken> list = new Vector<MqttToken>();
Enumeration<MqttToken> enumeration = tokens.elements();
MqttToken token;
while(enumeration.hasMoreElements()) {
token = (MqttToken)enumeration.nextElement();
if (token != null
&& token.internalTok.isDeliveryToken() == true
&& !token.internalTok.isNotified()) {
list.addElement(token);
}
}
MqttToken[] result = new MqttToken[list.size()];
return (MqttToken[]) list.toArray(result);
}
}
public Vector<MqttToken> getOutstandingTokens() {
final String methodName = "getOutstandingTokens";
synchronized(tokens) {
//@TRACE 312=>
log.fine(CLASS_NAME,methodName,"312");
Vector<MqttToken> list = new Vector<MqttToken>();
Enumeration<MqttToken> enumeration = tokens.elements();
MqttToken token;
while(enumeration.hasMoreElements()) {
token = (MqttToken)enumeration.nextElement();
if (token != null) {
list.addElement(token);
}
}
return list;
}
}
/**
* Empties the token store without notifying any of the tokens.
*/
public void clear() {
final String methodName = "clear";
//@TRACE 305=> {0} tokens
log.fine(CLASS_NAME, methodName, "305", new Object[] { Integer.valueOf(tokens.size())});
synchronized(tokens) {
tokens.clear();
}
}
public int count() {
synchronized(tokens) {
return tokens.size();
}
}
public String toString() {
String lineSep = System.getProperty("line.separator","\n");
StringBuffer toks = new StringBuffer();
synchronized(tokens) {
Enumeration<MqttToken> enumeration = tokens.elements();
MqttToken token;
while(enumeration.hasMoreElements()) {
token = (MqttToken)enumeration.nextElement();
toks.append("{"+token.internalTok+"}"+lineSep);
}
return toks.toString();
}
}
}

View File

@@ -0,0 +1,263 @@
/*******************************************************************************
* Copyright (c) 2009, 2019 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Ian Craggs - MQTT 3.1.1 support
* Ian Craggs - fix bug 469527
* James Sutton - Automatic Reconnect & Offline Buffering
*/
package org.eclipse.paho.mqttv5.client.internal;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttPersistenceException;
/**
* <p>
* This class handles the connection of the AsyncClient to one of the available
* URLs.
* </p>
* <p>
* The URLs are supplied as either the singleton when the client is created, or
* as a list in the connect options.
* </p>
* <p>
* This class uses its own onSuccess and onFailure callbacks in preference to
* the user supplied callbacks.
* </p>
* <p>
* An attempt is made to connect to each URL in the list until either a
* connection attempt succeeds or all the URLs have been tried
* </p>
* <p>
* If a connection succeeds then the users token is notified and the users
* onSuccess callback is called.
* </p>
* <p>
* If a connection fails then another URL in the list is attempted, otherwise
* the users token is notified and the users onFailure callback is called
* </p>
*/
public class ConnectActionListener implements MqttActionListener {
private MqttClientPersistence persistence;
private MqttAsyncClient client;
private ClientComms comms;
private MqttConnectionOptions options;
private MqttToken userToken;
private Object userContext;
private MqttActionListener userCallback;
private MqttCallback mqttCallback;
private MqttSessionState mqttSession;
private MqttConnectionState mqttConnection;
private boolean reconnect;
/**
* @param persistence
* The {@link MqttClientPersistence} layer
* @param client
* the {@link MqttAsyncClient}
* @param comms
* {@link ClientComms}
* @param options
* the {@link MqttConnectionOptions}
* @param userToken
* the {@link MqttToken}
* @param userContext
* the User Context Object
* @param userCallback
* the {@link MqttActionListener} as the callback for the user
* @param mqttSession
* the {@link MqttSessionState}
* @param mqttConnection
* the {@link MqttConnectionState}
* @param reconnect
* If true, this is a reconnect attempt
*/
public ConnectActionListener(MqttAsyncClient client, MqttClientPersistence persistence, ClientComms comms,
MqttConnectionOptions options, MqttToken userToken, Object userContext, MqttActionListener userCallback,
boolean reconnect, MqttSessionState mqttSession, MqttConnectionState mqttConnection) {
this.persistence = persistence;
this.client = client;
this.comms = comms;
this.options = options;
this.userToken = userToken;
this.userContext = userContext;
this.userCallback = userCallback;
this.reconnect = reconnect;
this.mqttSession = mqttSession;
this.mqttConnection = mqttConnection;
}
/**
* If the connect succeeded then call the users onSuccess callback
*
* @param token
* the {@link IMqttToken} from the successful connection
*/
public void onSuccess(IMqttToken token) {
// Set properties imposed on us by the Server
MqttToken myToken = (MqttToken) token;
if (myToken.getResponseProperties() != null) {
mqttConnection.setReceiveMaximum(myToken.getResponseProperties().getReceiveMaximum());
mqttConnection.setMaximumQoS(myToken.getResponseProperties().getMaximumQoS());
mqttConnection.setRetainAvailable(myToken.getResponseProperties().isRetainAvailable());
mqttConnection.setOutgoingMaximumPacketSize(myToken.getResponseProperties().getMaximumPacketSize());
mqttConnection.setIncomingMaximumPacketSize(options.getMaximumPacketSize());
mqttConnection.setOutgoingTopicAliasMaximum(myToken.getResponseProperties().getTopicAliasMaximum());
mqttConnection
.setWildcardSubscriptionsAvailable(myToken.getResponseProperties().isWildcardSubscriptionsAvailable());
mqttConnection.setSubscriptionIdentifiersAvailable(
myToken.getResponseProperties().isSubscriptionIdentifiersAvailable());
mqttConnection.setSharedSubscriptionsAvailable(myToken.getResponseProperties().isSharedSubscriptionAvailable());
// If provided, set the server keep alive value.
if(myToken.getResponseProperties().getServerKeepAlive() != null) {
mqttConnection.setKeepAliveSeconds(myToken.getResponseProperties().getServerKeepAlive());
}
// If we are assigning the client ID post connect, then we need to re-initialise
// our persistence layer.
if (myToken.getResponseProperties().getAssignedClientIdentifier() != null) {
mqttSession.setClientId(myToken.getResponseProperties().getAssignedClientIdentifier());
try {
persistence.open(myToken.getResponseProperties().getAssignedClientIdentifier());
if (options.isCleanStart()) {
persistence.clear();
}
} catch (MqttPersistenceException exception) {
// If we fail to open persistence at this point, our best bet is to immediately
// close the connection.
try {
client.disconnect();
} catch (MqttException ex) {
}
onFailure(token, exception);
return;
}
}
}
userToken.internalTok.markComplete(token.getResponse(), null);
userToken.internalTok.notifyComplete();
userToken.internalTok.setClient(this.client); // fix bug 469527 - maybe should be set elsewhere?
if (reconnect) {
comms.notifyReconnect();
}
if (userCallback != null) {
userToken.setUserContext(userContext);
userCallback.onSuccess(userToken);
}
if (mqttCallback != null) {
String serverURI = comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
try {
mqttCallback.connectComplete(reconnect, serverURI);
} catch (Throwable ex) {
// Just catch any exceptions thrown here and ignore.
}
}
}
/**
* The connect failed, so try the next URI on the list. If there are no more
* URIs, then fail the overall connect.
*
* @param token
* the {@link IMqttToken} from the failed connection attempt
* @param exception
* the {@link Throwable} exception from the failed connection attempt
*/
public void onFailure(IMqttToken token, Throwable exception) {
int numberOfURIs = comms.getNetworkModules().length;
int index = comms.getNetworkModuleIndex();
if ((index + 1) < numberOfURIs) {
comms.setNetworkModuleIndex(index + 1);
try {
connect();
} catch (MqttPersistenceException e) {
onFailure(token, e); // try the next URI in the list
}
} else {
MqttException ex;
if (exception instanceof MqttException) {
ex = (MqttException) exception;
} else {
ex = new MqttException(exception);
}
userToken.internalTok.markComplete(null, ex);
userToken.internalTok.notifyComplete();
userToken.internalTok.setClient(this.client); // fix bug 469527 - maybe should be set elsewhere?
if (userCallback != null) {
userToken.setUserContext(userContext);
userCallback.onFailure(userToken, exception);
}
}
}
/**
* Start the connect processing
*
* @throws MqttPersistenceException
* if an error is thrown whilst setting up persistence
*/
public void connect() throws MqttPersistenceException {
MqttToken token = new MqttToken(client.getClientId());
token.setActionCallback(this);
token.setUserContext(this);
if (!client.getClientId().equals("")) {
persistence.open(client.getClientId());
if (options.isCleanStart()) {
persistence.clear();
}
}
try {
comms.connect(options, token);
} catch (MqttException e) {
onFailure(token, e);
}
}
/**
* Set the MqttCallbackExtened callback to receive connectComplete callbacks
*
* @param mqttCallback
* the {@link MqttCallback} to be called when the connection
* completes
*/
public void setMqttCallbackExtended(MqttCallback mqttCallback) {
this.mqttCallback = mqttCallback;
}
}

View File

@@ -0,0 +1,31 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import org.eclipse.paho.mqttv5.client.MqttTopic;
/**
* This interface exists to act as a common type for
* MqttClient and MqttMIDPClient so they can be passed to
* ClientComms without either client class need to know
* about the other.
* Specifically, this allows the MIDP client to work
* without the non-MIDP MqttClient/MqttConnectOptions
* classes being present.
*/
public interface DestinationProvider {
MqttTopic getTopic(String topic);
}

View File

@@ -0,0 +1,135 @@
/*******************************************************************************
* Copyright (c) 2016, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* James Sutton - Initial Contribution for Automatic Reconnect & Offline Buffering
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.util.ArrayList;
import org.eclipse.paho.mqttv5.client.BufferedMessage;
import org.eclipse.paho.mqttv5.client.DisconnectedBufferOptions;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
public class DisconnectedMessageBuffer implements Runnable {
private static final String CLASS_NAME = DisconnectedMessageBuffer.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private DisconnectedBufferOptions bufferOpts;
private ArrayList<BufferedMessage> buffer;
private final Object bufLock = new Object(); // Used to synchronise the buffer
private IDisconnectedBufferCallback callback;
public DisconnectedMessageBuffer(DisconnectedBufferOptions options){
this.bufferOpts = options;
buffer = new ArrayList<BufferedMessage>();
}
/**
* This will add a new message to the offline buffer,
* if the buffer is full and deleteOldestMessages is enabled
* then the 0th item in the buffer will be deleted and the
* new message will be added. If it is not enabled then an
* MqttException will be thrown.
* @param message the {@link MqttWireMessage} that will be buffered
* @param token the associated {@link MqttToken}
* @throws MqttException if the Buffer is full
*/
public void putMessage(MqttWireMessage message, MqttToken token) throws MqttException{
BufferedMessage bufferedMessage = new BufferedMessage(message, token);
synchronized (bufLock) {
if(buffer.size() < bufferOpts.getBufferSize()){
buffer.add(bufferedMessage);
} else if(bufferOpts.isDeleteOldestMessages() == true){
buffer.remove(0);
buffer.add(bufferedMessage);
}else {
throw new MqttException(MqttClientException.REASON_CODE_DISCONNECTED_BUFFER_FULL);
}
}
}
/**
* Retrieves a message from the buffer at the given index.
* @param messageIndex the index of the message to be retrieved in the buffer
* @return the {@link BufferedMessage}
*/
public BufferedMessage getMessage(int messageIndex){
synchronized (bufLock) {
return((BufferedMessage) buffer.get(messageIndex));
}
}
/**
* Removes a message from the buffer
* @param messageIndex the index of the message to be deleted in the buffer
*/
public void deleteMessage(int messageIndex){
synchronized (bufLock) {
buffer.remove(messageIndex);
}
}
/**
* Returns the number of messages currently in the buffer
* @return The count of messages in the buffer
*/
public int getMessageCount() {
synchronized (bufLock) {
return buffer.size();
}
}
/**
* Flushes the buffer of messages into an open connection
*/
public void run() {
final String methodName = "run";
// @TRACE 516=Restoring all buffered messages.
log.fine(CLASS_NAME, methodName, "516");
while(getMessageCount() > 0){
try {
BufferedMessage bufferedMessage = getMessage(0);
callback.publishBufferedMessage(bufferedMessage);
// Publish was successful, remove message from buffer.
deleteMessage(0);
} catch (MqttException ex) {
if (ex.getReasonCode() == MqttClientException.REASON_CODE_MAX_INFLIGHT) {
// If we get the max_inflight condition, try again after a short
// interval to allow more messages to be completely sent.
try { Thread.sleep(100); } catch (Exception e) {}
} else {
// Error occurred attempting to publish buffered message likely because the client is not connected
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}.
log.warning(CLASS_NAME, methodName, "519", new Object[]{ex.getMessage()});
break;
}
}
}
}
public void setPublishCallback(IDisconnectedBufferCallback callback) {
this.callback = callback;
}
public boolean isPersistBuffer(){
return bufferOpts.isPersistBuffer();
}
}

View File

@@ -0,0 +1,63 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSecurityException;
/**
* Utility class to help create exceptions of the correct type.
*/
public class ExceptionHelper {
public static MqttException createMqttException(int reasonCode) {
if ((reasonCode == MqttClientException.REASON_CODE_FAILED_AUTHENTICATION) ||
(reasonCode == MqttClientException.REASON_CODE_NOT_AUTHORIZED)) {
return new MqttSecurityException(reasonCode);
}
return new MqttException(reasonCode);
}
public static MqttException createMqttException(Throwable cause) {
if (cause.getClass().getName().equals("java.security.GeneralSecurityException")) {
return new MqttSecurityException(cause);
}
return new MqttException(cause);
}
/**
* Returns whether or not the specified class is available to the current
* class loader. This is used to protect the code against using Java SE
* APIs on Java ME.
* @param className The name of the class
* @return If true, the class is available
*/
public static boolean isClassAvailable(String className) {
boolean result = false;
try {
Class.forName(className);
result = true;
}
catch (ClassNotFoundException ex) {
}
return result;
}
// Utility classes should not have a public or default constructor.
private ExceptionHelper() {
}
}

View File

@@ -0,0 +1,96 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
/**
* FileLock - used to obtain a lock that can be used to prevent other MQTT clients
* using the same persistent store. If the lock is already held then an exception
* is thrown.
*
* Some Java runtimes such as JME MIDP do not support file locking or even
* the Java classes that support locking. The class is coded to both compile
* and work on all Java runtimes. In Java runtimes that do not support
* locking it will look as though a lock has been obtained but in reality
* no lock has been obtained.
*/
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
public class FileLock {
private File lockFile;
private RandomAccessFile file;
private Object fileLock;
/**
* Creates an NIO FileLock on the specified file if on a suitable Java runtime.
* @param clientDir the a File of the directory to contain the lock file.
* @param lockFilename name of the the file to lock
* @throws Exception if the lock could not be obtained for any reason
*/
public FileLock(File clientDir, String lockFilename) throws Exception {
// Create a file to obtain a lock on.
lockFile = new File(clientDir,lockFilename);
if (ExceptionHelper.isClassAvailable("java.nio.channels.FileLock")) {
try {
this.file = new RandomAccessFile(lockFile,"rw");
Method m = file.getClass().getMethod("getChannel",new Class[]{});
Object channel = m.invoke(file,new Object[]{});
m = channel.getClass().getMethod("tryLock",new Class[]{});
this.fileLock = m.invoke(channel, new Object[]{});
} catch(NoSuchMethodException nsme) {
this.fileLock = null;
} catch(IllegalArgumentException iae) {
this.fileLock = null;
} catch(IllegalAccessException iae) {
this.fileLock = null;
}
if (fileLock == null) {
// Lock not obtained
release();
throw new Exception("Problem obtaining file lock");
}
}
}
/**
* Releases the lock.
*/
public void release() {
try {
if (fileLock != null) {
Method m = fileLock.getClass().getMethod("release",new Class[]{});
m.invoke(fileLock, new Object[]{});
fileLock = null;
}
} catch (Exception e) {
// Ignore exceptions
}
if (file != null) {
try {
file.close();
} catch (IOException e) {
}
file = null;
}
if (lockFile != null && lockFile.exists()) {
lockFile.delete();
}
lockFile = null;
}
}

View File

@@ -0,0 +1,25 @@
/*******************************************************************************
* Copyright (c) 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* James Sutton - Initial Contribution for Automatic Reconnect & Offline Buffering
*/
package org.eclipse.paho.mqttv5.client.internal;
import org.eclipse.paho.mqttv5.client.BufferedMessage;
import org.eclipse.paho.mqttv5.common.MqttException;
public interface IDisconnectedBufferCallback {
void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException;
}

View File

@@ -0,0 +1,46 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
/**
* Catalog of human readable error messages.
*/
public abstract class MessageCatalog {
private static MessageCatalog INSTANCE = null;
public static final String getMessage(int id) {
if (INSTANCE == null) {
if (ExceptionHelper.isClassAvailable("java.util.ResourceBundle")) {
try {
// Hide this class reference behind reflection so that the class does not need to
// be present when compiled on midp
INSTANCE = (MessageCatalog)Class.forName("org.eclipse.paho.mqttv5.client.internal.ResourceBundleCatalog").newInstance();
} catch (Exception e) {
return "";
}
} else if (ExceptionHelper.isClassAvailable("org.eclipse.paho.mqttv5.client.internal.MIDPCatalog")){
try {
INSTANCE = (MessageCatalog)Class.forName("org.eclipse.paho.mqttv5.client.internal.MIDPCatalog").newInstance();
} catch (Exception e) {
return "";
}
}
}
return INSTANCE.getLocalizedMessage(id);
}
protected abstract String getLocalizedMessage(int id);
}

View File

@@ -0,0 +1,185 @@
package org.eclipse.paho.mqttv5.client.internal;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is used as a store for client information that should be preserved
* for a single connection.
* Properties returned in subsequent connect packets will override existing properties
* here as well.
*
* Connection variables that this class holds:
*
* <ul>
* <li>Receive Maximum</li>
* <li>Maximum QoS</li>
* <li>Retain Available</li>
* <li>Maximum Packet Size</li>
* <li>Outgoing Topic Alias Maximum</li>
* <li>Incoming Topic Alias Maximum</li>
* <li>Wildcard Subscriptions Available</li>
* <li>Subscription Identifiers Available</li>
* <li>Shared Subscriptions Available</li>
* <li>Send Reason Messages</li>
* </ul>
*/
public class MqttConnectionState {
// ******* Connection properties ******//
private Integer receiveMaximum = 65535;
private Integer maximumQoS = 2;
private Boolean retainAvailable = true;
private Long outgoingMaximumPacketSize = null;
private Long incomingMaximumPacketSize = null;
private Integer outgoingTopicAliasMaximum = 0;
private Integer incomingTopicAliasMax = 0;
private Boolean wildcardSubscriptionsAvailable = true;
private Boolean subscriptionIdentifiersAvailable = true;
private Boolean sharedSubscriptionsAvailable = true;
private boolean sendReasonMessages = false;
private long keepAlive = 60;
private String clientId = "";
// ******* Counters ******//
private AtomicInteger nextOutgoingTopicAlias = new AtomicInteger(1);
public MqttConnectionState(String clientId) {
this.clientId = clientId;
}
public String getClientId() {
return clientId;
}
/**
* Clears the session and resets. This would be called when the connection has
* been lost and cleanStart = True.
*/
public void clearConnectionState() {
nextOutgoingTopicAlias.set(1);
}
public Integer getReceiveMaximum() {
if (receiveMaximum == null) {
return 65535;
}
return receiveMaximum;
}
public void setReceiveMaximum(Integer receiveMaximum) {
if (receiveMaximum != null)
this.receiveMaximum = receiveMaximum;
}
public Integer getMaximumQoS() {
return maximumQoS;
}
public void setMaximumQoS(Integer maximumQoS) {
if (maximumQoS != null)
this.maximumQoS = maximumQoS;
}
public Boolean isRetainAvailable() {
return retainAvailable;
}
public void setRetainAvailable(Boolean retainAvailable) {
if (retainAvailable != null)
this.retainAvailable = retainAvailable;
}
public Long getOutgoingMaximumPacketSize() {
return outgoingMaximumPacketSize;
}
public void setOutgoingMaximumPacketSize(Long maximumPacketSize) {
if (maximumPacketSize != null)
this.outgoingMaximumPacketSize = maximumPacketSize;
}
public Long getIncomingMaximumPacketSize() {
return incomingMaximumPacketSize;
}
public void setIncomingMaximumPacketSize(Long incomingMaximumPacketSize) {
if (incomingMaximumPacketSize != null)
this.incomingMaximumPacketSize = incomingMaximumPacketSize;
}
public Integer getOutgoingTopicAliasMaximum() {
return outgoingTopicAliasMaximum;
}
public void setOutgoingTopicAliasMaximum(Integer topicAliasMaximum) {
if (topicAliasMaximum != null)
this.outgoingTopicAliasMaximum = topicAliasMaximum;
}
public Boolean isWildcardSubscriptionsAvailable() {
return wildcardSubscriptionsAvailable;
}
public void setWildcardSubscriptionsAvailable(Boolean wildcardSubscriptionsAvailable) {
if (wildcardSubscriptionsAvailable != null)
this.wildcardSubscriptionsAvailable = wildcardSubscriptionsAvailable;
}
public Boolean isSubscriptionIdentifiersAvailable() {
return subscriptionIdentifiersAvailable;
}
public void setSubscriptionIdentifiersAvailable(Boolean subscriptionIdentifiersAvailable) {
if (subscriptionIdentifiersAvailable != null)
this.subscriptionIdentifiersAvailable = subscriptionIdentifiersAvailable;
}
public Boolean isSharedSubscriptionsAvailable() {
return sharedSubscriptionsAvailable;
}
public void setSharedSubscriptionsAvailable(Boolean sharedSubscriptionsAvailable) {
if (sharedSubscriptionsAvailable != null)
this.sharedSubscriptionsAvailable = sharedSubscriptionsAvailable;
}
public Integer getNextOutgoingTopicAlias() {
return nextOutgoingTopicAlias.getAndIncrement();
}
public Integer getIncomingTopicAliasMax() {
return incomingTopicAliasMax;
}
public void setIncomingTopicAliasMax(Integer incomingTopicAliasMax) {
if (incomingTopicAliasMax != null)
this.incomingTopicAliasMax = incomingTopicAliasMax;
}
public boolean isSendReasonMessages() {
return sendReasonMessages;
}
public void setSendReasonMessages(boolean enableReasonMessages) {
this.sendReasonMessages = enableReasonMessages;
}
public long getKeepAlive() {
return keepAlive;
}
public void setKeepAliveSeconds(long keepAlive) {
this.keepAlive = keepAlive * 1000;
}
}

View File

@@ -0,0 +1,98 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import org.eclipse.paho.mqttv5.common.MqttPersistable;
public class MqttPersistentData implements MqttPersistable {
// Message key
private String key = null;
// Message header
private byte[] header = null;
private int hOffset = 0;
private int hLength = 0;
// Message payload
private byte[] payload = null;
private int pOffset = 0;
private int pLength = 0;
/**
* Construct a data object to pass across the MQTT client persistence interface.
*
* When this Object is passed to the persistence implementation the key is
* used by the client to identify the persisted data to which further
* update or deletion requests are targeted.<BR>
* When this Object is created for returning to the client when it is
* recovering its state from persistence the key is not required to be set.
* The client can determine the key from the data.
* @param key The key which identifies this data
* @param header The message header
* @param hOffset The start offset of the header bytes in header.
* @param hLength The length of the header in the header bytes array.
* @param payload The message payload
* @param pOffset The start offset of the payload bytes in payload.
* @param pLength The length of the payload in the payload bytes array
* when persisting the message.
*/
public MqttPersistentData( String key,
byte[] header,
int hOffset,
int hLength,
byte[] payload,
int pOffset,
int pLength) {
this.key = key;
this.header = header;
this.hOffset = hOffset;
this.hLength = hLength;
this.payload = payload;
this.pOffset = pOffset;
this.pLength = pLength;
}
public String getKey() {
return key;
}
public byte[] getHeaderBytes() {
return header;
}
public int getHeaderLength() {
return hLength;
}
public int getHeaderOffset() {
return hOffset;
}
public byte[] getPayloadBytes() {
return payload;
}
public int getPayloadLength() {
if ( payload == null ) {
return 0;
}
return pLength;
}
public int getPayloadOffset() {
return pOffset;
}
}

View File

@@ -0,0 +1,39 @@
package org.eclipse.paho.mqttv5.client.internal;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is used as a store for client information that should be preserved
* for a single MQTT Session. If the client is disconnected and reconnects with
* clean start = true, then this object will be reset to it's initial state.
*
* Connection variables that this class holds:
*
* <ul>
* <li>Client ID</li>
* <li>Next Subscription Identifier - The next subscription Identifier available
* to use.</li>
* </ul>
*/
public class MqttSessionState {
// ******* Session Specific Properties and counters ******//
private AtomicInteger nextSubscriptionIdentifier = new AtomicInteger(1);
private String clientId;
public void clearSessionState() {
nextSubscriptionIdentifier.set(1);
}
public Integer getNextSubscriptionIdentifier() {
return nextSubscriptionIdentifier.getAndIncrement();
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
}

View File

@@ -0,0 +1,114 @@
/*******************************************************************************
* Copyright (c) 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - Original MQTTv3 implementation
* James Sutton - Initial MQTTv5 implementation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.util.Properties;
import java.util.Vector;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttToken;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
public interface MqttState {
/**
* Submits a message for delivery. This method will block until there is
* room in the inFlightWindow for the message. The message is put into
* persistence before returning.
*
* @param message the message to send
* @param token the token that can be used to track delivery of the message
* @throws MqttException if an exception occurs whilst sending the message
*/
void send(MqttWireMessage message, MqttToken token) throws MqttException;
/**
* Persists a buffered message to the persistence layer
*
* @param message The {@link MqttWireMessage} to persist
*/
void persistBufferedMessage(MqttWireMessage message);
/**
* @param message The {@link MqttWireMessage} to un-persist
*/
void unPersistBufferedMessage(MqttWireMessage message);
/**
* Check and send a ping if needed and check for ping timeout.
* Need to send a ping if nothing has been sent or received
* in the last keepalive interval. It is important to check for
* both sent and received packets in order to catch the case where an
* app is solely sending QoS 0 messages or receiving QoS 0 messages.
* QoS 0 message are not good enough for checking a connection is
* alive as they are one way messages.
*
* If a ping has been sent but no data has been received in the
* last keepalive interval then the connection is deamed to be broken.
* @param pingCallback The {@link MqttActionListener} to be called
* @return token of ping command, null if no ping command has been sent.
* @throws MqttException if an exception occurs during the Ping
*/
MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttException;
void notifySentBytes(int sentBytesCount);
void notifyReceivedBytes(int receivedBytesCount);
/**
* Called when the client has successfully connected to the broker
*/
void connected();
/**
* Called during shutdown to work out if there are any tokens still
* to be notified and waiters to be unblocked. Notifying and unblocking
* takes place after most shutdown processing has completed. The tokenstore
* is tidied up so it only contains outstanding delivery tokens which are
* valid after reconnect (if clean session is false)
* @param reason The root cause of the disconnection, or null if it is a clean disconnect
* @return {@link Vector}
*/
Vector<MqttToken> resolveOldTokens(MqttException reason);
/**
* Called when the client has been disconnected from the broker.
* @param reason The root cause of the disconnection, or null if it is a clean disconnect
*/
void disconnected(MqttException reason);
/**
* Quiesce the client state, preventing any new messages getting sent,
* and preventing the callback on any newly received messages.
* After the timeout expires, delete any pending messages except for
* outbound ACKs, and wait for those ACKs to complete.
* @param timeout How long to wait during Quiescing
*/
void quiesce(long timeout);
void notifyQueueLock();
int getActualInFlight();
Properties getDebug();
Long getOutgoingMaximumPacketSize();
Long getIncomingMaximumPacketSize();
}

View File

@@ -0,0 +1,35 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.eclipse.paho.mqttv5.common.MqttException;
public interface NetworkModule {
void start() throws IOException, MqttException;
InputStream getInputStream() throws IOException;
OutputStream getOutputStream() throws IOException;
void stop() throws IOException;
String getServerURI();
}

View File

@@ -0,0 +1,158 @@
/*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.client.spi.NetworkModuleFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
/**
* The NetworkModuleService uses the installed {@link NetworkModuleFactory}s to create {@link NetworkModule} instances.
* <p>
* The selection of the appropriate NetworkModuleFactory is based on the URI scheme.
*
* @author Maik Scheibler
*/
public class NetworkModuleService {
private static Logger LOG = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,
NetworkModuleService.class.getSimpleName());
private static final ServiceLoader<NetworkModuleFactory> FACTORY_SERVICE_LOADER = ServiceLoader.load(
NetworkModuleFactory.class, NetworkModuleService.class.getClassLoader());
/** Pattern to match URI authority parts: {@code authority = [userinfo"@"]host[":"port]} */
private static final Pattern AUTHORITY_PATTERN = Pattern.compile("((.+)@)?([^:]*)(:(\\d+))?");
private static final int AUTH_GROUP_USERINFO = 2;
private static final int AUTH_GROUP_HOST = 3;
private static final int AUTH_GROUP_PORT = 5;
private NetworkModuleService() {
// no instances
}
/**
* Validates the provided URI to be valid and that a NetworkModule is installed to serve it.
*
* @param brokerUri to be validated
* @throws IllegalArgumentException is case the URI is invalid or there is no {@link NetworkModule} installed for
* the URI scheme
*/
public static void validateURI(String brokerUri) throws IllegalArgumentException {
try {
URI uri = new URI(brokerUri);
String scheme = uri.getScheme();
if (scheme == null || scheme.isEmpty()) {
throw new IllegalArgumentException("missing scheme in broker URI: " + brokerUri);
}
scheme = scheme.toLowerCase();
synchronized (FACTORY_SERVICE_LOADER) {
for (NetworkModuleFactory factory : FACTORY_SERVICE_LOADER) {
if (factory.getSupportedUriSchemes().contains(scheme)) {
factory.validateURI(uri);
return;
}
}
}
throw new IllegalArgumentException("no NetworkModule installed for scheme \"" + scheme
+ "\" of URI \"" + brokerUri + "\"");
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Can't parse string to URI \"" + brokerUri + "\"", e);
}
}
/**
* Creates a {@link NetworkModule} instance for the provided address, using the given options.
*
* @param address must be a valid URI
* @param options used to initialize the NetworkModule
* @param clientId a client identifier that is unique on the server being connected to
* @return a new NetworkModule instance
* @throws MqttException if the initialization fails
* @throws IllegalArgumentException if the provided {@code address} is invalid
*/
public static NetworkModule createInstance(String address, MqttConnectionOptions options, String clientId)
throws MqttException, IllegalArgumentException
{
try {
URI brokerUri = new URI(address);
applyRFC3986AuthorityPatch(brokerUri);
String scheme = brokerUri.getScheme().toLowerCase();
synchronized (FACTORY_SERVICE_LOADER) {
for (NetworkModuleFactory factory : FACTORY_SERVICE_LOADER) {
if (factory.getSupportedUriSchemes().contains(scheme)) {
return factory.createNetworkModule(brokerUri, options, clientId);
}
}
}
/*
* To throw an IllegalArgumentException exception matches the previous behavior of
* MqttConnectOptions.validateURI(String), but it would be nice to provide something more meaningful.
*/
throw new IllegalArgumentException(brokerUri.toString());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(address, e);
}
}
/**
* Java does URI parsing according to RFC2396 and thus hostnames are limited to alphanumeric characters and '-'.
* But the current &quot;Uniform Resource Identifier (URI): Generic Syntax&quot; (RFC3986) allows for a much wider
* range of valid characters. This causes Java to fail parsing the authority part and thus the user-info, host and
* port will not be set on an URI which does not conform to RFC2396.
* <p>
* This workaround tries to detect such a parsing failure and does tokenize the authority parts according to
* RFC3986, but does not enforce any character restrictions (for sake of simplicity).
*
* @param toPatch
* @see https://tools.ietf.org/html/rfc3986#section-3.2
*/
static void applyRFC3986AuthorityPatch(URI toPatch) {
if (toPatch == null
|| toPatch.getHost() != null // already successfully parsed
|| toPatch.getAuthority() == null
|| toPatch.getAuthority().isEmpty())
{
return;
}
Matcher matcher = AUTHORITY_PATTERN.matcher(toPatch.getAuthority());
if (matcher.find()) {
setURIField(toPatch, "userInfo", matcher.group(AUTH_GROUP_USERINFO));
setURIField(toPatch, "host", matcher.group(AUTH_GROUP_HOST));
String portString = matcher.group(AUTH_GROUP_PORT);
setURIField(toPatch, "port", portString != null ? Integer.parseInt(portString) : -1);
}
}
/**
* Reflective manipulation of a URI field to work around the URI parser, because all fields are validated even on
* the full qualified URI constructor.
*
* @see URI#URI(String, String, String, int, String, String, String)
*/
private static void setURIField(URI toManipulate, String fieldName, Object newValue) {
try {
Field field = URI.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(toManipulate, newValue);
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
LOG.warning(NetworkModuleService.class.getName(), "setURIField", "115", new Object[] {
toManipulate.toString() }, e);
}
}
}

View File

@@ -0,0 +1,37 @@
/*******************************************************************************
* Copyright (c) 2009, 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.util.MissingResourceException;
import java.util.ResourceBundle;
public class ResourceBundleCatalog extends MessageCatalog {
private ResourceBundle bundle;
public ResourceBundleCatalog() {
//MAY throws MissingResourceException
bundle = ResourceBundle.getBundle("org.eclipse.paho.mqttv5.client.internal.nls.messages");
}
protected String getLocalizedMessage(int id) {
try {
return bundle.getString(Integer.toString(id));
} catch(MissingResourceException mre) {
return "MqttException";
}
}
}

View File

@@ -0,0 +1,161 @@
/*******************************************************************************
* Copyright (c) 2009, 2019 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
/**
* A network module for connecting over SSL.
*/
public class SSLNetworkModule extends TCPNetworkModule {
private static final String CLASS_NAME = SSLNetworkModule.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private String[] enabledCiphers;
private int handshakeTimeoutSecs;
private HostnameVerifier hostnameVerifier;
private boolean httpsHostnameVerificationEnabled = true;
private String host;
private int port;
/**
* Constructs a new SSLNetworkModule using the specified host and port. The
* supplied SSLSocketFactory is used to supply the network socket.
*
* @param factory
* the {@link SSLSocketFactory} to be used in this SSLNetworkModule
* @param host
* the Hostname of the Server
* @param port
* the Port of the Server
* @param resourceContext
* Resource Context
*/
public SSLNetworkModule(SSLSocketFactory factory, String host, int port, String resourceContext) {
super(factory, host, port, resourceContext);
this.host = host;
this.port = port;
log.setResourceName(resourceContext);
}
/**
* Returns the enabled cipher suites.
*
* @return a string array of enabled Cipher suites
*/
public String[] getEnabledCiphers() {
return enabledCiphers;
}
/**
* Sets the enabled cipher suites on the underlying network socket.
*
* @param enabledCiphers
* a String array of cipher suites to enable
*/
public void setEnabledCiphers(String[] enabledCiphers) {
final String methodName = "setEnabledCiphers";
this.enabledCiphers = enabledCiphers;
if ((socket != null) && (enabledCiphers != null)) {
if (log.isLoggable(Logger.FINE)) {
String ciphers = "";
for (int i = 0; i < enabledCiphers.length; i++) {
if (i > 0) {
ciphers += ",";
}
ciphers += enabledCiphers[i];
}
// @TRACE 260=setEnabledCiphers ciphers={0}
log.fine(CLASS_NAME, methodName, "260", new Object[] { ciphers });
}
((SSLSocket) socket).setEnabledCipherSuites(enabledCiphers);
}
}
public void setSSLhandshakeTimeout(int timeout) {
super.setConnectTimeout(timeout);
this.handshakeTimeoutSecs = timeout;
}
public HostnameVerifier getSSLHostnameVerifier() {
return hostnameVerifier;
}
public void setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) {
this.hostnameVerifier = hostnameVerifier;
}
public boolean isHttpsHostnameVerificationEnabled() {
return httpsHostnameVerificationEnabled;
}
public void setHttpsHostnameVerificationEnabled(boolean httpsHostnameVerificationEnabled) {
this.httpsHostnameVerificationEnabled = httpsHostnameVerificationEnabled;
}
public void start() throws IOException, MqttException {
super.start();
setEnabledCiphers(enabledCiphers);
int soTimeout = socket.getSoTimeout();
// RTC 765: Set a timeout to avoid the SSL handshake being blocked indefinitely
socket.setSoTimeout(this.handshakeTimeoutSecs * 1000);
// SNI support. Should be automatic under some circumstances - not all, apparently
SSLParameters sslParameters = ((SSLSocket)socket).getSSLParameters();
List<SNIServerName> sniHostNames = new ArrayList<SNIServerName>(1);
sniHostNames.add(new SNIHostName(host));
sslParameters.setServerNames(sniHostNames);
// If default Hostname verification is enabled, use the same method that is used with HTTPS
if (this.httpsHostnameVerificationEnabled) {
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
}
((SSLSocket) socket).setSSLParameters(sslParameters);
((SSLSocket) socket).startHandshake();
if (hostnameVerifier != null) {
SSLSession session = ((SSLSocket) socket).getSession();
if(!hostnameVerifier.verify(host, session)) {
session.invalidate();
socket.close();
throw new SSLPeerUnverifiedException("Host: " + host + ", Peer Host: " + session.getPeerHost());
}
}
// reset timeout to default value
socket.setSoTimeout(soTimeout);
}
public String getServerURI() {
return "ssl://" + host + ":" + port;
}
}

View File

@@ -0,0 +1,88 @@
/*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.security.SSLSocketFactoryFactory;
import org.eclipse.paho.mqttv5.client.spi.NetworkModuleFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
public class SSLNetworkModuleFactory implements NetworkModuleFactory {
@Override
public Set<String> getSupportedUriSchemes() {
return Collections.unmodifiableSet(new HashSet<>(Arrays.asList("ssl")));
}
@Override
public void validateURI(URI brokerUri) throws IllegalArgumentException {
String path = brokerUri.getPath();
if (path != null && !path.isEmpty()) {
throw new IllegalArgumentException(brokerUri.toString());
}
}
@Override
public NetworkModule createNetworkModule(URI brokerUri, MqttConnectionOptions options, String clientId)
throws MqttException
{
String host = brokerUri.getHost();
int port = brokerUri.getPort(); // -1 if not defined
if (port == -1) {
port = 8883;
}
String path = brokerUri.getPath();
if (path != null && !path.isEmpty()) {
throw new IllegalArgumentException(brokerUri.toString());
}
SocketFactory factory = options.getSocketFactory();
SSLSocketFactoryFactory factoryFactory = null;
if (factory == null) {
// try {
factoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps) {
factoryFactory.initialize(sslClientProps, null);
}
factory = factoryFactory.createSocketFactory(null);
// }
// catch (MqttDirectException ex) {
// throw ExceptionHelper.createMqttException(ex.getCause());
// }
} else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
// Create the network module...
SSLNetworkModule netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
netModule.setSSLhandshakeTimeout(options.getConnectionTimeout());
netModule.setSSLHostnameVerifier(options.getSSLHostnameVerifier());
netModule.setHttpsHostnameVerificationEnabled(options.isHttpsHostnameVerificationEnabled());
// Ciphers suites need to be set, if they are available
if (factoryFactory != null) {
String[] enabledCiphers = factoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
netModule.setEnabledCiphers(enabledCiphers);
}
}
return netModule;
}
}

View File

@@ -0,0 +1,133 @@
/*******************************************************************************
* Copyright (c) 2009, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Dave Locke - initial API and implementation and/or initial documentation
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import javax.net.SocketFactory;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
/**
* A network module for connecting over TCP.
*/
public class TCPNetworkModule implements NetworkModule {
private static final String CLASS_NAME = TCPNetworkModule.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);
protected Socket socket;
private SocketFactory factory;
private String host;
private int port;
private int conTimeout;
/**
* Constructs a new TCPNetworkModule using the specified host and
* port. The supplied SocketFactory is used to supply the network
* socket.
* @param factory the {@link SocketFactory} to be used to set up this connection
* @param host The server hostname
* @param port The server port
* @param resourceContext The Resource Context
*/
public TCPNetworkModule(SocketFactory factory, String host, int port, String resourceContext) {
log.setResourceName(resourceContext);
this.factory = factory;
this.host = host;
this.port = port;
}
/**
* Starts the module, by creating a TCP socket to the server.
* @throws IOException if there is an error creating the socket
* @throws MqttException if there is an error connecting to the server
*/
public void start() throws IOException, MqttException {
final String methodName = "start";
try {
// @TRACE 252=connect to host {0} port {1} timeout {2}
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, Integer.valueOf(port), Long.valueOf(conTimeout*1000)});
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
socket.setSoTimeout(1000);
}
catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME,methodName,"250",null,ex);
throw new MqttException(MqttClientException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
}
}
public InputStream getInputStream() throws IOException {
return socket.getInputStream();
}
public OutputStream getOutputStream() throws IOException {
return socket.getOutputStream();
}
/**
* Stops the module, by closing the TCP socket.
* @throws IOException if there is an error closing the socket
*/
public void stop() throws IOException {
if (socket != null) {
// CDA: an attempt is made to stop the receiver cleanly before closing the socket.
// If the socket is forcibly closed too early, the blocking socket read in
// the receiver thread throws a SocketException.
// While this causes the receiver thread to exit, it also invalidates the
// SSL session preventing to perform an accelerated SSL handshake in the
// next connection.
//
// Also note that due to the blocking socket reads in the receiver thread,
// it's not possible to interrupt the thread. Using non blocking reads in
// combination with a socket timeout (see setSoTimeout()) would be a better approach.
//
// Please note that the Javadoc only says that an EOF is returned on
// subsequent reads of the socket stream.
// Anyway, at least with Oracle Java SE 7 on Linux systems, this causes a blocked read
// to return EOF immediately.
// This workaround should not cause any harm in general but you might
// want to move it in SSLNetworkModule.
socket.shutdownInput();
socket.close();
}
}
/**
* Set the maximum time to wait for a socket to be established
* @param timeout The connection timeout
*/
public void setConnectTimeout(int timeout) {
this.conTimeout = timeout;
}
public String getServerURI() {
return "tcp://" + host + ":" + port;
}
}

View File

@@ -0,0 +1,64 @@
/*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*/
package org.eclipse.paho.mqttv5.client.internal;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.spi.NetworkModuleFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
public class TCPNetworkModuleFactory implements NetworkModuleFactory {
@Override
public Set<String> getSupportedUriSchemes() {
return Collections.unmodifiableSet(new HashSet<>(Arrays.asList("tcp")));
}
@Override
public void validateURI(URI brokerUri) throws IllegalArgumentException {
String path = brokerUri.getPath();
if (path != null && !path.isEmpty()) {
throw new IllegalArgumentException("URI path must be empty \"" + brokerUri.toString() + "\"");
}
}
@Override
public NetworkModule createNetworkModule(URI brokerUri, MqttConnectionOptions options, String clientId)
throws MqttException
{
String host = brokerUri.getHost();
int port = brokerUri.getPort(); // -1 if not defined
if (port == -1) {
port = 1883;
}
String path = brokerUri.getPath();
if (path != null && !path.isEmpty()) {
throw new IllegalArgumentException(brokerUri.toString());
}
SocketFactory factory = options.getSocketFactory();
if (factory == null) {
factory = SocketFactory.getDefault();
} else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
TCPNetworkModule networkModule = new TCPNetworkModule(factory, host, port, clientId);
networkModule.setConnectTimeout(options.getConnectionTimeout());
return networkModule;
}
}

View File

@@ -0,0 +1,516 @@
/*******************************************************************************
* Copyright (c) 2014, 2019 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0
* and the Eclipse Distribution License is available at
* https://www.eclipse.org/org/documents/edl-v10.php
*
* Contributors:
* Ian Craggs - MQTT 3.1.1 support
*/
package org.eclipse.paho.mqttv5.client.internal;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttClientException;
import org.eclipse.paho.mqttv5.client.MqttClientInterface;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttAck;
import org.eclipse.paho.mqttv5.common.packet.MqttConnAck;
import org.eclipse.paho.mqttv5.common.packet.MqttPubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttPubComp;
import org.eclipse.paho.mqttv5.common.packet.MqttPubRec;
import org.eclipse.paho.mqttv5.common.packet.MqttPubRel;
import org.eclipse.paho.mqttv5.common.packet.MqttSubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttUnsubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
public class Token {
private static final String CLASS_NAME = Token.class.getName();
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
private volatile boolean completed = false;
private boolean pendingComplete = false;
private boolean sent = false;
private final Object responseLock = new Object();
private final Object sentLock = new Object();
protected MqttMessage message = null;
private MqttWireMessage response = null;
private MqttWireMessage request = null;
private MqttException exception = null;
private String[] topics = null;
private String key;
private MqttClientInterface client = null;
private MqttActionListener callback = null;
private Object userContext = null;
private int messageID = 0;
private boolean notified = false;
private int[] reasonCodes = null;
private boolean deliveryToken = false;
public Token(String logContext) {
log.setResourceName(logContext);
}
public int getMessageID() {
return messageID;
}
public void setMessageID(int messageID) {
this.messageID = messageID;
}
public void setDeliveryToken(boolean deliveryToken) {
this.deliveryToken = deliveryToken;
}
public boolean isDeliveryToken() {
return this.deliveryToken;
}
public boolean checkResult() throws MqttException {
if (getException() != null) {
throw getException();
}
return true;
}
public MqttException getException() {
return exception;
}
public boolean isComplete() {
return completed;
}
protected boolean isCompletePending() {
return pendingComplete;
}
protected boolean isInUse() {
return (getClient() != null && !isComplete());
}
public void setActionCallback(MqttActionListener listener) {
this.callback = listener;
}
public MqttActionListener getActionCallback() {
return callback;
}
public void waitForCompletion() throws MqttException {
waitForCompletion(-1);
}
public void waitForCompletion(long timeout) throws MqttException {
final String methodName = "waitForCompletion";
// @TRACE 407=key={0} wait max={1} token={2}
log.fine(CLASS_NAME, methodName, "407", new Object[] { getKey(), Long.valueOf(timeout), this });
MqttWireMessage resp = null;
try {
resp = waitForResponse(timeout);
} catch (MqttException e) {
if (e.getReasonCode() != MqttClientException.REASON_CODE_CLIENT_DISCONNECTING /*||
message.getId() != MqttWireMessage.MESSAGE_TYPE_DISCONNECT*/) {
throw e;
}
}
if (resp == null && !completed) {
// @TRACE 406=key={0} timed out token={1}
log.fine(CLASS_NAME, methodName, "406", new Object[] { getKey(), this });
exception = new MqttException(MqttClientException.REASON_CODE_CLIENT_TIMEOUT);
throw exception;
}
checkResult();
}
/**
* Waits for the message delivery to complete, but doesn't throw an exception in
* the case of a NACK. It does still throw an exception if something else goes
* wrong (e.g. an IOException). This is used for packets like CONNECT, which
* have useful information in the ACK that needs to be accessed.
*
* @return the {@link MqttWireMessage}
* @throws MqttException
* if there is an error whilst waiting for the response
*/
protected MqttWireMessage waitForResponse() throws MqttException {
return waitForResponse(-1);
}
protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
final String methodName = "waitForResponse";
synchronized (responseLock) {
// @TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4}
// response={5} token={6}
log.fine(
CLASS_NAME, methodName, "400", new Object[] { getKey(), Long.valueOf(timeout), Boolean.valueOf(sent),
Boolean.valueOf(completed), (exception == null) ? "false" : "true", response, this },
exception);
while (!this.completed) {
if (this.exception == null) {
try {
// @TRACE 408=key={0} wait max={1}
log.fine(CLASS_NAME, methodName, "408", new Object[] { getKey(), Long.valueOf(timeout) });
if (timeout <= 0) {
responseLock.wait();
} else {
responseLock.wait(timeout);
}
} catch (InterruptedException e) {
exception = new MqttException(e);
}
}
if (!this.completed) {
if (this.exception != null) {
// @TRACE 401=failed with exception
log.fine(CLASS_NAME, methodName, "401", null, exception);
throw exception;
}
if (timeout > 0) {
// time up and still not completed
break;
}
}
}
}
// @TRACE 402=key={0} response={1}
log.fine(CLASS_NAME, methodName, "402", new Object[] { getKey(), this.response });
return this.response;
}
/**
* Update the token with any new reason codes, currently only used for PubRec
*
* @param msg
* response message.
* @param ex
* if there was a problem store the exception in the token.
*/
protected void update(MqttWireMessage msg, MqttException ex) {
final String methodName = "markComplete";
// @TRACE 411=>key={0} response={1} excep={2}
log.fine(CLASS_NAME, methodName, "411", new Object[] { getKey(), msg, ex });
synchronized (responseLock){
if(msg instanceof MqttPubRec) {
if(msg.getReasonCodes() != null) {
updateReasonCodes(msg.getReasonCodes());
}
}
}
}
/**
* Updates the reasonCodes Array and extends it with any new reason codes.
* This allows message flows like Qos 2 to combine reason codes together across multiple messages e.g. PubRec and PubComp
* @param newReasonCodes - The additional Reason Codes.
*/
protected void updateReasonCodes(int[] newReasonCodes) {
if(this.reasonCodes == null) {
this.reasonCodes = newReasonCodes;
} else {
int[] updatedReasonCodes = new int[this.reasonCodes.length + newReasonCodes.length];
System.arraycopy(this.reasonCodes, 0, updatedReasonCodes, 0, this.reasonCodes.length);
System.arraycopy(newReasonCodes, 0, updatedReasonCodes, this.reasonCodes.length, newReasonCodes.length);
this.reasonCodes = updatedReasonCodes;
}
}
/**
* Mark the token as complete and ready for users to be notified.
*
* @param msg
* response message. Optional - there are no response messages for
* some flows
* @param ex
* if there was a problem store the exception in the token.
*/
protected void markComplete(MqttWireMessage msg, MqttException ex) {
final String methodName = "markComplete";
// @TRACE 404=>key={0} response={1} excep={2}
log.fine(CLASS_NAME, methodName, "404", new Object[] { getKey(), msg, ex });
synchronized (responseLock) {
// If reason codes are available, store them here.
if (msg instanceof MqttPubAck || msg instanceof MqttPubComp || msg instanceof MqttPubRec
|| msg instanceof MqttPubRel || msg instanceof MqttSubAck || msg instanceof MqttUnsubAck) {
if (msg.getReasonCodes() != null) {
updateReasonCodes(msg.getReasonCodes());
}
}
// ACK means that everything was OK, so mark the message for garbage collection.
if (msg instanceof MqttAck) {
this.message = null;
}
this.pendingComplete = true;
this.response = msg;
this.exception = ex;
}
}
/**
* Notifies this token that a response message (an ACK or NACK) has been
* received.
*/
protected void notifyComplete() {
final String methodName = "notifyComplete";
// @TRACE 411=>key={0} response={1} excep={2}
log.fine(CLASS_NAME, methodName, "404", new Object[] { getKey(), this.response, this.exception });
synchronized (responseLock) {
// If pending complete is set then normally the token can be marked
// as complete and users notified. An abnormal error may have
// caused the client to shutdown beween pending complete being set
// and notifying the user. In this case - the action must be failed.
if (exception == null && pendingComplete) {
completed = true;
pendingComplete = false;
} else {
pendingComplete = false;
}
responseLock.notifyAll();
}
synchronized (sentLock) {
sent = true;
sentLock.notifyAll();
}
}
// /**
// * Notifies this token that an exception has occurred. This is only
// * used for things like IOException, and not for MQTT NACKs.
// */
// protected void notifyException() {
// final String methodName = "notifyException";
// //@TRACE 405=token={0} excep={1}
// log.fine(CLASS_NAME,methodName, "405",new Object[]{this,this.exception});
// synchronized (responseLock) {
// responseLock.notifyAll();
// }
// synchronized (sentLock) {
// sentLock.notifyAll();
// }
// }
public void waitUntilSent() throws MqttException {
final String methodName = "waitUntilSent";
synchronized (sentLock) {
synchronized (responseLock) {
if (this.exception != null) {
throw this.exception;
}
}
while (!sent) {
try {
// @TRACE 409=wait key={0}
log.fine(CLASS_NAME, methodName, "409", new Object[] { getKey() });
sentLock.wait();
} catch (InterruptedException e) {
}
}
while (!sent) {
if (this.exception == null) {
throw ExceptionHelper.createMqttException(MqttClientException.REASON_CODE_UNEXPECTED_ERROR);
}
throw this.exception;
}
}
}
/**
* Notifies this token that the associated message has been sent (i.e. written
* to the TCP/IP socket).
*/
protected void notifySent() {
final String methodName = "notifySent";
// @TRACE 403=> key={0}
log.fine(CLASS_NAME, methodName, "403", new Object[] { getKey() });
synchronized (responseLock) {
this.response = null;
this.completed = false;
}
synchronized (sentLock) {
sent = true;
sentLock.notifyAll();
}
}
public MqttClientInterface getClient() {
return client;
}
protected void setClient(MqttClientInterface client) {
this.client = client;
}
public void reset() throws MqttException {
final String methodName = "reset";
if (isInUse()) {
// Token is already in use - cannot reset
throw new MqttException(MqttClientException.REASON_CODE_TOKEN_INUSE);
}
// @TRACE 410=> key={0}
log.fine(CLASS_NAME, methodName, "410", new Object[] { getKey() });
client = null;
completed = false;
response = null;
sent = false;
exception = null;
userContext = null;
}
public MqttMessage getMessage() {
return message;
}
public MqttWireMessage getWireMessage() {
return response;
}
public void setMessage(MqttMessage msg) {
this.message = msg;
}
public String[] getTopics() {
return topics;
}
public void setTopics(String[] topics) {
this.topics = topics;
}
public Object getUserContext() {
return userContext;
}
public void setUserContext(Object userContext) {
this.userContext = userContext;
}
public void setKey(String key) {
this.key = key;
}
public String getKey() {
return key;
}
public void setException(MqttException exception) {
synchronized (responseLock) {
this.exception = exception;
}
}
public boolean isNotified() {
return notified;
}
public void setNotified(boolean notified) {
this.notified = notified;
}
public String toString() {
StringBuffer tok = new StringBuffer();
tok.append("key=").append(getKey());
tok.append(" ,topics=");
if (getTopics() != null) {
for (int i = 0; i < getTopics().length; i++) {
tok.append(getTopics()[i]).append(", ");
}
}
tok.append(" ,usercontext=").append(getUserContext());
tok.append(" ,isComplete=").append(isComplete());
tok.append(" ,isNotified=").append(isNotified());
tok.append(" ,exception=").append(getException());
tok.append(" ,actioncallback=").append(getActionCallback());
return tok.toString();
}
public int[] getGrantedQos() {
int[] val = new int[0];
if (response instanceof MqttSubAck) {
// TODO - Work out how to map multiple returncodes
if (response != null) {
val = ((MqttSubAck) response).getReturnCodes();
}
}
return val;
}
public boolean getSessionPresent() {
boolean val = false;
if (response instanceof MqttConnAck) {
if (response != null) {
val = ((MqttConnAck) response).getSessionPresent();
}
}
return val;
}
public MqttWireMessage getResponse() {
return response;
}
/**
* Returns the reason codes from the MqttWireMessage.
* These will be present if the messages is of the following types:
* <ul>
* <li>CONNACK - 1 Reason Code Max.</li>
* <li>PUBACK - 1 Reason Code Max.</li>
* <li>PUBREC - 1 Reason Code Max.</li>
* <li>PUBCOMP - 1 Reason Code Max.</li>
* <li>PUBREL - 1 Reason Code Max.</li>
* <li>SUBACK - 1 or more Reason Codes.</li>
* <li>UNSUBACK - 1 or more Reason Codes.</li>
* <li>AUTH - 1 Reason Code Max.</li>
* </ul>
*
* Warning: This method may be removed in favour of MqttWireMessage.getReasonCodes()
*
* May be null if this message does not contain any Reason Codes.
* @return An array of return codes, or null.
*/
public int[] getReasonCodes() {
return reasonCodes;
}
public void setRequestMessage(MqttWireMessage requestMessage) {
this.request = requestMessage;
}
public MqttWireMessage getRequestMessage() {
return this.request;
}
}