diff --git a/mqtt_jmeter/pom.xml b/mqtt_jmeter/pom.xml
index 1840dcd..4efd495 100644
--- a/mqtt_jmeter/pom.xml
+++ b/mqtt_jmeter/pom.xml
@@ -3,7 +3,7 @@
4.0.0
net.xmeter
mqtt-jmeter
- 2.0.2
+ 2.0.3
5.5
@@ -37,6 +37,24 @@
1.3.0
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.5
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.mqttv5.client
+ 1.2.5
+
+
+
+ org.bouncycastle
+ bcpkix-jdk15on
+ 1.70
+
+
org.apache.jmeter
jorphan
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java
index 9fdadce..1e04d79 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java
@@ -9,6 +9,7 @@ public interface Constants {
public static final String PROTOCOL = "mqtt.protocol";
public static final String WS_PATH = "mqtt.ws_path";
+ public static final String WS_HEADER = "mqtt.ws_header";
public static final String DUAL_AUTH = "mqtt.dual_ssl_authentication";
public static final String CERT_FILE_PATH1 = "mqtt.keystore_file_path";
public static final String CERT_FILE_PATH2 = "mqtt.clientcert_file_path";
@@ -17,6 +18,8 @@ public interface Constants {
public static final String USER_NAME_AUTH = "mqtt.user_name";
public static final String PASSWORD_AUTH = "mqtt.password";
+ public static final String AUTH_METHOD = "mqtt.auth_method";
+ public static final String AUTH_DATA = "mqtt.auth_data";
public static final String CONN_CLIENT_ID_PREFIX = "mqtt.client_id_prefix";
public static final String CONN_CLIENT_ID_SUFFIX = "mqtt.client_id_suffix";
@@ -26,6 +29,10 @@ public interface Constants {
public static final String CONN_RECONN_ATTEMPT_MAX = "mqtt.reconn_attempt_max";
public static final String CONN_CLEAN_SESSION = "mqtt.conn_clean_session";
+
+ public static final String CONN_USER_PROPERTY = "mqtt.conn_user_property";
+ public static final String CONN_CLEAN_START = "mqtt.conn_clean_start";
+ public static final String CONN_SESSION_EXPIRY_INTERVAL = "mqtt.conn_session_expiry_interval";
public static final String MESSAGE_TYPE = "mqtt.message_type";
public static final String MESSAGE_FIX_LENGTH = "mqtt.message_type_fixed_length";
@@ -35,6 +42,15 @@ public interface Constants {
public static final String QOS_LEVEL = "mqtt.qos_level";
public static final String ADD_TIMESTAMP = "mqtt.add_timestamp";
public static final String RETAINED_MESSAGE = "mqtt.retained_message";
+
+ public static final String CORRELATION_DATA = "mqtt.correlation_data";
+ public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt.message_expiry_interval";
+ public static final String USER_PROPERTIES = "mqtt.user_properties";
+ public static final String CONTENT_TYPE = "mqtt.content_type";
+ public static final String RESPONSE_TOPIC = "mqtt.response_topic";
+ public static final String PAYLOAD_FORMAT = "mqtt.payload_format_indicator";
+ public static final String TOPIC_ALIAS = "mqtt.topic_alias";
+ public static final String SUBSCRIPTION_IDENTIFIER = "mqtt.subscription_identifier";
public static final String SAMPLE_CONDITION_VALUE = "mqtt.sample_condition_value";
public static final String SAMPLE_CONDITION = "mqtt.sample_condition";
@@ -53,7 +69,7 @@ public interface Constants {
public static final String MQTT_VERSION_3_1_1 = "3.1.1";
public static final String MQTT_VERSION_3_1 = "3.1";
-
+ public static final String MQTT_VERSION_5_0 = "5.0";
public static final String SAMPLE_ON_CONDITION_OPTION1 = "specified elapsed time (ms)";
public static final String SAMPLE_ON_CONDITION_OPTION2 = "number of received messages";
@@ -69,9 +85,10 @@ public interface Constants {
public static final String WSS_PROTOCOL = "WSS";
public static final String DEFAULT_PROTOCOL = TCP_PROTOCOL;
public static final String FUSESOURCE_MQTT_CLIENT_NAME = "fusesource";
+ public static final String PAHO_MQTT_CLIENT_NAME = "paho";
public static final String HIVEMQ_MQTT_CLIENT_NAME = "hivemq";
// public static final String DEFAULT_MQTT_CLIENT_NAME = FUSESOURCE_MQTT_CLIENT_NAME;
- public static final String DEFAULT_MQTT_CLIENT_NAME = HIVEMQ_MQTT_CLIENT_NAME;
+ public static final String DEFAULT_MQTT_CLIENT_NAME = PAHO_MQTT_CLIENT_NAME;
public static final String JMETER_VARIABLE_PREFIX = "${";
@@ -82,6 +99,7 @@ public interface Constants {
public static final String DEFAULT_CONN_KEEP_ALIVE = "300";
public static final String DEFAULT_CONN_ATTEMPT_MAX = "0";
public static final String DEFAULT_CONN_RECONN_ATTEMPT_MAX = "0";
+ public static final String DEFAULT_CONN_USER_PROPERTY = "{}";
public static final String DEFAULT_SAMPLE_VALUE_COUNT = "1";
public static final String DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC = "1000";
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java
index a1d44cc..1a89a45 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java
@@ -10,11 +10,7 @@
import java.util.Arrays;
import java.util.List;
-import javax.swing.BorderFactory;
-import javax.swing.JButton;
-import javax.swing.JCheckBox;
-import javax.swing.JFileChooser;
-import javax.swing.JPanel;
+import javax.swing.*;
import javax.swing.event.ChangeEvent;
import javax.swing.event.ChangeListener;
@@ -33,7 +29,8 @@
public class CommonConnUI implements ChangeListener, ActionListener, Constants{
private final JLabeledTextField serverAddr = new JLabeledTextField("Server name or IP:");
private final JLabeledTextField serverPort = new JLabeledTextField("Port number:", 5);
- private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1 }, false, false);;
+ private final JLabel mqttVersionLabel = new JLabel("MQTT version:");
+ private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1, MQTT_VERSION_5_0 }, false, false);
private final JLabeledTextField timeout = new JLabeledTextField("Timeout(s):", 5);
private final JLabeledTextField userNameAuth = new JLabeledTextField("User name:");
@@ -44,6 +41,7 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{
private JCheckBox dualAuth = new JCheckBox("Dual SSL authentication");
private JLabeledTextField wsPath = new JLabeledTextField("WS Path: ", 10);
+ private final JLabeledTextField wsHeader = new JLabeledTextField("WS Header: ", 20);
// private final JLabeledTextField tksFilePath = new JLabeledTextField("Trust Key Store(*.jks): ", 25);
private final JLabeledTextField ccFilePath = new JLabeledTextField("Client Certification(*.p12):", 25);
@@ -51,8 +49,17 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{
// private final JLabeledTextField tksPassword = new JLabeledTextField("Secret:", 10);
private final JLabeledTextField ccPassword = new JLabeledTextField("Secret:", 10);
+ private final JLabeledTextField caCertFilePath = new JLabeledTextField("CA Cert :", 25);
+ private final JLabeledTextField clientCertFilePath = new JLabeledTextField("Client Cert:", 25);
+ private final JLabeledTextField clientKeyFilePath = new JLabeledTextField("Client Key :", 25);
+
// private JButton tksBrowseButton;
private JButton ccBrowseButton;
+
+ private JButton caBrowseButton;
+ private JButton clientCertBrowseButton;
+ private JButton clientKeyBrowseButton;
+
// private static final String TKS_BROWSE = "tks_browse";
private static final String CC_BROWSE = "cc_browse";
@@ -66,6 +73,11 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{
private final JLabeledTextField connCleanSession = new JLabeledTextField("Clean session:", 3);
+ public final JLabeledTextField connUserProperty = new JLabeledTextField("User Property:", 10);
+
+ private final JLabeledTextField connCleanStart = new JLabeledTextField("Clean start:", 5);
+
+ private final JLabeledTextField connSessionExpiryInterval = new JLabeledTextField("Session Expiry Interval(s):", 5);
// private final List clientNamesList = MQTT.getAvailableNames();
public JPanel createConnPanel() {
@@ -75,12 +87,15 @@ public JPanel createConnPanel() {
connPanel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "MQTT connection"));
connPanel.add(serverAddr);
connPanel.add(serverPort);
+ connPanel.add(mqttVersionLabel);
connPanel.add(mqttVersion);
JPanel timeoutPannel = new HorizontalPanel();
timeoutPannel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Timeout"));
timeoutPannel.add(timeout);
+ mqttVersion.addChangeListener(this);
+
con.add(connPanel);
con.add(timeoutPannel);
return con;
@@ -98,13 +113,22 @@ public JPanel createConnOptions() {
JPanel optsPanel1 = new HorizontalPanel();
optsPanel1.add(connKeepAlive);
- optsPanelCon.add(optsPanel1);
-
optsPanel1.add(connAttmptMax);
optsPanel1.add(reconnAttmptMax);
optsPanel1.add(connCleanSession);
optsPanelCon.add(optsPanel1);
-
+
+ JPanel optsPanel2 = new HorizontalPanel();
+ optsPanel2.add(connUserProperty);
+ optsPanel2.add(connCleanStart);
+ optsPanel2.add(connSessionExpiryInterval);
+
+ connUserProperty.setVisible(false);
+ connCleanStart.setVisible(false);
+ connSessionExpiryInterval.setVisible(false);
+
+ optsPanelCon.add(optsPanel2);
+
return optsPanelCon;
}
@@ -143,6 +167,9 @@ public JPanel createProtocolPanel() {
wsPath.setVisible(false);
pCenter.add(wsPath);
+ wsHeader.setVisible(false);
+ pCenter.add(wsHeader);
+
pPanel.add(pCenter, BorderLayout.CENTER);
dualAuth.setSelected(false);
@@ -171,20 +198,53 @@ public JPanel createProtocolPanel() {
// panel.add(tksPassword, c);
//c.weightx = 0.0;
- c.gridx = 0; c.gridy = 1; c.gridwidth = 2;
- ccFilePath.setVisible(false);
- panel.add(ccFilePath, c);
-
- c.gridx = 2; c.gridy = 1; c.gridwidth = 1;
- ccBrowseButton = new JButton(JMeterUtils.getResString("browse"));
- ccBrowseButton.setActionCommand(CC_BROWSE);
- ccBrowseButton.addActionListener(this);
- ccBrowseButton.setVisible(false);
- panel.add(ccBrowseButton, c);
+// c.gridx = 0; c.gridy = 1; c.gridwidth = 2;
+// ccFilePath.setVisible(false);
+// panel.add(ccFilePath, c);
+//
+// c.gridx = 2; c.gridy = 1; c.gridwidth = 1;
+// ccBrowseButton = new JButton(JMeterUtils.getResString("browse"));
+// ccBrowseButton.setActionCommand(CC_BROWSE);
+// ccBrowseButton.addActionListener(this);
+// ccBrowseButton.setVisible(false);
+// panel.add(ccBrowseButton, c);
- c.gridx = 3; c.gridy = 1; c.gridwidth = 2;
- ccPassword.setVisible(false);
- panel.add(ccPassword, c);
+// c.gridx = 3; c.gridy = 1; c.gridwidth = 2;
+// ccPassword.setVisible(false);
+// panel.add(ccPassword, c);
+
+ c.gridx = 0; c.gridy = 2; c.gridwidth = 2;
+ caCertFilePath.setVisible(false);
+ panel.add(caCertFilePath, c);
+
+ c.gridx = 2; c.gridy = 2; c.gridwidth = 1;
+ caBrowseButton = new JButton(JMeterUtils.getResString("browse"));
+ caBrowseButton.setActionCommand("ca_browse");
+ caBrowseButton.addActionListener(this);
+ caBrowseButton.setVisible(false);
+ panel.add(caBrowseButton, c);
+
+ c.gridx = 0; c.gridy = 3; c.gridwidth = 2;
+ clientCertFilePath.setVisible(false);
+ panel.add(clientCertFilePath, c);
+
+ c.gridx = 2; c.gridy = 3; c.gridwidth = 1;
+ clientCertBrowseButton = new JButton(JMeterUtils.getResString("browse"));
+ clientCertBrowseButton.setActionCommand("client_cert_browse");
+ clientCertBrowseButton.addActionListener(this);
+ clientCertBrowseButton.setVisible(false);
+ panel.add(clientCertBrowseButton, c);
+
+ c.gridx = 0; c.gridy = 4; c.gridwidth = 2;
+ clientKeyFilePath.setVisible(false);
+ panel.add(clientKeyFilePath, c);
+
+ c.gridx = 2; c.gridy = 4; c.gridwidth = 1;
+ clientKeyBrowseButton = new JButton(JMeterUtils.getResString("browse"));
+ clientKeyBrowseButton.setActionCommand("client_key_browse");
+ clientKeyBrowseButton.addActionListener(this);
+ clientKeyBrowseButton.setVisible(false);
+ panel.add(clientKeyBrowseButton, c);
protocolPanel.add(pPanel);
protocolPanel.add(panel);
@@ -202,6 +262,15 @@ public void actionPerformed(ActionEvent e) {
if(CC_BROWSE.equals(action)) {
String path = browseAndGetFilePath();
ccFilePath.setText(path);
+ } else if ("ca_browse".equals(action)) {
+ String path = browseAndGetFilePath();
+ caCertFilePath.setText(path);
+ } else if ("client_cert_browse".equals(action)) {
+ String path = browseAndGetFilePath();
+ clientCertFilePath.setText(path);
+ } else if ("client_key_browse".equals(action)) {
+ String path = browseAndGetFilePath();
+ clientKeyFilePath.setText(path);
}
}
private String browseAndGetFilePath() {
@@ -223,16 +292,30 @@ public void stateChanged(ChangeEvent e) {
// tksFilePath.setVisible(true);
// tksBrowseButton.setVisible(true);
// tksPassword.setVisible(true);
- ccFilePath.setVisible(true);
- ccBrowseButton.setVisible(true);
- ccPassword.setVisible(true);
+// ccFilePath.setVisible(true);
+// ccBrowseButton.setVisible(true);
+// ccPassword.setVisible(true);
+
+ caCertFilePath.setVisible(true);
+ caBrowseButton.setVisible(true);
+ clientCertFilePath.setVisible(true);
+ clientCertBrowseButton.setVisible(true);
+ clientKeyFilePath.setVisible(true);
+ clientKeyBrowseButton.setVisible(true);
} else {
// tksFilePath.setVisible(false);
// tksBrowseButton.setVisible(false);
// tksPassword.setVisible(false);
- ccFilePath.setVisible(false);
- ccBrowseButton.setVisible(false);
- ccPassword.setVisible(false);
+// ccFilePath.setVisible(false);
+// ccBrowseButton.setVisible(false);
+// ccPassword.setVisible(false);
+
+ caCertFilePath.setVisible(false);
+ caBrowseButton.setVisible(false);
+ clientCertFilePath.setVisible(false);
+ clientCertBrowseButton.setVisible(false);
+ clientKeyFilePath.setVisible(false);
+ clientKeyBrowseButton.setVisible(false);
}
} else if(e.getSource() == protocols) {
boolean isSecure = Util.isSecureProtocol(protocols.getText());
@@ -241,6 +324,8 @@ public void stateChanged(ChangeEvent e) {
boolean wsProtocol = Util.isWebSocketProtocol(protocols.getText());
wsPath.setVisible(wsProtocol);
wsPath.setEnabled(wsProtocol);
+ wsHeader.setVisible(wsProtocol);
+ wsHeader.setEnabled(wsProtocol);
// } else if (e.getSource() == clientNames) {
// int index = clientNames.getSelectedIndex();
// if (index > -1) {
@@ -250,6 +335,12 @@ public void stateChanged(ChangeEvent e) {
// } else {
// protocols.setValues(new String[0]);
// }
+ } else if(e.getSource() == mqttVersion) {
+ boolean isMqtt5 = mqttVersion.getText().equals(MQTT_VERSION_5_0);
+ connUserProperty.setVisible(isMqtt5);
+ connCleanStart.setVisible(isMqtt5);
+ connSessionExpiryInterval.setVisible(isMqtt5);
+ connCleanSession.setVisible(!isMqtt5);
}
}
@@ -260,7 +351,10 @@ public void configure(AbstractMQTTSampler sampler) {
mqttVersion.setSelectedIndex(0);
} else if(sampler.getMqttVersion().equals(MQTT_VERSION_3_1_1)) {
mqttVersion.setSelectedIndex(1);
+ } else if(sampler.getMqttVersion().equals(MQTT_VERSION_5_0)) {
+ mqttVersion.setSelectedIndex(2);
}
+
timeout.setText(sampler.getConnTimeout());
// if (sampler.getProtocol().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1) {
@@ -270,7 +364,7 @@ public void configure(AbstractMQTTSampler sampler) {
// clientNames.setText(sampler.getMqttClientName());
// }
- if(sampler.getProtocol().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1) {
+ if(sampler.getProtocol().trim().contains(JMETER_VARIABLE_PREFIX)) {
List items = Arrays.asList(protocols.getItems());
int index = items.indexOf(sampler.getProtocol());
protocols.setSelectedIndex(index);
@@ -283,6 +377,10 @@ public void configure(AbstractMQTTSampler sampler) {
wsPath.setVisible(wsProtocol);
wsPath.setEnabled(wsProtocol);
+ wsHeader.setText(sampler.getWsHeader());
+ wsHeader.setVisible(wsProtocol);
+ wsHeader.setEnabled(wsProtocol);
+
if(sampler.isDualSSLAuth()) {
dualAuth.setVisible(true);
dualAuth.setSelected(sampler.isDualSSLAuth());
@@ -296,7 +394,7 @@ public void configure(AbstractMQTTSampler sampler) {
passwordAuth.setText(sampler.getPasswordAuth());
connNamePrefix.setText(sampler.getConnPrefix());
- if(sampler.isClientIdSuffix()) {
+ if (sampler.isClientIdSuffix()) {
connNameSuffix.setSelected(true);
} else {
connNameSuffix.setSelected(false);
@@ -306,7 +404,15 @@ public void configure(AbstractMQTTSampler sampler) {
connAttmptMax.setText(sampler.getConnAttemptMax());
reconnAttmptMax.setText(sampler.getConnReconnAttemptMax());
- connCleanSession.setText(sampler.getConnCleanSession().toString());
+ connCleanSession.setText(sampler.getConnCleanSession());
+
+ connUserProperty.setText(sampler.getConnUserProperty());
+ connCleanStart.setText(sampler.getConnCleanStart());
+ connSessionExpiryInterval.setText(sampler.getConnSessionExpiryInterval());
+
+ caCertFilePath.setText(sampler.getCAFilePath());
+ clientCertFilePath.setText(sampler.getClientCert2FilePath());
+ clientKeyFilePath.setText(sampler.getClientPrivateKeyFilePath());
}
@@ -319,6 +425,7 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) {
// sampler.setMqttClientName(clientNames.getText());
sampler.setProtocol(protocols.getText());
sampler.setWsPath(wsPath.getText());
+ sampler.setWsHeader(wsHeader.getText());
sampler.setDualSSLAuth(dualAuth.isSelected());
// sampler.setKeyStoreFilePath(tksFilePath.getText());
// sampler.setKeyStorePassword(tksPassword.getText());
@@ -336,6 +443,14 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) {
sampler.setConnReconnAttemptMax(reconnAttmptMax.getText());
sampler.setConnCleanSession(connCleanSession.getText());
+
+ sampler.setConnUserProperty(connUserProperty.getText());
+ sampler.setConnCleanStart(connCleanStart.getText());
+ sampler.setConnSessionExpiryInterval(connSessionExpiryInterval.getText());
+
+ sampler.setCAFilePath(caCertFilePath.getText());
+ sampler.setClientCert2FilePath(clientCertFilePath.getText());
+ sampler.setClientPrivateKeyFilePath(clientKeyFilePath.getText());
}
public static int parseInt(String value) {
@@ -357,6 +472,7 @@ public void clearUI() {
dualAuth.setSelected(false);
wsPath.setText("");
+ wsHeader.setText("{}");
// tksFilePath.setText("");
// tksPassword.setText("");
ccFilePath.setText("");
@@ -372,5 +488,13 @@ public void clearUI() {
connKeepAlive.setText(DEFAULT_CONN_KEEP_ALIVE);
reconnAttmptMax.setText(DEFAULT_CONN_RECONN_ATTEMPT_MAX);
connCleanSession.setText("true");
+
+ connUserProperty.setText("{}");
+ connCleanStart.setText("true");
+ connSessionExpiryInterval.setText("0");
+
+ caCertFilePath.setText("");
+ clientCertFilePath.setText("");
+ clientKeyFilePath.setText("");
}
}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java
index fa3e714..7721d0a 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java
@@ -3,9 +3,7 @@
import java.awt.BorderLayout;
import java.util.logging.Logger;
-import javax.swing.BorderFactory;
-import javax.swing.JCheckBox;
-import javax.swing.JPanel;
+import javax.swing.*;
import javax.swing.event.ChangeEvent;
import javax.swing.event.ChangeListener;
@@ -30,6 +28,18 @@ public class PubSamplerUI extends AbstractSamplerGui implements Constants, Chang
private final JLabeledTextField topicName = new JLabeledTextField("Topic name:");
private JCheckBox timestamp = new JCheckBox("Add timestamp in payload");
+ private final JLabeledTextField messageExpiryInterval = new JLabeledTextField("Message Expiry Interval(s):");
+ private final JLabeledTextField contentType = new JLabeledTextField("Content Type:");
+ private final JLabeledTextField responseTopic = new JLabeledTextField("Response Topic:");
+ private final JLabeledTextField correlationData = new JLabeledTextField("Correlation Data:");
+ private final JLabeledTextField userProperties = new JLabeledTextField("User Properties:");
+
+ private final JLabel qosLabel = new JLabel("QoS Level:");
+ private final JLabel payloadFormatLabel = new JLabel("Payload Format:");
+ private final JLabeledChoice payloadFormat = new JLabeledChoice("Payload Format:", new String[] { "UNSPECIFIED", "UTF_8" }, false, false);
+ private final JLabeledTextField topicAlias = new JLabeledTextField("Topic Alias:");
+ private final JLabeledTextField subscriptionIdentifier = new JLabeledTextField("Subscription Identifier:");
+
private JLabeledChoice messageTypes;
private final JSyntaxTextArea sendMessage = JSyntaxTextArea.getInstance(10, 50);
private final JTextScrollPane messagePanel = JTextScrollPane.getInstance(sendMessage);
@@ -59,6 +69,7 @@ private JPanel createPubOption() {
qosChoice.addChangeListener(this);
JPanel optsPanel = new HorizontalPanel();
+ optsPanel.add(qosLabel);
optsPanel.add(qosChoice);
optsPanel.add(retainedMsg);
optsPanel.add(topicName);
@@ -66,6 +77,22 @@ private JPanel createPubOption() {
optsPanel.add(timestamp);
optsPanelCon.add(optsPanel);
+ JPanel optsPanel1 = new HorizontalPanel();
+ optsPanel1.add(userProperties);
+ optsPanel1.add(messageExpiryInterval);
+ optsPanel1.add(topicAlias);
+ optsPanel1.add(payloadFormatLabel);
+ optsPanel1.add(payloadFormat);
+ payloadFormat.setToolTipText("Payload format indicator to the message");
+ optsPanelCon.add(optsPanel1);
+
+ JPanel optsPanel2 = new HorizontalPanel();
+ optsPanel2.add(contentType);
+ optsPanel2.add(responseTopic);
+ optsPanel2.add(correlationData);
+ optsPanel2.add(subscriptionIdentifier);
+ optsPanelCon.add(optsPanel2);
+
return optsPanelCon;
}
@@ -129,7 +156,7 @@ public void configure(TestElement element) {
super.configure(element);
PubSampler sampler = (PubSampler) element;
- if(sampler.getQOS().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1){
+ if(sampler.getQOS().trim().contains(JMETER_VARIABLE_PREFIX)){
this.qosChoice.setSelectedIndex(Integer.parseInt(sampler.getQOS()));
} else {
this.qosChoice.setText(sampler.getQOS());
@@ -149,6 +176,15 @@ public void configure(TestElement element) {
stringLength.setText(String.valueOf(sampler.getMessageLength()));
sendMessage.setText(sampler.getMessage());
+
+ contentType.setText(sampler.getContentType());
+ messageExpiryInterval.setText(String.valueOf(sampler.getMessageExpiryInterval()));
+ userProperties.setText(sampler.getUserProperties());
+ responseTopic.setText(sampler.getResponseTopic());
+ correlationData.setText(sampler.getCorrelationData());
+ payloadFormat.setText(sampler.getPayloadFormat());
+ topicAlias.setText(sampler.getTopicAlias());
+ subscriptionIdentifier.setText(sampler.getSubscriptionIdentifier());
}
@Override
@@ -161,7 +197,7 @@ private void setupSamplerProperties(PubSampler sampler) {
this.configureTestElement(sampler);
sampler.setTopic(this.topicName.getText());
- if(this.qosChoice.getText().indexOf(JMETER_VARIABLE_PREFIX) == -1) {
+ if(this.qosChoice.getText().contains(JMETER_VARIABLE_PREFIX)) {
int qos = QOS_0;
try {
qos = Integer.parseInt(this.qosChoice.getText());
@@ -183,6 +219,15 @@ private void setupSamplerProperties(PubSampler sampler) {
sampler.setMessageLength(this.stringLength.getText());
sampler.setMessage(this.sendMessage.getText());
sampler.setRetainedMessage(Boolean.parseBoolean(this.retainedMsg.getText()));
+
+ sampler.setContentType(this.contentType.getText());
+ sampler.setCorrelationData(this.correlationData.getText());
+ sampler.setMessageExpiryInterval(Long.parseLong(this.messageExpiryInterval.getText()));
+ sampler.setUserProperties(this.userProperties.getText());
+ sampler.setResponseTopic(this.responseTopic.getText());
+ sampler.setPayloadFormat(this.payloadFormat.getText());
+ sampler.setTopicAlias(this.topicAlias.getText());
+ sampler.setSubscriptionIdentifier(this.subscriptionIdentifier.getText());
}
@Override
@@ -195,5 +240,14 @@ public void clearGui() {
this.messageTypes.setSelectedIndex(0);
this.stringLength.setText(String.valueOf(DEFAULT_MESSAGE_FIX_LENGTH));
this.sendMessage.setText("");
+
+ this.messageExpiryInterval.setText("0");
+ this.responseTopic.setText("");
+ this.contentType.setText("");
+ this.userProperties.setText("{}");
+ this.correlationData.setText("");
+ this.payloadFormat.setSelectedIndex(0);
+ this.topicAlias.setText("");
+ this.subscriptionIdentifier.setText("");
}
}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java
index a79d765..137bcc7 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java
@@ -72,6 +72,14 @@ public void setWsPath(String wsPath) {
setProperty(WS_PATH, wsPath);
}
+ public String getWsHeader() {
+ return getPropertyAsString(WS_HEADER, "{}");
+ }
+
+ public void setWsHeader(String wsHeader) {
+ setProperty(WS_HEADER, wsHeader);
+ }
+
public boolean isDualSSLAuth() {
return getPropertyAsBoolean(DUAL_AUTH, false);
}
@@ -201,6 +209,54 @@ public void setMqttClientName(String mqttClientName) {
setProperty(MQTT_CLIENT_NAME, mqttClientName);
}
+ public void setConnCleanStart(String cleanStart) {
+ setProperty(CONN_CLEAN_START, cleanStart);
+ }
+
+ public String getConnCleanStart() {
+ return getPropertyAsString(CONN_CLEAN_START, "true");
+ }
+
+ public String getConnSessionExpiryInterval() {
+ return getPropertyAsString(CONN_SESSION_EXPIRY_INTERVAL, "0");
+ }
+
+ public void setConnSessionExpiryInterval(String sessionExpiryInterval) {
+ setProperty(CONN_SESSION_EXPIRY_INTERVAL, sessionExpiryInterval);
+ }
+
+ public String getConnUserProperty() {
+ return getPropertyAsString(CONN_USER_PROPERTY, "{}");
+ }
+
+ public void setConnUserProperty(String connUserProperty) {
+ setProperty(CONN_USER_PROPERTY, connUserProperty);
+ }
+
+ public String getCAFilePath() {
+ return getPropertyAsString("mqtt.ca_file_path", "");
+ }
+
+ public void setCAFilePath(String ca) {
+ setProperty("mqtt.ca_file_path", ca);
+ }
+
+ public String getClientCert2FilePath() {
+ return getPropertyAsString("mqtt.client_cert_file_path", "");
+ }
+
+ public void setClientCert2FilePath(String cert) {
+ setProperty("mqtt.client_cert_file_path", cert);
+ }
+
+ public String getClientPrivateKeyFilePath() {
+ return getPropertyAsString("mqtt.client_key_file_path", "");
+ }
+
+ public void setClientPrivateKeyFilePath(String key) {
+ setProperty("mqtt.client_key_file_path", key);
+ }
+
// public int getConCapacity() {
// return conCapacity;
// }
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java
index a3e21f9..2d26463 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java
@@ -75,6 +75,11 @@ public SampleResult sample(Entry entry) {
MQTTSsl ssl = MQTT.getInstance(getMqttClientName()).createSsl(this);
parameters.setSsl(ssl);
}
+
+ parameters.setConnUserProperty(getConnUserProperty());
+ parameters.setCleanStart(Boolean.parseBoolean(getConnCleanStart()));
+ parameters.setSessionExpiryInterval(Long.parseLong(getConnSessionExpiryInterval()));
+ parameters.setConnWsHeader(getWsHeader());
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to establish Connection " + connection , e);
result.setSuccessful(false);
@@ -83,7 +88,7 @@ public SampleResult sample(Entry entry) {
result.setResponseCode("502");
return result;
}
-
+
try {
client = MQTT.getInstance(getMqttClientName()).createClient(parameters);
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java
index fb86b04..a085b17 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java
@@ -6,6 +6,7 @@
import javax.xml.bind.DatatypeConverter;
+import net.xmeter.samplers.mqtt.MQTT5PublishReq;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContextService;
@@ -82,6 +83,70 @@ public Boolean getRetainedMessage() {
return getPropertyAsBoolean(RETAINED_MESSAGE, false);
}
+ public void setMessageExpiryInterval(long messageExpiryInterval) {
+ setProperty(MESSAGE_EXPIRY_INTERVAL, messageExpiryInterval);
+ }
+
+ public Long getMessageExpiryInterval() {
+ return getPropertyAsLong(MESSAGE_EXPIRY_INTERVAL);
+ }
+
+ public void setContentType(String contentType) {
+ setProperty(CONTENT_TYPE, contentType);
+ }
+
+ public String getContentType() {
+ return getPropertyAsString(CONTENT_TYPE);
+ }
+
+ public void setResponseTopic(String responseTopic) {
+ setProperty(RESPONSE_TOPIC, responseTopic);
+ }
+
+ public String getResponseTopic() {
+ return getPropertyAsString(RESPONSE_TOPIC);
+ }
+
+ public void setUserProperties(String userProperties) {
+ setProperty(USER_PROPERTIES, userProperties);
+ }
+
+ public String getUserProperties() {
+ return getPropertyAsString(USER_PROPERTIES);
+ }
+
+ public void setCorrelationData(String correlationData) {
+ setProperty(CORRELATION_DATA, correlationData);
+ }
+
+ public String getCorrelationData() {
+ return getPropertyAsString(CORRELATION_DATA);
+ }
+
+ public void setPayloadFormat(String payloadFormat) {
+ setProperty(PAYLOAD_FORMAT, payloadFormat);
+ }
+
+ public String getPayloadFormat() {
+ return getPropertyAsString(PAYLOAD_FORMAT);
+ }
+
+ public void setTopicAlias(String topicAlias) {
+ setProperty(TOPIC_ALIAS, topicAlias);
+ }
+
+ public String getTopicAlias() {
+ return getPropertyAsString(TOPIC_ALIAS);
+ }
+
+ public void setSubscriptionIdentifier(String subscriptionIdentifier) {
+ setProperty(SUBSCRIPTION_IDENTIFIER, subscriptionIdentifier);
+ }
+
+ public String getSubscriptionIdentifier() {
+ return getPropertyAsString(SUBSCRIPTION_IDENTIFIER);
+ }
+
public static byte[] hexToBinary(String hex) {
return DatatypeConverter.parseHexBinary(hex);
}
@@ -119,7 +184,6 @@ public SampleResult sample(Entry arg0) {
tmp = payload.getBytes();
}
-
int qos = Integer.parseInt(getQOS());
switch (qos) {
case 0:
@@ -146,13 +210,15 @@ public SampleResult sample(Entry arg0) {
toSend = new byte[tmp.length];
System.arraycopy(tmp, 0, toSend, 0 , tmp.length);
}
-
+
+ MQTT5PublishReq mqtt5PublishReq = getMqtt5PublishReq();
+
result.sampleStart();
if (logger.isLoggable(Level.FINE)) {
logger.fine("pub [topic]: " + topicName + ", [payload]: " + new String(toSend));
}
- MQTTPubResult pubResult = connection.publish(topicName, toSend, qos_enum, retainedMsg);
+ MQTTPubResult pubResult = connection.publish(topicName, toSend, qos_enum, retainedMsg, mqtt5PublishReq);
result.sampleEnd();
result.setSamplerData(new String(toSend));
@@ -161,7 +227,7 @@ public SampleResult sample(Entry arg0) {
result.setSuccessful(pubResult.isSuccessful());
if(pubResult.isSuccessful()) {
- result.setResponseData("Publish successfuly.".getBytes());
+ result.setResponseData("Publish successfully.".getBytes());
result.setResponseMessage(MessageFormat.format("publish successfully for Connection {0}.", connection));
result.setResponseCodeOK();
} else {
@@ -188,5 +254,19 @@ public SampleResult sample(Entry arg0) {
}
return result;
}
-
+
+ private MQTT5PublishReq getMqtt5PublishReq() {
+ MQTT5PublishReq mqtt5PublishReq = new MQTT5PublishReq();
+ mqtt5PublishReq.setContentType(getContentType());
+ mqtt5PublishReq.setResponseTopic(getResponseTopic());
+ mqtt5PublishReq.setCorrelationData(getCorrelationData());
+ mqtt5PublishReq.setMessageExpiryInterval(getMessageExpiryInterval());
+ mqtt5PublishReq.setUserProperties(getUserProperties());
+ mqtt5PublishReq.setPayloadFormatIndicator(getPayloadFormat());
+ mqtt5PublishReq.setTopicAlias(getTopicAlias());
+ mqtt5PublishReq.setSubscriptionIdentifier(getSubscriptionIdentifier());
+
+ return mqtt5PublishReq;
+ }
+
}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java
index e9e45bf..ac40a7b 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java
@@ -1,7 +1,12 @@
package net.xmeter.samplers.mqtt;
+import com.fasterxml.jackson.databind.ObjectMapper;
import net.xmeter.Util;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
public class ConnectionParameters {
private MQTTSsl ssl;
private String protocol;
@@ -18,6 +23,11 @@ public class ConnectionParameters {
private boolean cleanSession;
private String path;
+ private boolean cleanStart;
+ private long sessionExpiryInterval;
+ private Map connUserProperty;
+ private Map connWsHeader;
+
public MQTTSsl getSsl() {
return ssl;
}
@@ -138,4 +148,74 @@ public boolean isSecureProtocol() {
public boolean isWebSocketProtocol() {
return Util.isWebSocketProtocol(getProtocol());
}
+
+ public boolean isCleanStart() {
+ return cleanStart;
+ }
+
+ public void setCleanStart(boolean cleanStart) {
+ this.cleanStart = cleanStart;
+ }
+
+ public long getSessionExpiryInterval() {
+ return sessionExpiryInterval;
+ }
+
+ public void setSessionExpiryInterval(long sessionExpiryInterval) {
+ this.sessionExpiryInterval = sessionExpiryInterval;
+ }
+
+ public Map getConnUserProperty() {
+ return connUserProperty;
+ }
+
+ public void setConnUserProperty(String userPropertyJson) {
+ if (userPropertyJson == null || userPropertyJson.isEmpty()) {
+ this.connUserProperty = new HashMap<>();
+ return;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ try{
+ this.connUserProperty = mapper.readValue(userPropertyJson, Map.class);
+ }catch (IOException e){
+ this.connUserProperty = new HashMap<>();
+ e.printStackTrace();
+ }
+ }
+
+ public Map getConnWsHeader() {
+ return connWsHeader;
+ }
+
+ public void setConnWsHeader(String wsHeaderJson) {
+ if (wsHeaderJson == null || wsHeaderJson.isEmpty()) {
+ this.connWsHeader = new HashMap<>();
+ return;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ try{
+ this.connWsHeader = mapper.readValue(wsHeaderJson, Map.class);
+ }catch (IOException e){
+ this.connWsHeader = new HashMap<>();
+ e.printStackTrace();
+ }
+ }
+
+ public boolean shouldAutomaticReconnectWithDefaultConfig(){
+ return reconnectMaxAttempts == -1 || reconnectMaxAttempts > 0;
+ }
+
+ @Override
+ public String toString() {
+ return "ConnectionParameters{" +
+ "host='" + host + '\'' +
+ "port='" + port + '\'' +
+ "version='" + version + '\'' +
+ "connUserProperty='" + connUserProperty + '\'' +
+ "cleanStart='" + cleanStart + '\'' +
+ "sessionExpiryInterval='" + sessionExpiryInterval + '\'' +
+ '}';
+ }
}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java
new file mode 100644
index 0000000..20d68dd
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java
@@ -0,0 +1,94 @@
+package net.xmeter.samplers.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MQTT5PublishReq {
+ private String correlationData;
+ private long messageExpiryInterval;
+ private Map userProperties;
+ private String contentType;
+ private String responseTopic;
+
+ private String payloadFormatIndicator;
+ private String topicAlias;
+ private String subscriptionIdentifier;
+
+ public Map getUserProperties() {
+ return userProperties;
+ }
+
+ public void setUserProperties(String userPropertiesJson) {
+ if (userPropertiesJson == null || userPropertiesJson.isEmpty()) {
+ this.userProperties = new HashMap<>();
+ return;
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ try{
+ this.userProperties = mapper.readValue(userPropertiesJson, Map.class);
+ }catch (IOException e){
+ this.userProperties = new HashMap<>();
+ e.printStackTrace();
+ }
+ }
+
+ public String getCorrelationData() {
+ return correlationData;
+ }
+
+ public void setCorrelationData(String correlationData) {
+ this.correlationData = correlationData;
+ }
+
+ public long getMessageExpiryInterval() {
+ return messageExpiryInterval;
+ }
+
+ public void setMessageExpiryInterval(long messageExpiryInterval) {
+ this.messageExpiryInterval = messageExpiryInterval;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getResponseTopic() {
+ return responseTopic;
+ }
+
+ public void setResponseTopic(String responseTopic) {
+ this.responseTopic = responseTopic;
+ }
+
+ public String getPayloadFormatIndicator() {
+ return payloadFormatIndicator;
+ }
+
+ public void setPayloadFormatIndicator(String payloadFormatIndicator) {
+ this.payloadFormatIndicator = payloadFormatIndicator;
+ }
+
+ public String getTopicAlias() {
+ return topicAlias;
+ }
+
+ public void setTopicAlias(String topicAlias) {
+ this.topicAlias = topicAlias;
+ }
+
+ public String getSubscriptionIdentifier() {
+ return subscriptionIdentifier;
+ }
+
+ public void setSubscriptionIdentifier(String subscriptionIdentifier) {
+ this.subscriptionIdentifier = subscriptionIdentifier;
+ }
+}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java
index 3a10e34..66d5ac4 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java
@@ -7,9 +7,10 @@ public interface MQTTConnection {
String getClientId();
void disconnect() throws Exception;
- MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained);
+ MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req);
void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure);
void setSubListener(MQTTSubListener listener);
}
+
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java
index 6b3ced2..35beba1 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java
@@ -8,6 +8,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import net.xmeter.samplers.mqtt.*;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
@@ -16,10 +17,6 @@
import org.fusesource.mqtt.client.QoS;
import net.xmeter.samplers.PubCallback;
-import net.xmeter.samplers.mqtt.MQTTConnection;
-import net.xmeter.samplers.mqtt.MQTTPubResult;
-import net.xmeter.samplers.mqtt.MQTTQoS;
-import net.xmeter.samplers.mqtt.MQTTSubListener;
class FuseMQTTConnection implements MQTTConnection {
private static final Logger logger = Logger.getLogger(FuseMQTTConnection.class.getCanonicalName());
@@ -63,7 +60,7 @@ public void onFailure(Throwable throwable) {
}
@Override
- public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qos, boolean retained) {
+ public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qos, boolean retained, MQTT5PublishReq req) {
final Object pubLock = new Object();
QoS fuseQos = FuseUtil.map(qos);
PubCallback pubCallback = new PubCallback(pubLock, fuseQos);
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java
index 187da95..e8376e1 100644
--- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java
@@ -18,11 +18,7 @@
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode;
-import net.xmeter.samplers.mqtt.MQTTClientException;
-import net.xmeter.samplers.mqtt.MQTTConnection;
-import net.xmeter.samplers.mqtt.MQTTPubResult;
-import net.xmeter.samplers.mqtt.MQTTQoS;
-import net.xmeter.samplers.mqtt.MQTTSubListener;
+import net.xmeter.samplers.mqtt.*;
class HiveMQTTConnection implements MQTTConnection {
private static final Logger logger = Logger.getLogger(HiveMQTTConnection.class.getCanonicalName());
@@ -56,7 +52,7 @@ public void disconnect() {
}
@Override
- public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained) {
+ public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req) {
try {
client.publishWith()
.topic(topicName)
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java
new file mode 100644
index 0000000..3f95521
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java
@@ -0,0 +1,71 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import net.xmeter.Constants;
+import net.xmeter.samplers.mqtt.*;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+public class PahoMQTT3Client implements MQTTClient {
+
+ private static final Logger logger = Logger.getLogger(PahoMQTT3Client.class.getCanonicalName());
+ private final ConnectionParameters parameters;
+ private final MqttAsyncClient client;
+
+ PahoMQTT3Client(ConnectionParameters parameters) throws MqttException {
+ this.parameters = parameters;
+ MemoryPersistence persistence = new MemoryPersistence();
+ this.client = new MqttAsyncClient(PahoUtil.createHostAddress(parameters), this.getClientId(), persistence);
+ }
+
+
+ @Override
+ public String getClientId() {
+ return parameters.getClientId();
+ }
+
+ @Override
+ public MQTTConnection connect() throws Exception {
+
+ MqttConnectOptions connectOptions = new MqttConnectOptions();
+ connectOptions.setCleanSession(parameters.isCleanSession());
+ connectOptions.setKeepAliveInterval(parameters.getKeepAlive());
+ connectOptions.setConnectionTimeout(parameters.getConnectTimeout());
+
+ if (parameters.getVersion().equals(Constants.MQTT_VERSION_3_1_1)) {
+ connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
+ } else if (parameters.getVersion().equals(Constants.MQTT_VERSION_3_1)) {
+ connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
+ }
+
+ if (parameters.shouldAutomaticReconnectWithDefaultConfig()) {
+ connectOptions.setAutomaticReconnect(true);
+ }
+ if (parameters.getUsername() != null) {
+ connectOptions.setUserName(parameters.getUsername());
+ }
+ if (parameters.getPassword() != null) {
+ connectOptions.setPassword(parameters.getPassword().toCharArray());
+ }
+
+ if (parameters.isSecureProtocol()) {
+ PahoMQTTSsl ssl = (PahoMQTTSsl) parameters.getSsl();
+ connectOptions.setSocketFactory(PahoUtil.getSocketFactory(ssl.getCaFilePath(), ssl.getClientCertFilePath(), ssl.getClientKeyFilePath(), ""));
+ }
+
+ try {
+ client.connect(connectOptions)
+ .waitForCompletion(parameters.getConnectTimeout() * 1000L);
+
+ logger.info(() -> "Connected client: " + parameters.getClientId());
+ return new PahoMQTT3Connection(client);
+ } catch (MqttException e) {
+ throw new MQTTClientException("Connection error " + client, e);
+ }
+ }
+}
+
+
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java
new file mode 100644
index 0000000..5c0832e
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java
@@ -0,0 +1,98 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import net.xmeter.samplers.mqtt.*;
+import org.eclipse.paho.client.mqttv3.*;
+
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+class PahoMQTT3Connection implements MQTTConnection {
+
+ private static final Logger logger = Logger.getLogger(PahoMQTT3Connection.class.getCanonicalName());
+
+ private final MqttAsyncClient client;
+
+ private MQTTSubListener listener;
+
+ PahoMQTT3Connection(MqttAsyncClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public boolean isConnectionSucc() {
+ return client.isConnected();
+ }
+
+ @Override
+ public String getClientId() {
+ return client.getClientId();
+ }
+
+ @Override
+ public void disconnect() throws Exception {
+ client.disconnect();
+ }
+
+ @Override
+ public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req) {
+ try {
+ MqttMessage msg = new MqttMessage();
+ msg.setPayload(message);
+ msg.setQos((PahoUtil.map(qoS)));
+ msg.setRetained(retained);
+ client.publish(topicName, msg);
+ return new MQTTPubResult(true);
+ } catch (Exception error) {
+ return new MQTTPubResult(false, error.getMessage());
+ }
+ }
+
+ @Override
+ public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure) {
+ int pahoQos = PahoUtil.map(qos);
+
+ for (String topicName : topicNames) {
+ try {
+ IMqttActionListener subListener = new IMqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken iMqttToken) {
+ onSuccess.run();
+ }
+
+ @Override
+ public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
+ onFailure.accept(throwable);
+ }
+ };
+
+ client.subscribe(topicName, pahoQos, "", subListener, (topic, message) -> {
+ try {
+ handleMessageArrived(topic, message);
+ } catch (Exception e) {
+ logger.severe(e.getMessage());
+ }
+ });
+ } catch (MqttException e) {
+ logger.warning("Failed to subscribe " + topicName + ", error: " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void setSubListener(MQTTSubListener listener) {
+ this.listener = listener;
+ }
+
+ private void handleMessageArrived(String topic, MqttMessage message) {
+ this.listener.accept(topic, new String(message.getPayload()), () -> {
+ });
+ }
+
+ @Override
+ public String toString() {
+ return "PahoMQTT3Connection{" +
+ "clientId='" + client.getClientId() + '\'' +
+ '}';
+ }
+}
+
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java
new file mode 100644
index 0000000..18c7ff3
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java
@@ -0,0 +1,86 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import net.xmeter.samplers.mqtt.ConnectionParameters;
+import net.xmeter.samplers.mqtt.MQTTClient;
+import net.xmeter.samplers.mqtt.MQTTClientException;
+import net.xmeter.samplers.mqtt.MQTTConnection;
+import org.eclipse.paho.mqttv5.client.*;
+import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
+import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+public class PahoMQTT5Client implements MQTTClient {
+
+ private static final Logger logger = Logger.getLogger(PahoMQTT5Client.class.getCanonicalName());
+ private final ConnectionParameters parameters;
+ private final MqttAsyncClient client;
+
+ PahoMQTT5Client(ConnectionParameters parameters) throws MqttException {
+ this.parameters = parameters;
+ MemoryPersistence persistence = new MemoryPersistence();
+ this.client = new MqttAsyncClient(PahoUtil.createHostAddress(parameters), this.getClientId(), persistence);
+ }
+
+
+ @Override
+ public String getClientId() {
+ return parameters.getClientId();
+ }
+
+ @Override
+ public MQTTConnection connect() throws Exception {
+
+ MqttConnectionOptions connectOptions = new MqttConnectionOptions();
+ connectOptions.setCleanStart(parameters.isCleanStart());
+ connectOptions.setKeepAliveInterval(parameters.getKeepAlive());
+ connectOptions.setConnectionTimeout(parameters.getConnectTimeout());
+ connectOptions.setSessionExpiryInterval(parameters.getSessionExpiryInterval());
+
+ List userProperty = PahoUtil.ConvertUserProperty(parameters.getConnUserProperty());
+ if (!userProperty.isEmpty()) {
+ connectOptions.setUserProperties(userProperty);
+ }
+
+ if (parameters.shouldAutomaticReconnectWithDefaultConfig()) {
+ connectOptions.setAutomaticReconnect(true);
+ }
+ if (parameters.getUsername() != null) {
+ connectOptions.setUserName(parameters.getUsername());
+ }
+ if (parameters.getPassword() != null) {
+ connectOptions.setPassword(parameters.getPassword().getBytes());
+ }
+
+ if (parameters.isSecureProtocol()) {
+ PahoMQTTSsl ssl = (PahoMQTTSsl) parameters.getSsl();
+ connectOptions.setSocketFactory(PahoUtil.getSocketFactory(ssl.getCaFilePath(), ssl.getClientCertFilePath(), ssl.getClientKeyFilePath(), ""));
+ }
+
+ if (parameters.getConnWsHeader() != null && !parameters.getConnWsHeader().isEmpty()) {
+ connectOptions.setCustomWebSocketHeaders(parameters.getConnWsHeader());
+ }
+
+ try {
+ client.connect(connectOptions)
+ .waitForCompletion(parameters.getConnectTimeout() * 1000L);
+
+ logger.info(() -> "Connected client: " + parameters.getClientId());
+ return new PahoMQTT5Connection(client);
+ } catch (MqttException e) {
+ throw new MQTTClientException("Connection error " + client, e);
+ }
+ }
+}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java
new file mode 100644
index 0000000..a8bc0f4
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java
@@ -0,0 +1,185 @@
+package net.xmeter.samplers.mqtt.paho;
+
+
+import net.xmeter.samplers.mqtt.*;
+import org.eclipse.paho.mqttv5.client.*;
+import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.MqttSubscription;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
+
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+class PahoMQTT5Connection implements MQTTConnection {
+
+ private static final Logger logger = Logger.getLogger(PahoMQTT5Connection.class.getCanonicalName());
+
+ private final MqttAsyncClient client;
+
+ private MQTTSubListener listener;
+
+ PahoMQTT5Connection(MqttAsyncClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public boolean isConnectionSucc() {
+ return client.isConnected();
+ }
+
+ @Override
+ public String getClientId() {
+ return client.getClientId();
+ }
+
+ @Override
+ public void disconnect() throws Exception {
+ client.disconnect();
+ }
+
+ @Override
+ public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req) {
+ try {
+ MqttMessage msg = new MqttMessage();
+ msg.setPayload(message);
+ msg.setQos((PahoUtil.map(qoS)));
+ msg.setRetained(retained);
+ msg.setProperties(buildMqtt5MessageFeature(req));
+
+ client.publish(topicName, msg);
+ return new MQTTPubResult(true);
+ } catch (Exception error) {
+ return new MQTTPubResult(false, error.getMessage());
+ }
+ }
+
+ @Override
+ public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure) {
+ int pahoQos = PahoUtil.map(qos);
+
+ for (String topicName : topicNames) {
+ try {
+
+ MqttActionListener subListener = new MqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken iMqttToken) {
+ onSuccess.run();
+ }
+
+ @Override
+ public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
+ onFailure.accept(throwable);
+ }
+ };
+
+ // the subscribe method has issue when pass MqttActionListener and IMqttMessageListener
+ // https://github.com/eclipse/paho.mqtt.java/issues/1019
+ // https://github.com/eclipse/paho.mqtt.java/issues/826
+ // for `messageArrived` use `client.setCallback` to replace at this time.
+ client.setCallback(new MqttCallback() {
+ @Override
+ public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
+
+ }
+
+ @Override
+ public void mqttErrorOccurred(MqttException e) {
+
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ try {
+ handleMessageArrived(topic, message);
+ } catch (Exception e) {
+ logger.severe(e.getMessage());
+ }
+ }
+
+ @Override
+ public void deliveryComplete(IMqttToken iMqttToken) {
+
+ }
+
+ @Override
+ public void connectComplete(boolean b, String s) {
+
+ }
+
+ @Override
+ public void authPacketArrived(int i, MqttProperties mqttProperties) {
+
+ }
+ });
+
+// MqttSubscription mqttSubscription = new MqttSubscription(topicName);
+// mqttSubscription.setQos(pahoQos);
+// mqttSubscription.setNoLocal(false);
+// mqttSubscription.setRetainHandling(0);
+// mqttSubscription.setRetainAsPublished(false);
+
+ client.subscribe(topicName, pahoQos, "", subListener);
+ } catch (MqttException e) {
+ logger.warning("Failed to subscribe " + topicName + ", error: " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void setSubListener(MQTTSubListener listener) {
+ this.listener = listener;
+ }
+
+ private void handleMessageArrived(String topic, MqttMessage message) {
+ this.listener.accept(topic, new String(message.getPayload()), () -> {
+ });
+ }
+
+ private MqttProperties buildMqtt5MessageFeature(MQTT5PublishReq req){
+ MqttProperties properties = new MqttProperties();
+ properties.setMessageExpiryInterval(req.getMessageExpiryInterval());
+
+ if (req.getCorrelationData() != null && !req.getCorrelationData().trim().isEmpty()) {
+ properties.setCorrelationData(req.getCorrelationData().getBytes());
+ }
+
+ List userProperty = PahoUtil.ConvertUserProperty(req.getUserProperties());
+ if (!userProperty.isEmpty()) {
+ properties.setUserProperties(userProperty);
+ }
+
+ if (req.getResponseTopic() != null && !req.getResponseTopic().isEmpty()) {
+ properties.setResponseTopic(req.getResponseTopic());
+ }
+
+ if (req.getContentType() != null && !req.getContentType().isEmpty()){
+ properties.setContentType(req.getContentType());
+ }
+
+ if (!req.getPayloadFormatIndicator().isEmpty()) {
+ properties.setPayloadFormat(req.getPayloadFormatIndicator().equals("UTF_8"));
+ }
+
+ if (req.getTopicAlias() != null && !req.getTopicAlias().isEmpty()) {
+ properties.setTopicAlias(Integer.parseInt(req.getTopicAlias()));
+ }
+
+ if (req.getSubscriptionIdentifier() != null && !req.getSubscriptionIdentifier().isEmpty()) {
+ properties.setSubscriptionIdentifier(Integer.parseInt(req.getSubscriptionIdentifier()));
+ }
+
+// logger.severe(properties.toString());
+
+ return properties;
+ }
+
+ @Override
+ public String toString() {
+ return "PahoMQTT5Connection{" +
+ "clientId='" + client.getClientId() + '\'' +
+ '}';
+ }
+}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java
new file mode 100644
index 0000000..e3743ab
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java
@@ -0,0 +1,46 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import net.xmeter.Constants;
+import net.xmeter.Util;
+import net.xmeter.samplers.AbstractMQTTSampler;
+import net.xmeter.samplers.mqtt.ConnectionParameters;
+import net.xmeter.samplers.mqtt.MQTTClient;
+import net.xmeter.samplers.mqtt.MQTTFactory;
+import net.xmeter.samplers.mqtt.MQTTSsl;
+
+import java.util.List;
+
+class PahoMQTTFactory implements MQTTFactory {
+ @Override
+ public String getName() {
+ return Constants.PAHO_MQTT_CLIENT_NAME;
+ }
+
+ @Override
+ public List getSupportedProtocols() {
+ return PahoUtil.ALLOWED_PROTOCOLS;
+ }
+
+ @Override
+ public MQTTClient createClient(ConnectionParameters parameters) throws Exception {
+ String mqttVersion = parameters.getVersion();
+ if (Constants.MQTT_VERSION_3_1.equals(mqttVersion) || Constants.MQTT_VERSION_3_1_1.equals(mqttVersion)){
+ return new PahoMQTT3Client(parameters);
+ }
+ else {
+ return new PahoMQTT5Client(parameters);
+ }
+ }
+
+ @Override
+ public MQTTSsl createSsl(AbstractMQTTSampler sampler) throws Exception {
+ if (!sampler.isDualSSLAuth()){
+ return new PahoMQTTSsl("","","");
+ } else {
+ return new PahoMQTTSsl(
+ sampler.getCAFilePath(),
+ sampler.getClientCert2FilePath(),
+ sampler.getClientPrivateKeyFilePath());
+ }
+ }
+}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java
new file mode 100644
index 0000000..35237c9
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java
@@ -0,0 +1,11 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import net.xmeter.samplers.mqtt.*;
+
+
+public class PahoMQTTSpi implements MQTTSpi {
+ @Override
+ public MQTTFactory factory() {
+ return new PahoMQTTFactory();
+ }
+}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java
new file mode 100644
index 0000000..5d9748f
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java
@@ -0,0 +1,38 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import net.xmeter.samplers.mqtt.MQTTSsl;
+
+import javax.net.ssl.SSLContext;
+
+class PahoMQTTSsl implements MQTTSsl {
+ private final String caFilePath;
+ private final String clientCertFilePath;
+ private final String clientKeyFilePath;
+
+ PahoMQTTSsl(String ca, String cc, String ck) {
+ this.caFilePath = ca;
+ this.clientCertFilePath = cc;
+ this.clientKeyFilePath = ck;
+ }
+
+ String getCaFilePath(){
+ return caFilePath;
+ }
+
+ String getClientCertFilePath(){
+ return clientCertFilePath;
+ }
+
+ String getClientKeyFilePath(){
+ return clientKeyFilePath;
+ }
+
+ @Override
+ public String toString() {
+ return "PahoMQTTSsl{" +
+ "ca='" + caFilePath + '\'' +
+ "cc='" + clientCertFilePath + '\'' +
+ "ck='" + clientKeyFilePath + '\'' +
+ '}';
+ }
+}
diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java
new file mode 100644
index 0000000..d87f46e
--- /dev/null
+++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java
@@ -0,0 +1,146 @@
+package net.xmeter.samplers.mqtt.paho;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import net.xmeter.Constants;
+import net.xmeter.samplers.mqtt.ConnectionParameters;
+import net.xmeter.samplers.mqtt.MQTTQoS;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openssl.PEMDecryptorProvider;
+import org.bouncycastle.openssl.PEMEncryptedKeyPair;
+import org.bouncycastle.openssl.PEMKeyPair;
+import org.bouncycastle.openssl.PEMParser;
+import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+class PahoUtil {
+ static final List ALLOWED_PROTOCOLS;
+
+ static {
+ ALLOWED_PROTOCOLS = new ArrayList<>();
+ ALLOWED_PROTOCOLS.add(Constants.TCP_PROTOCOL);
+ ALLOWED_PROTOCOLS.add(Constants.SSL_PROTOCOL);
+ ALLOWED_PROTOCOLS.add(Constants.WS_PROTOCOL);
+ ALLOWED_PROTOCOLS.add(Constants.WSS_PROTOCOL);
+ }
+
+ static int map(MQTTQoS qos) {
+ switch (qos) {
+ case AT_MOST_ONCE:
+ return 0;
+ case AT_LEAST_ONCE:
+ return 1;
+ case EXACTLY_ONCE:
+ return 2;
+ default:
+ throw new IllegalArgumentException("Unknown QoS: " + qos);
+ }
+ }
+
+ static String createHostAddress(ConnectionParameters parameters) {
+ return String.format("%s://%s:%d",
+ parameters.getProtocol().toLowerCase(),
+ parameters.getHost(),
+ parameters.getPort());
+ }
+
+ static List ConvertUserProperty(Map userProperty) {
+ if (!userProperty.isEmpty()) {
+ ArrayList userDefinedProperties = new ArrayList<>();
+ for (Map.Entry entry : userProperty.entrySet()) {
+ userDefinedProperties.add(new UserProperty(entry.getKey(), entry.getValue()));
+ }
+ return userDefinedProperties;
+ } else {
+ return new ArrayList();
+ }
+ }
+
+ static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception {
+ Security.addProvider(new BouncyCastleProvider());
+
+ // Load CA certificates
+ KeyStore caKs = loadCAKeyStore(caCrtFile);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(caKs);
+ SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
+ sslContext.init(null, tmf.getTrustManagers(), null);
+ return sslContext.getSocketFactory();
+ }
+
+ static SSLSocketFactory getSocketFactory(final String caCrtFile,
+ final String crtFile, final String keyFile, final String password)
+ throws Exception {
+ Security.addProvider(new BouncyCastleProvider());
+
+ // Load CA certificates
+ KeyStore caKs = loadCAKeyStore(caCrtFile);
+
+ // Load client certificate chain and key
+ KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+ ks.load(null, null);
+
+ // Load the entire client certificate chain
+ Certificate[] chain;
+ try (FileInputStream fis = new FileInputStream(crtFile)) {
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ Collection extends Certificate> certs = cf.generateCertificates(fis);
+ chain = certs.toArray(new Certificate[0]);
+ }
+
+ // Load client private key
+ try (PEMParser pemParser = new PEMParser(new FileReader(keyFile))) {
+ Object object = pemParser.readObject();
+ JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
+ KeyPair key = converter.getKeyPair((PEMKeyPair) object);
+ ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), chain);
+ }
+
+ // Set up key managers and trust managers
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
+ tmf.init(caKs);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, password.toCharArray());
+
+ // finally, create SSL socket factory
+ SSLContext context = SSLContext.getInstance("TLSv1.2");
+ context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+ return context.getSocketFactory();
+ }
+
+ private static KeyStore loadCAKeyStore(String caCrtFile) throws Exception {
+ KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
+ caKs.load(null, null);
+ try (FileInputStream fis = new FileInputStream(caCrtFile)) {
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ Collection extends Certificate> caCerts = cf.generateCertificates(fis);
+ int certIndex = 1;
+ for (Certificate caCert : caCerts) {
+ String alias = "ca-certificate-" + certIndex++;
+ caKs.setCertificateEntry(alias, caCert);
+ }
+ }
+ return caKs;
+ }
+}
diff --git a/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi b/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi
index 4ccdcc5..b1500f3 100644
--- a/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi
+++ b/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi
@@ -1,2 +1,3 @@
net.xmeter.samplers.mqtt.fuse.FuseMQTTSpi
net.xmeter.samplers.mqtt.hivemq.HiveMQTTSpi
+net.xmeter.samplers.mqtt.paho.PahoMQTTSpi