From 5ecabd071473cf9d4dbf51c1a46fa160af7df635 Mon Sep 17 00:00:00 2001 From: catcherwong Date: Sat, 23 Mar 2024 01:19:59 +0000 Subject: [PATCH 1/2] Add support for mqtt5 Signed-off-by: catcherwong --- mqtt_jmeter/pom.xml | 20 +- .../src/main/java/net/xmeter/Constants.java | 22 +- .../java/net/xmeter/gui/CommonConnUI.java | 203 +++++++++++++++--- .../java/net/xmeter/gui/PubSamplerUI.java | 64 +++++- .../xmeter/samplers/AbstractMQTTSampler.java | 72 +++++++ .../net/xmeter/samplers/ConnectSampler.java | 9 +- .../java/net/xmeter/samplers/PubSampler.java | 90 +++++++- .../samplers/mqtt/ConnectionParameters.java | 98 +++++++++ .../xmeter/samplers/mqtt/MQTT5PublishReq.java | 94 ++++++++ .../xmeter/samplers/mqtt/MQTTConnection.java | 3 +- .../mqtt/fuse/FuseMQTTConnection.java | 7 +- .../mqtt/hivemq/HiveMQTTConnection.java | 8 +- .../samplers/mqtt/paho/PahoMQTT3Client.java | 71 ++++++ .../mqtt/paho/PahoMQTT3Connection.java | 98 +++++++++ .../samplers/mqtt/paho/PahoMQTT5Client.java | 94 ++++++++ .../mqtt/paho/PahoMQTT5Connection.java | 185 ++++++++++++++++ .../samplers/mqtt/paho/PahoMQTTFactory.java | 46 ++++ .../samplers/mqtt/paho/PahoMQTTSpi.java | 11 + .../samplers/mqtt/paho/PahoMQTTSsl.java | 38 ++++ .../xmeter/samplers/mqtt/paho/PahoUtil.java | 146 +++++++++++++ .../services/net.xmeter.samplers.mqtt.MQTTSpi | 1 + 21 files changed, 1323 insertions(+), 57 deletions(-) create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java create mode 100644 mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java diff --git a/mqtt_jmeter/pom.xml b/mqtt_jmeter/pom.xml index 1840dcd..4efd495 100644 --- a/mqtt_jmeter/pom.xml +++ b/mqtt_jmeter/pom.xml @@ -3,7 +3,7 @@ 4.0.0 net.xmeter mqtt-jmeter - 2.0.2 + 2.0.3 5.5 @@ -37,6 +37,24 @@ 1.3.0 + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + org.eclipse.paho + org.eclipse.paho.mqttv5.client + 1.2.5 + + + + org.bouncycastle + bcpkix-jdk15on + 1.70 + + org.apache.jmeter jorphan diff --git a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java index 9fdadce..1e04d79 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/Constants.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/Constants.java @@ -9,6 +9,7 @@ public interface Constants { public static final String PROTOCOL = "mqtt.protocol"; public static final String WS_PATH = "mqtt.ws_path"; + public static final String WS_HEADER = "mqtt.ws_header"; public static final String DUAL_AUTH = "mqtt.dual_ssl_authentication"; public static final String CERT_FILE_PATH1 = "mqtt.keystore_file_path"; public static final String CERT_FILE_PATH2 = "mqtt.clientcert_file_path"; @@ -17,6 +18,8 @@ public interface Constants { public static final String USER_NAME_AUTH = "mqtt.user_name"; public static final String PASSWORD_AUTH = "mqtt.password"; + public static final String AUTH_METHOD = "mqtt.auth_method"; + public static final String AUTH_DATA = "mqtt.auth_data"; public static final String CONN_CLIENT_ID_PREFIX = "mqtt.client_id_prefix"; public static final String CONN_CLIENT_ID_SUFFIX = "mqtt.client_id_suffix"; @@ -26,6 +29,10 @@ public interface Constants { public static final String CONN_RECONN_ATTEMPT_MAX = "mqtt.reconn_attempt_max"; public static final String CONN_CLEAN_SESSION = "mqtt.conn_clean_session"; + + public static final String CONN_USER_PROPERTY = "mqtt.conn_user_property"; + public static final String CONN_CLEAN_START = "mqtt.conn_clean_start"; + public static final String CONN_SESSION_EXPIRY_INTERVAL = "mqtt.conn_session_expiry_interval"; public static final String MESSAGE_TYPE = "mqtt.message_type"; public static final String MESSAGE_FIX_LENGTH = "mqtt.message_type_fixed_length"; @@ -35,6 +42,15 @@ public interface Constants { public static final String QOS_LEVEL = "mqtt.qos_level"; public static final String ADD_TIMESTAMP = "mqtt.add_timestamp"; public static final String RETAINED_MESSAGE = "mqtt.retained_message"; + + public static final String CORRELATION_DATA = "mqtt.correlation_data"; + public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt.message_expiry_interval"; + public static final String USER_PROPERTIES = "mqtt.user_properties"; + public static final String CONTENT_TYPE = "mqtt.content_type"; + public static final String RESPONSE_TOPIC = "mqtt.response_topic"; + public static final String PAYLOAD_FORMAT = "mqtt.payload_format_indicator"; + public static final String TOPIC_ALIAS = "mqtt.topic_alias"; + public static final String SUBSCRIPTION_IDENTIFIER = "mqtt.subscription_identifier"; public static final String SAMPLE_CONDITION_VALUE = "mqtt.sample_condition_value"; public static final String SAMPLE_CONDITION = "mqtt.sample_condition"; @@ -53,7 +69,7 @@ public interface Constants { public static final String MQTT_VERSION_3_1_1 = "3.1.1"; public static final String MQTT_VERSION_3_1 = "3.1"; - + public static final String MQTT_VERSION_5_0 = "5.0"; public static final String SAMPLE_ON_CONDITION_OPTION1 = "specified elapsed time (ms)"; public static final String SAMPLE_ON_CONDITION_OPTION2 = "number of received messages"; @@ -69,9 +85,10 @@ public interface Constants { public static final String WSS_PROTOCOL = "WSS"; public static final String DEFAULT_PROTOCOL = TCP_PROTOCOL; public static final String FUSESOURCE_MQTT_CLIENT_NAME = "fusesource"; + public static final String PAHO_MQTT_CLIENT_NAME = "paho"; public static final String HIVEMQ_MQTT_CLIENT_NAME = "hivemq"; // public static final String DEFAULT_MQTT_CLIENT_NAME = FUSESOURCE_MQTT_CLIENT_NAME; - public static final String DEFAULT_MQTT_CLIENT_NAME = HIVEMQ_MQTT_CLIENT_NAME; + public static final String DEFAULT_MQTT_CLIENT_NAME = PAHO_MQTT_CLIENT_NAME; public static final String JMETER_VARIABLE_PREFIX = "${"; @@ -82,6 +99,7 @@ public interface Constants { public static final String DEFAULT_CONN_KEEP_ALIVE = "300"; public static final String DEFAULT_CONN_ATTEMPT_MAX = "0"; public static final String DEFAULT_CONN_RECONN_ATTEMPT_MAX = "0"; + public static final String DEFAULT_CONN_USER_PROPERTY = "{}"; public static final String DEFAULT_SAMPLE_VALUE_COUNT = "1"; public static final String DEFAULT_SAMPLE_VALUE_ELAPSED_TIME_MILLI_SEC = "1000"; diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java index a1d44cc..3a9e029 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java @@ -10,11 +10,7 @@ import java.util.Arrays; import java.util.List; -import javax.swing.BorderFactory; -import javax.swing.JButton; -import javax.swing.JCheckBox; -import javax.swing.JFileChooser; -import javax.swing.JPanel; +import javax.swing.*; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; @@ -33,17 +29,21 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ private final JLabeledTextField serverAddr = new JLabeledTextField("Server name or IP:"); private final JLabeledTextField serverPort = new JLabeledTextField("Port number:", 5); - private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1 }, false, false);; + private final JLabel mqttVersionLabel = new JLabel("MQTT version:"); + private JLabeledChoice mqttVersion = new JLabeledChoice("MQTT version:", new String[] { MQTT_VERSION_3_1, MQTT_VERSION_3_1_1, MQTT_VERSION_5_0 }, false, false); private final JLabeledTextField timeout = new JLabeledTextField("Timeout(s):", 5); private final JLabeledTextField userNameAuth = new JLabeledTextField("User name:"); private final JLabeledTextField passwordAuth = new JLabeledTextField("Password:"); + private final JLabeledTextField authMethod = new JLabeledTextField("Auth Method:"); + private final JLabeledTextField authData = new JLabeledTextField("Auth Data:"); private JLabeledChoice protocols; // private JLabeledChoice clientNames; private JCheckBox dualAuth = new JCheckBox("Dual SSL authentication"); private JLabeledTextField wsPath = new JLabeledTextField("WS Path: ", 10); + private final JLabeledTextField wsHeader = new JLabeledTextField("WS Header: ", 20); // private final JLabeledTextField tksFilePath = new JLabeledTextField("Trust Key Store(*.jks): ", 25); private final JLabeledTextField ccFilePath = new JLabeledTextField("Client Certification(*.p12):", 25); @@ -51,8 +51,17 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ // private final JLabeledTextField tksPassword = new JLabeledTextField("Secret:", 10); private final JLabeledTextField ccPassword = new JLabeledTextField("Secret:", 10); + private final JLabeledTextField caCertFilePath = new JLabeledTextField("CA Cert :", 25); + private final JLabeledTextField clientCertFilePath = new JLabeledTextField("Client Cert:", 25); + private final JLabeledTextField clientKeyFilePath = new JLabeledTextField("Client Key :", 25); + // private JButton tksBrowseButton; private JButton ccBrowseButton; + + private JButton caBrowseButton; + private JButton clientCertBrowseButton; + private JButton clientKeyBrowseButton; + // private static final String TKS_BROWSE = "tks_browse"; private static final String CC_BROWSE = "cc_browse"; @@ -66,6 +75,11 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ private final JLabeledTextField connCleanSession = new JLabeledTextField("Clean session:", 3); + public final JLabeledTextField connUserProperty = new JLabeledTextField("User Property:", 10); + + private final JLabeledTextField connCleanStart = new JLabeledTextField("Clean start:", 5); + + private final JLabeledTextField connSessionExpiryInterval = new JLabeledTextField("Session Expiry Interval(s):", 5); // private final List clientNamesList = MQTT.getAvailableNames(); public JPanel createConnPanel() { @@ -75,12 +89,15 @@ public JPanel createConnPanel() { connPanel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "MQTT connection")); connPanel.add(serverAddr); connPanel.add(serverPort); + connPanel.add(mqttVersionLabel); connPanel.add(mqttVersion); JPanel timeoutPannel = new HorizontalPanel(); timeoutPannel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEtchedBorder(), "Timeout")); timeoutPannel.add(timeout); + mqttVersion.addChangeListener(this); + con.add(connPanel); con.add(timeoutPannel); return con; @@ -98,13 +115,22 @@ public JPanel createConnOptions() { JPanel optsPanel1 = new HorizontalPanel(); optsPanel1.add(connKeepAlive); - optsPanelCon.add(optsPanel1); - optsPanel1.add(connAttmptMax); optsPanel1.add(reconnAttmptMax); optsPanel1.add(connCleanSession); optsPanelCon.add(optsPanel1); - + + JPanel optsPanel2 = new HorizontalPanel(); + optsPanel2.add(connUserProperty); + optsPanel2.add(connCleanStart); + optsPanel2.add(connSessionExpiryInterval); + + connUserProperty.setVisible(false); + connCleanStart.setVisible(false); + connSessionExpiryInterval.setVisible(false); + + optsPanelCon.add(optsPanel2); + return optsPanelCon; } @@ -116,6 +142,13 @@ public JPanel createAuthentication() { optsPanel.add(userNameAuth); optsPanel.add(passwordAuth); optsPanelCon.add(optsPanel); + +// JPanel optsPanel1 = new HorizontalPanel(); +// optsPanel1.add(authMethod); +// optsPanel1.add(authData); +// authMethod.setVisible(false); +// authData.setVisible(false); +// optsPanelCon.add(optsPanel1); return optsPanelCon; } @@ -143,6 +176,9 @@ public JPanel createProtocolPanel() { wsPath.setVisible(false); pCenter.add(wsPath); + wsHeader.setVisible(false); + pCenter.add(wsHeader); + pPanel.add(pCenter, BorderLayout.CENTER); dualAuth.setSelected(false); @@ -171,20 +207,53 @@ public JPanel createProtocolPanel() { // panel.add(tksPassword, c); //c.weightx = 0.0; - c.gridx = 0; c.gridy = 1; c.gridwidth = 2; - ccFilePath.setVisible(false); - panel.add(ccFilePath, c); - - c.gridx = 2; c.gridy = 1; c.gridwidth = 1; - ccBrowseButton = new JButton(JMeterUtils.getResString("browse")); - ccBrowseButton.setActionCommand(CC_BROWSE); - ccBrowseButton.addActionListener(this); - ccBrowseButton.setVisible(false); - panel.add(ccBrowseButton, c); +// c.gridx = 0; c.gridy = 1; c.gridwidth = 2; +// ccFilePath.setVisible(false); +// panel.add(ccFilePath, c); +// +// c.gridx = 2; c.gridy = 1; c.gridwidth = 1; +// ccBrowseButton = new JButton(JMeterUtils.getResString("browse")); +// ccBrowseButton.setActionCommand(CC_BROWSE); +// ccBrowseButton.addActionListener(this); +// ccBrowseButton.setVisible(false); +// panel.add(ccBrowseButton, c); - c.gridx = 3; c.gridy = 1; c.gridwidth = 2; - ccPassword.setVisible(false); - panel.add(ccPassword, c); +// c.gridx = 3; c.gridy = 1; c.gridwidth = 2; +// ccPassword.setVisible(false); +// panel.add(ccPassword, c); + + c.gridx = 0; c.gridy = 2; c.gridwidth = 2; + caCertFilePath.setVisible(false); + panel.add(caCertFilePath, c); + + c.gridx = 2; c.gridy = 2; c.gridwidth = 1; + caBrowseButton = new JButton(JMeterUtils.getResString("browse")); + caBrowseButton.setActionCommand("ca_browse"); + caBrowseButton.addActionListener(this); + caBrowseButton.setVisible(false); + panel.add(caBrowseButton, c); + + c.gridx = 0; c.gridy = 3; c.gridwidth = 2; + clientCertFilePath.setVisible(false); + panel.add(clientCertFilePath, c); + + c.gridx = 2; c.gridy = 3; c.gridwidth = 1; + clientCertBrowseButton = new JButton(JMeterUtils.getResString("browse")); + clientCertBrowseButton.setActionCommand("client_cert_browse"); + clientCertBrowseButton.addActionListener(this); + clientCertBrowseButton.setVisible(false); + panel.add(clientCertBrowseButton, c); + + c.gridx = 0; c.gridy = 4; c.gridwidth = 2; + clientKeyFilePath.setVisible(false); + panel.add(clientKeyFilePath, c); + + c.gridx = 2; c.gridy = 4; c.gridwidth = 1; + clientKeyBrowseButton = new JButton(JMeterUtils.getResString("browse")); + clientKeyBrowseButton.setActionCommand("client_key_browse"); + clientKeyBrowseButton.addActionListener(this); + clientKeyBrowseButton.setVisible(false); + panel.add(clientKeyBrowseButton, c); protocolPanel.add(pPanel); protocolPanel.add(panel); @@ -202,6 +271,15 @@ public void actionPerformed(ActionEvent e) { if(CC_BROWSE.equals(action)) { String path = browseAndGetFilePath(); ccFilePath.setText(path); + } else if ("ca_browse".equals(action)) { + String path = browseAndGetFilePath(); + caCertFilePath.setText(path); + } else if ("client_cert_browse".equals(action)) { + String path = browseAndGetFilePath(); + clientCertFilePath.setText(path); + } else if ("client_key_browse".equals(action)) { + String path = browseAndGetFilePath(); + clientKeyFilePath.setText(path); } } private String browseAndGetFilePath() { @@ -223,16 +301,30 @@ public void stateChanged(ChangeEvent e) { // tksFilePath.setVisible(true); // tksBrowseButton.setVisible(true); // tksPassword.setVisible(true); - ccFilePath.setVisible(true); - ccBrowseButton.setVisible(true); - ccPassword.setVisible(true); +// ccFilePath.setVisible(true); +// ccBrowseButton.setVisible(true); +// ccPassword.setVisible(true); + + caCertFilePath.setVisible(true); + caBrowseButton.setVisible(true); + clientCertFilePath.setVisible(true); + clientCertBrowseButton.setVisible(true); + clientKeyFilePath.setVisible(true); + clientKeyBrowseButton.setVisible(true); } else { // tksFilePath.setVisible(false); // tksBrowseButton.setVisible(false); // tksPassword.setVisible(false); - ccFilePath.setVisible(false); - ccBrowseButton.setVisible(false); - ccPassword.setVisible(false); +// ccFilePath.setVisible(false); +// ccBrowseButton.setVisible(false); +// ccPassword.setVisible(false); + + caCertFilePath.setVisible(false); + caBrowseButton.setVisible(false); + clientCertFilePath.setVisible(false); + clientCertBrowseButton.setVisible(false); + clientKeyFilePath.setVisible(false); + clientKeyBrowseButton.setVisible(false); } } else if(e.getSource() == protocols) { boolean isSecure = Util.isSecureProtocol(protocols.getText()); @@ -241,6 +333,8 @@ public void stateChanged(ChangeEvent e) { boolean wsProtocol = Util.isWebSocketProtocol(protocols.getText()); wsPath.setVisible(wsProtocol); wsPath.setEnabled(wsProtocol); + wsHeader.setVisible(wsProtocol); + wsHeader.setEnabled(wsProtocol); // } else if (e.getSource() == clientNames) { // int index = clientNames.getSelectedIndex(); // if (index > -1) { @@ -250,6 +344,14 @@ public void stateChanged(ChangeEvent e) { // } else { // protocols.setValues(new String[0]); // } + } else if(e.getSource() == mqttVersion) { + boolean isMqtt5 = mqttVersion.getText().equals(MQTT_VERSION_5_0); + connUserProperty.setVisible(isMqtt5); + connCleanStart.setVisible(isMqtt5); + connSessionExpiryInterval.setVisible(isMqtt5); + connCleanSession.setVisible(!isMqtt5); +// authMethod.setVisible(isMqtt5); +// authData.setVisible(isMqtt5); } } @@ -260,7 +362,10 @@ public void configure(AbstractMQTTSampler sampler) { mqttVersion.setSelectedIndex(0); } else if(sampler.getMqttVersion().equals(MQTT_VERSION_3_1_1)) { mqttVersion.setSelectedIndex(1); + } else if(sampler.getMqttVersion().equals(MQTT_VERSION_5_0)) { + mqttVersion.setSelectedIndex(2); } + timeout.setText(sampler.getConnTimeout()); // if (sampler.getProtocol().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1) { @@ -270,7 +375,7 @@ public void configure(AbstractMQTTSampler sampler) { // clientNames.setText(sampler.getMqttClientName()); // } - if(sampler.getProtocol().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1) { + if(sampler.getProtocol().trim().contains(JMETER_VARIABLE_PREFIX)) { List items = Arrays.asList(protocols.getItems()); int index = items.indexOf(sampler.getProtocol()); protocols.setSelectedIndex(index); @@ -283,6 +388,10 @@ public void configure(AbstractMQTTSampler sampler) { wsPath.setVisible(wsProtocol); wsPath.setEnabled(wsProtocol); + wsHeader.setText(sampler.getWsHeader()); + wsHeader.setVisible(wsProtocol); + wsHeader.setEnabled(wsProtocol); + if(sampler.isDualSSLAuth()) { dualAuth.setVisible(true); dualAuth.setSelected(sampler.isDualSSLAuth()); @@ -294,9 +403,11 @@ public void configure(AbstractMQTTSampler sampler) { userNameAuth.setText(sampler.getUserNameAuth()); passwordAuth.setText(sampler.getPasswordAuth()); +// authMethod.setText(sampler.getAuthMethod()); +// authData.setText(sampler.getAuthData()); connNamePrefix.setText(sampler.getConnPrefix()); - if(sampler.isClientIdSuffix()) { + if (sampler.isClientIdSuffix()) { connNameSuffix.setSelected(true); } else { connNameSuffix.setSelected(false); @@ -306,7 +417,15 @@ public void configure(AbstractMQTTSampler sampler) { connAttmptMax.setText(sampler.getConnAttemptMax()); reconnAttmptMax.setText(sampler.getConnReconnAttemptMax()); - connCleanSession.setText(sampler.getConnCleanSession().toString()); + connCleanSession.setText(sampler.getConnCleanSession()); + + connUserProperty.setText(sampler.getConnUserProperty()); + connCleanStart.setText(sampler.getConnCleanStart()); + connSessionExpiryInterval.setText(sampler.getConnSessionExpiryInterval()); + + caCertFilePath.setText(sampler.getCAFilePath()); + clientCertFilePath.setText(sampler.getClientCert2FilePath()); + clientKeyFilePath.setText(sampler.getClientPrivateKeyFilePath()); } @@ -319,6 +438,7 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) { // sampler.setMqttClientName(clientNames.getText()); sampler.setProtocol(protocols.getText()); sampler.setWsPath(wsPath.getText()); + sampler.setWsHeader(wsHeader.getText()); sampler.setDualSSLAuth(dualAuth.isSelected()); // sampler.setKeyStoreFilePath(tksFilePath.getText()); // sampler.setKeyStorePassword(tksPassword.getText()); @@ -327,6 +447,8 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) { sampler.setUserNameAuth(userNameAuth.getText()); sampler.setPasswordAuth(passwordAuth.getText()); +// sampler.setAuthMethod(authMethod.getText()); +// sampler.setAuthData(authData.getText()); sampler.setConnPrefix(connNamePrefix.getText()); sampler.setClientIdSuffix(connNameSuffix.isSelected()); @@ -336,6 +458,14 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) { sampler.setConnReconnAttemptMax(reconnAttmptMax.getText()); sampler.setConnCleanSession(connCleanSession.getText()); + + sampler.setConnUserProperty(connUserProperty.getText()); + sampler.setConnCleanStart(connCleanStart.getText()); + sampler.setConnSessionExpiryInterval(connSessionExpiryInterval.getText()); + + sampler.setCAFilePath(caCertFilePath.getText()); + sampler.setClientCert2FilePath(clientCertFilePath.getText()); + sampler.setClientPrivateKeyFilePath(clientKeyFilePath.getText()); } public static int parseInt(String value) { @@ -357,6 +487,7 @@ public void clearUI() { dualAuth.setSelected(false); wsPath.setText(""); + wsHeader.setText("{}"); // tksFilePath.setText(""); // tksPassword.setText(""); ccFilePath.setText(""); @@ -364,6 +495,8 @@ public void clearUI() { userNameAuth.setText(""); passwordAuth.setText(""); +// authMethod.setText(""); +// authData.setText(""); connNamePrefix.setText(DEFAULT_CONN_PREFIX_FOR_CONN); connNameSuffix.setSelected(true); @@ -372,5 +505,13 @@ public void clearUI() { connKeepAlive.setText(DEFAULT_CONN_KEEP_ALIVE); reconnAttmptMax.setText(DEFAULT_CONN_RECONN_ATTEMPT_MAX); connCleanSession.setText("true"); + + connUserProperty.setText("{}"); + connCleanStart.setText("true"); + connSessionExpiryInterval.setText("0"); + + caCertFilePath.setText(""); + clientCertFilePath.setText(""); + clientKeyFilePath.setText(""); } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java index fa3e714..7721d0a 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/PubSamplerUI.java @@ -3,9 +3,7 @@ import java.awt.BorderLayout; import java.util.logging.Logger; -import javax.swing.BorderFactory; -import javax.swing.JCheckBox; -import javax.swing.JPanel; +import javax.swing.*; import javax.swing.event.ChangeEvent; import javax.swing.event.ChangeListener; @@ -30,6 +28,18 @@ public class PubSamplerUI extends AbstractSamplerGui implements Constants, Chang private final JLabeledTextField topicName = new JLabeledTextField("Topic name:"); private JCheckBox timestamp = new JCheckBox("Add timestamp in payload"); + private final JLabeledTextField messageExpiryInterval = new JLabeledTextField("Message Expiry Interval(s):"); + private final JLabeledTextField contentType = new JLabeledTextField("Content Type:"); + private final JLabeledTextField responseTopic = new JLabeledTextField("Response Topic:"); + private final JLabeledTextField correlationData = new JLabeledTextField("Correlation Data:"); + private final JLabeledTextField userProperties = new JLabeledTextField("User Properties:"); + + private final JLabel qosLabel = new JLabel("QoS Level:"); + private final JLabel payloadFormatLabel = new JLabel("Payload Format:"); + private final JLabeledChoice payloadFormat = new JLabeledChoice("Payload Format:", new String[] { "UNSPECIFIED", "UTF_8" }, false, false); + private final JLabeledTextField topicAlias = new JLabeledTextField("Topic Alias:"); + private final JLabeledTextField subscriptionIdentifier = new JLabeledTextField("Subscription Identifier:"); + private JLabeledChoice messageTypes; private final JSyntaxTextArea sendMessage = JSyntaxTextArea.getInstance(10, 50); private final JTextScrollPane messagePanel = JTextScrollPane.getInstance(sendMessage); @@ -59,6 +69,7 @@ private JPanel createPubOption() { qosChoice.addChangeListener(this); JPanel optsPanel = new HorizontalPanel(); + optsPanel.add(qosLabel); optsPanel.add(qosChoice); optsPanel.add(retainedMsg); optsPanel.add(topicName); @@ -66,6 +77,22 @@ private JPanel createPubOption() { optsPanel.add(timestamp); optsPanelCon.add(optsPanel); + JPanel optsPanel1 = new HorizontalPanel(); + optsPanel1.add(userProperties); + optsPanel1.add(messageExpiryInterval); + optsPanel1.add(topicAlias); + optsPanel1.add(payloadFormatLabel); + optsPanel1.add(payloadFormat); + payloadFormat.setToolTipText("Payload format indicator to the message"); + optsPanelCon.add(optsPanel1); + + JPanel optsPanel2 = new HorizontalPanel(); + optsPanel2.add(contentType); + optsPanel2.add(responseTopic); + optsPanel2.add(correlationData); + optsPanel2.add(subscriptionIdentifier); + optsPanelCon.add(optsPanel2); + return optsPanelCon; } @@ -129,7 +156,7 @@ public void configure(TestElement element) { super.configure(element); PubSampler sampler = (PubSampler) element; - if(sampler.getQOS().trim().indexOf(JMETER_VARIABLE_PREFIX) == -1){ + if(sampler.getQOS().trim().contains(JMETER_VARIABLE_PREFIX)){ this.qosChoice.setSelectedIndex(Integer.parseInt(sampler.getQOS())); } else { this.qosChoice.setText(sampler.getQOS()); @@ -149,6 +176,15 @@ public void configure(TestElement element) { stringLength.setText(String.valueOf(sampler.getMessageLength())); sendMessage.setText(sampler.getMessage()); + + contentType.setText(sampler.getContentType()); + messageExpiryInterval.setText(String.valueOf(sampler.getMessageExpiryInterval())); + userProperties.setText(sampler.getUserProperties()); + responseTopic.setText(sampler.getResponseTopic()); + correlationData.setText(sampler.getCorrelationData()); + payloadFormat.setText(sampler.getPayloadFormat()); + topicAlias.setText(sampler.getTopicAlias()); + subscriptionIdentifier.setText(sampler.getSubscriptionIdentifier()); } @Override @@ -161,7 +197,7 @@ private void setupSamplerProperties(PubSampler sampler) { this.configureTestElement(sampler); sampler.setTopic(this.topicName.getText()); - if(this.qosChoice.getText().indexOf(JMETER_VARIABLE_PREFIX) == -1) { + if(this.qosChoice.getText().contains(JMETER_VARIABLE_PREFIX)) { int qos = QOS_0; try { qos = Integer.parseInt(this.qosChoice.getText()); @@ -183,6 +219,15 @@ private void setupSamplerProperties(PubSampler sampler) { sampler.setMessageLength(this.stringLength.getText()); sampler.setMessage(this.sendMessage.getText()); sampler.setRetainedMessage(Boolean.parseBoolean(this.retainedMsg.getText())); + + sampler.setContentType(this.contentType.getText()); + sampler.setCorrelationData(this.correlationData.getText()); + sampler.setMessageExpiryInterval(Long.parseLong(this.messageExpiryInterval.getText())); + sampler.setUserProperties(this.userProperties.getText()); + sampler.setResponseTopic(this.responseTopic.getText()); + sampler.setPayloadFormat(this.payloadFormat.getText()); + sampler.setTopicAlias(this.topicAlias.getText()); + sampler.setSubscriptionIdentifier(this.subscriptionIdentifier.getText()); } @Override @@ -195,5 +240,14 @@ public void clearGui() { this.messageTypes.setSelectedIndex(0); this.stringLength.setText(String.valueOf(DEFAULT_MESSAGE_FIX_LENGTH)); this.sendMessage.setText(""); + + this.messageExpiryInterval.setText("0"); + this.responseTopic.setText(""); + this.contentType.setText(""); + this.userProperties.setText("{}"); + this.correlationData.setText(""); + this.payloadFormat.setSelectedIndex(0); + this.topicAlias.setText(""); + this.subscriptionIdentifier.setText(""); } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java index a79d765..4584a4c 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java @@ -72,6 +72,14 @@ public void setWsPath(String wsPath) { setProperty(WS_PATH, wsPath); } + public String getWsHeader() { + return getPropertyAsString(WS_HEADER, "{}"); + } + + public void setWsHeader(String wsHeader) { + setProperty(WS_HEADER, wsHeader); + } + public boolean isDualSSLAuth() { return getPropertyAsBoolean(DUAL_AUTH, false); } @@ -201,6 +209,70 @@ public void setMqttClientName(String mqttClientName) { setProperty(MQTT_CLIENT_NAME, mqttClientName); } + public void setConnCleanStart(String cleanStart) { + setProperty(CONN_CLEAN_START, cleanStart); + } + + public String getConnCleanStart() { + return getPropertyAsString(CONN_CLEAN_START, "true"); + } + + public String getConnSessionExpiryInterval() { + return getPropertyAsString(CONN_SESSION_EXPIRY_INTERVAL, "0"); + } + + public void setConnSessionExpiryInterval(String sessionExpiryInterval) { + setProperty(CONN_SESSION_EXPIRY_INTERVAL, sessionExpiryInterval); + } + + public String getConnUserProperty() { + return getPropertyAsString(CONN_USER_PROPERTY, "{}"); + } + + public void setConnUserProperty(String connUserProperty) { + setProperty(CONN_USER_PROPERTY, connUserProperty); + } + + public String getCAFilePath() { + return getPropertyAsString("mqtt.ca_file_path", ""); + } + + public void setCAFilePath(String ca) { + setProperty("mqtt.ca_file_path", ca); + } + + public String getClientCert2FilePath() { + return getPropertyAsString("mqtt.client_cert_file_path", ""); + } + + public void setClientCert2FilePath(String cert) { + setProperty("mqtt.client_cert_file_path", cert); + } + + public String getClientPrivateKeyFilePath() { + return getPropertyAsString("mqtt.client_key_file_path", ""); + } + + public void setClientPrivateKeyFilePath(String key) { + setProperty("mqtt.client_key_file_path", key); + } + + public String getAuthMethod() { + return getPropertyAsString(AUTH_METHOD, ""); + } + + public void setAuthMethod(String authMethod) { + setProperty(AUTH_METHOD, authMethod); + } + + public String getAuthData() { + return getPropertyAsString(AUTH_DATA, ""); + } + + public void setAuthData(String authData) { + setProperty(AUTH_DATA, authData); + } + // public int getConCapacity() { // return conCapacity; // } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java index a3e21f9..71358ef 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java @@ -75,6 +75,13 @@ public SampleResult sample(Entry entry) { MQTTSsl ssl = MQTT.getInstance(getMqttClientName()).createSsl(this); parameters.setSsl(ssl); } + + parameters.setConnUserProperty(getConnUserProperty()); + parameters.setCleanStart(Boolean.parseBoolean(getConnCleanStart())); + parameters.setSessionExpiryInterval(Long.parseLong(getConnSessionExpiryInterval())); + parameters.setConnWsHeader(getWsHeader()); + parameters.setAuthMethod(getAuthMethod()); + parameters.setAuthData(getAuthData()); } catch (Exception e) { logger.log(Level.SEVERE, "Failed to establish Connection " + connection , e); result.setSuccessful(false); @@ -83,7 +90,7 @@ public SampleResult sample(Entry entry) { result.setResponseCode("502"); return result; } - + try { client = MQTT.getInstance(getMqttClientName()).createClient(parameters); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java index fb86b04..a085b17 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java @@ -6,6 +6,7 @@ import javax.xml.bind.DatatypeConverter; +import net.xmeter.samplers.mqtt.MQTT5PublishReq; import org.apache.jmeter.samplers.Entry; import org.apache.jmeter.samplers.SampleResult; import org.apache.jmeter.threads.JMeterContextService; @@ -82,6 +83,70 @@ public Boolean getRetainedMessage() { return getPropertyAsBoolean(RETAINED_MESSAGE, false); } + public void setMessageExpiryInterval(long messageExpiryInterval) { + setProperty(MESSAGE_EXPIRY_INTERVAL, messageExpiryInterval); + } + + public Long getMessageExpiryInterval() { + return getPropertyAsLong(MESSAGE_EXPIRY_INTERVAL); + } + + public void setContentType(String contentType) { + setProperty(CONTENT_TYPE, contentType); + } + + public String getContentType() { + return getPropertyAsString(CONTENT_TYPE); + } + + public void setResponseTopic(String responseTopic) { + setProperty(RESPONSE_TOPIC, responseTopic); + } + + public String getResponseTopic() { + return getPropertyAsString(RESPONSE_TOPIC); + } + + public void setUserProperties(String userProperties) { + setProperty(USER_PROPERTIES, userProperties); + } + + public String getUserProperties() { + return getPropertyAsString(USER_PROPERTIES); + } + + public void setCorrelationData(String correlationData) { + setProperty(CORRELATION_DATA, correlationData); + } + + public String getCorrelationData() { + return getPropertyAsString(CORRELATION_DATA); + } + + public void setPayloadFormat(String payloadFormat) { + setProperty(PAYLOAD_FORMAT, payloadFormat); + } + + public String getPayloadFormat() { + return getPropertyAsString(PAYLOAD_FORMAT); + } + + public void setTopicAlias(String topicAlias) { + setProperty(TOPIC_ALIAS, topicAlias); + } + + public String getTopicAlias() { + return getPropertyAsString(TOPIC_ALIAS); + } + + public void setSubscriptionIdentifier(String subscriptionIdentifier) { + setProperty(SUBSCRIPTION_IDENTIFIER, subscriptionIdentifier); + } + + public String getSubscriptionIdentifier() { + return getPropertyAsString(SUBSCRIPTION_IDENTIFIER); + } + public static byte[] hexToBinary(String hex) { return DatatypeConverter.parseHexBinary(hex); } @@ -119,7 +184,6 @@ public SampleResult sample(Entry arg0) { tmp = payload.getBytes(); } - int qos = Integer.parseInt(getQOS()); switch (qos) { case 0: @@ -146,13 +210,15 @@ public SampleResult sample(Entry arg0) { toSend = new byte[tmp.length]; System.arraycopy(tmp, 0, toSend, 0 , tmp.length); } - + + MQTT5PublishReq mqtt5PublishReq = getMqtt5PublishReq(); + result.sampleStart(); if (logger.isLoggable(Level.FINE)) { logger.fine("pub [topic]: " + topicName + ", [payload]: " + new String(toSend)); } - MQTTPubResult pubResult = connection.publish(topicName, toSend, qos_enum, retainedMsg); + MQTTPubResult pubResult = connection.publish(topicName, toSend, qos_enum, retainedMsg, mqtt5PublishReq); result.sampleEnd(); result.setSamplerData(new String(toSend)); @@ -161,7 +227,7 @@ public SampleResult sample(Entry arg0) { result.setSuccessful(pubResult.isSuccessful()); if(pubResult.isSuccessful()) { - result.setResponseData("Publish successfuly.".getBytes()); + result.setResponseData("Publish successfully.".getBytes()); result.setResponseMessage(MessageFormat.format("publish successfully for Connection {0}.", connection)); result.setResponseCodeOK(); } else { @@ -188,5 +254,19 @@ public SampleResult sample(Entry arg0) { } return result; } - + + private MQTT5PublishReq getMqtt5PublishReq() { + MQTT5PublishReq mqtt5PublishReq = new MQTT5PublishReq(); + mqtt5PublishReq.setContentType(getContentType()); + mqtt5PublishReq.setResponseTopic(getResponseTopic()); + mqtt5PublishReq.setCorrelationData(getCorrelationData()); + mqtt5PublishReq.setMessageExpiryInterval(getMessageExpiryInterval()); + mqtt5PublishReq.setUserProperties(getUserProperties()); + mqtt5PublishReq.setPayloadFormatIndicator(getPayloadFormat()); + mqtt5PublishReq.setTopicAlias(getTopicAlias()); + mqtt5PublishReq.setSubscriptionIdentifier(getSubscriptionIdentifier()); + + return mqtt5PublishReq; + } + } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java index e9e45bf..41c81d1 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java @@ -1,7 +1,12 @@ package net.xmeter.samplers.mqtt; +import com.fasterxml.jackson.databind.ObjectMapper; import net.xmeter.Util; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + public class ConnectionParameters { private MQTTSsl ssl; private String protocol; @@ -18,6 +23,13 @@ public class ConnectionParameters { private boolean cleanSession; private String path; + private boolean cleanStart; + private long sessionExpiryInterval; + private Map connUserProperty; + private Map connWsHeader; + private String authMethod; + private String authData; + public MQTTSsl getSsl() { return ssl; } @@ -138,4 +150,90 @@ public boolean isSecureProtocol() { public boolean isWebSocketProtocol() { return Util.isWebSocketProtocol(getProtocol()); } + + public boolean isCleanStart() { + return cleanStart; + } + + public void setCleanStart(boolean cleanStart) { + this.cleanStart = cleanStart; + } + + public long getSessionExpiryInterval() { + return sessionExpiryInterval; + } + + public void setSessionExpiryInterval(long sessionExpiryInterval) { + this.sessionExpiryInterval = sessionExpiryInterval; + } + + public Map getConnUserProperty() { + return connUserProperty; + } + + public void setConnUserProperty(String userPropertyJson) { + if (userPropertyJson == null || userPropertyJson.isEmpty()) { + this.connUserProperty = new HashMap<>(); + return; + } + + ObjectMapper mapper = new ObjectMapper(); + try{ + this.connUserProperty = mapper.readValue(userPropertyJson, Map.class); + }catch (IOException e){ + this.connUserProperty = new HashMap<>(); + e.printStackTrace(); + } + } + + public Map getConnWsHeader() { + return connWsHeader; + } + + public void setConnWsHeader(String wsHeaderJson) { + if (wsHeaderJson == null || wsHeaderJson.isEmpty()) { + this.connWsHeader = new HashMap<>(); + return; + } + + ObjectMapper mapper = new ObjectMapper(); + try{ + this.connWsHeader = mapper.readValue(wsHeaderJson, Map.class); + }catch (IOException e){ + this.connWsHeader = new HashMap<>(); + e.printStackTrace(); + } + } + + public boolean shouldAutomaticReconnectWithDefaultConfig(){ + return reconnectMaxAttempts == -1 || reconnectMaxAttempts > 0; + } + + @Override + public String toString() { + return "ConnectionParameters{" + + "host='" + host + '\'' + + "port='" + port + '\'' + + "version='" + version + '\'' + + "connUserProperty='" + connUserProperty + '\'' + + "cleanStart='" + cleanStart + '\'' + + "sessionExpiryInterval='" + sessionExpiryInterval + '\'' + + '}'; + } + + public String getAuthMethod() { + return authMethod; + } + + public void setAuthMethod(String authMethod) { + this.authMethod = authMethod; + } + + public String getAuthData() { + return authData; + } + + public void setAuthData(String authData) { + this.authData = authData; + } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java new file mode 100644 index 0000000..20d68dd --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTT5PublishReq.java @@ -0,0 +1,94 @@ +package net.xmeter.samplers.mqtt; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class MQTT5PublishReq { + private String correlationData; + private long messageExpiryInterval; + private Map userProperties; + private String contentType; + private String responseTopic; + + private String payloadFormatIndicator; + private String topicAlias; + private String subscriptionIdentifier; + + public Map getUserProperties() { + return userProperties; + } + + public void setUserProperties(String userPropertiesJson) { + if (userPropertiesJson == null || userPropertiesJson.isEmpty()) { + this.userProperties = new HashMap<>(); + return; + } + + ObjectMapper mapper = new ObjectMapper(); + try{ + this.userProperties = mapper.readValue(userPropertiesJson, Map.class); + }catch (IOException e){ + this.userProperties = new HashMap<>(); + e.printStackTrace(); + } + } + + public String getCorrelationData() { + return correlationData; + } + + public void setCorrelationData(String correlationData) { + this.correlationData = correlationData; + } + + public long getMessageExpiryInterval() { + return messageExpiryInterval; + } + + public void setMessageExpiryInterval(long messageExpiryInterval) { + this.messageExpiryInterval = messageExpiryInterval; + } + + public String getContentType() { + return contentType; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public String getResponseTopic() { + return responseTopic; + } + + public void setResponseTopic(String responseTopic) { + this.responseTopic = responseTopic; + } + + public String getPayloadFormatIndicator() { + return payloadFormatIndicator; + } + + public void setPayloadFormatIndicator(String payloadFormatIndicator) { + this.payloadFormatIndicator = payloadFormatIndicator; + } + + public String getTopicAlias() { + return topicAlias; + } + + public void setTopicAlias(String topicAlias) { + this.topicAlias = topicAlias; + } + + public String getSubscriptionIdentifier() { + return subscriptionIdentifier; + } + + public void setSubscriptionIdentifier(String subscriptionIdentifier) { + this.subscriptionIdentifier = subscriptionIdentifier; + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java index 3a10e34..66d5ac4 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/MQTTConnection.java @@ -7,9 +7,10 @@ public interface MQTTConnection { String getClientId(); void disconnect() throws Exception; - MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained); + MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req); void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure); void setSubListener(MQTTSubListener listener); } + diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java index 6b3ced2..35beba1 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/fuse/FuseMQTTConnection.java @@ -8,6 +8,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import net.xmeter.samplers.mqtt.*; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.Callback; @@ -16,10 +17,6 @@ import org.fusesource.mqtt.client.QoS; import net.xmeter.samplers.PubCallback; -import net.xmeter.samplers.mqtt.MQTTConnection; -import net.xmeter.samplers.mqtt.MQTTPubResult; -import net.xmeter.samplers.mqtt.MQTTQoS; -import net.xmeter.samplers.mqtt.MQTTSubListener; class FuseMQTTConnection implements MQTTConnection { private static final Logger logger = Logger.getLogger(FuseMQTTConnection.class.getCanonicalName()); @@ -63,7 +60,7 @@ public void onFailure(Throwable throwable) { } @Override - public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qos, boolean retained) { + public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qos, boolean retained, MQTT5PublishReq req) { final Object pubLock = new Object(); QoS fuseQos = FuseUtil.map(qos); PubCallback pubCallback = new PubCallback(pubLock, fuseQos); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java index 187da95..e8376e1 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/hivemq/HiveMQTTConnection.java @@ -18,11 +18,7 @@ import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription; import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode; -import net.xmeter.samplers.mqtt.MQTTClientException; -import net.xmeter.samplers.mqtt.MQTTConnection; -import net.xmeter.samplers.mqtt.MQTTPubResult; -import net.xmeter.samplers.mqtt.MQTTQoS; -import net.xmeter.samplers.mqtt.MQTTSubListener; +import net.xmeter.samplers.mqtt.*; class HiveMQTTConnection implements MQTTConnection { private static final Logger logger = Logger.getLogger(HiveMQTTConnection.class.getCanonicalName()); @@ -56,7 +52,7 @@ public void disconnect() { } @Override - public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained) { + public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req) { try { client.publishWith() .topic(topicName) diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java new file mode 100644 index 0000000..3f95521 --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Client.java @@ -0,0 +1,71 @@ +package net.xmeter.samplers.mqtt.paho; + +import net.xmeter.Constants; +import net.xmeter.samplers.mqtt.*; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + + +import java.util.function.Consumer; +import java.util.logging.Logger; + +public class PahoMQTT3Client implements MQTTClient { + + private static final Logger logger = Logger.getLogger(PahoMQTT3Client.class.getCanonicalName()); + private final ConnectionParameters parameters; + private final MqttAsyncClient client; + + PahoMQTT3Client(ConnectionParameters parameters) throws MqttException { + this.parameters = parameters; + MemoryPersistence persistence = new MemoryPersistence(); + this.client = new MqttAsyncClient(PahoUtil.createHostAddress(parameters), this.getClientId(), persistence); + } + + + @Override + public String getClientId() { + return parameters.getClientId(); + } + + @Override + public MQTTConnection connect() throws Exception { + + MqttConnectOptions connectOptions = new MqttConnectOptions(); + connectOptions.setCleanSession(parameters.isCleanSession()); + connectOptions.setKeepAliveInterval(parameters.getKeepAlive()); + connectOptions.setConnectionTimeout(parameters.getConnectTimeout()); + + if (parameters.getVersion().equals(Constants.MQTT_VERSION_3_1_1)) { + connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); + } else if (parameters.getVersion().equals(Constants.MQTT_VERSION_3_1)) { + connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1); + } + + if (parameters.shouldAutomaticReconnectWithDefaultConfig()) { + connectOptions.setAutomaticReconnect(true); + } + if (parameters.getUsername() != null) { + connectOptions.setUserName(parameters.getUsername()); + } + if (parameters.getPassword() != null) { + connectOptions.setPassword(parameters.getPassword().toCharArray()); + } + + if (parameters.isSecureProtocol()) { + PahoMQTTSsl ssl = (PahoMQTTSsl) parameters.getSsl(); + connectOptions.setSocketFactory(PahoUtil.getSocketFactory(ssl.getCaFilePath(), ssl.getClientCertFilePath(), ssl.getClientKeyFilePath(), "")); + } + + try { + client.connect(connectOptions) + .waitForCompletion(parameters.getConnectTimeout() * 1000L); + + logger.info(() -> "Connected client: " + parameters.getClientId()); + return new PahoMQTT3Connection(client); + } catch (MqttException e) { + throw new MQTTClientException("Connection error " + client, e); + } + } +} + + diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java new file mode 100644 index 0000000..5c0832e --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT3Connection.java @@ -0,0 +1,98 @@ +package net.xmeter.samplers.mqtt.paho; + +import net.xmeter.samplers.mqtt.*; +import org.eclipse.paho.client.mqttv3.*; + +import java.util.function.Consumer; +import java.util.logging.Logger; + +class PahoMQTT3Connection implements MQTTConnection { + + private static final Logger logger = Logger.getLogger(PahoMQTT3Connection.class.getCanonicalName()); + + private final MqttAsyncClient client; + + private MQTTSubListener listener; + + PahoMQTT3Connection(MqttAsyncClient client) { + this.client = client; + } + + @Override + public boolean isConnectionSucc() { + return client.isConnected(); + } + + @Override + public String getClientId() { + return client.getClientId(); + } + + @Override + public void disconnect() throws Exception { + client.disconnect(); + } + + @Override + public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req) { + try { + MqttMessage msg = new MqttMessage(); + msg.setPayload(message); + msg.setQos((PahoUtil.map(qoS))); + msg.setRetained(retained); + client.publish(topicName, msg); + return new MQTTPubResult(true); + } catch (Exception error) { + return new MQTTPubResult(false, error.getMessage()); + } + } + + @Override + public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure) { + int pahoQos = PahoUtil.map(qos); + + for (String topicName : topicNames) { + try { + IMqttActionListener subListener = new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken iMqttToken) { + onSuccess.run(); + } + + @Override + public void onFailure(IMqttToken iMqttToken, Throwable throwable) { + onFailure.accept(throwable); + } + }; + + client.subscribe(topicName, pahoQos, "", subListener, (topic, message) -> { + try { + handleMessageArrived(topic, message); + } catch (Exception e) { + logger.severe(e.getMessage()); + } + }); + } catch (MqttException e) { + logger.warning("Failed to subscribe " + topicName + ", error: " + e.getMessage()); + } + } + } + + @Override + public void setSubListener(MQTTSubListener listener) { + this.listener = listener; + } + + private void handleMessageArrived(String topic, MqttMessage message) { + this.listener.accept(topic, new String(message.getPayload()), () -> { + }); + } + + @Override + public String toString() { + return "PahoMQTT3Connection{" + + "clientId='" + client.getClientId() + '\'' + + '}'; + } +} + diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java new file mode 100644 index 0000000..6124434 --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java @@ -0,0 +1,94 @@ +package net.xmeter.samplers.mqtt.paho; + +import net.xmeter.samplers.mqtt.ConnectionParameters; +import net.xmeter.samplers.mqtt.MQTTClient; +import net.xmeter.samplers.mqtt.MQTTClientException; +import net.xmeter.samplers.mqtt.MQTTConnection; +import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +public class PahoMQTT5Client implements MQTTClient { + + private static final Logger logger = Logger.getLogger(PahoMQTT5Client.class.getCanonicalName()); + private final ConnectionParameters parameters; + private final MqttAsyncClient client; + + PahoMQTT5Client(ConnectionParameters parameters) throws MqttException { + this.parameters = parameters; + MemoryPersistence persistence = new MemoryPersistence(); + this.client = new MqttAsyncClient(PahoUtil.createHostAddress(parameters), this.getClientId(), persistence); + } + + + @Override + public String getClientId() { + return parameters.getClientId(); + } + + @Override + public MQTTConnection connect() throws Exception { + + MqttConnectionOptions connectOptions = new MqttConnectionOptions(); + connectOptions.setCleanStart(parameters.isCleanStart()); + connectOptions.setKeepAliveInterval(parameters.getKeepAlive()); + connectOptions.setConnectionTimeout(parameters.getConnectTimeout()); + connectOptions.setSessionExpiryInterval(parameters.getSessionExpiryInterval()); + + List userProperty = PahoUtil.ConvertUserProperty(parameters.getConnUserProperty()); + if (!userProperty.isEmpty()) { + connectOptions.setUserProperties(userProperty); + } + + if (parameters.shouldAutomaticReconnectWithDefaultConfig()) { + connectOptions.setAutomaticReconnect(true); + } + if (parameters.getUsername() != null) { + connectOptions.setUserName(parameters.getUsername()); + } + if (parameters.getPassword() != null) { + connectOptions.setPassword(parameters.getPassword().getBytes()); + } + + if (parameters.isSecureProtocol()) { + PahoMQTTSsl ssl = (PahoMQTTSsl) parameters.getSsl(); + connectOptions.setSocketFactory(PahoUtil.getSocketFactory(ssl.getCaFilePath(), ssl.getClientCertFilePath(), ssl.getClientKeyFilePath(), "")); + } + + if (parameters.getConnWsHeader() != null && !parameters.getConnWsHeader().isEmpty()) { + connectOptions.setCustomWebSocketHeaders(parameters.getConnWsHeader()); + } + +// if (parameters.getAuthMethod() != null && !parameters.getAuthMethod().isEmpty()) { +// connectOptions.setAuthMethod(parameters.getAuthMethod()); +// } + +// if (parameters.getAuthData() != null && !parameters.getAuthData().isEmpty()) { +// connectOptions.setAuthData(parameters.getAuthData().getBytes()); +// } + + try { + client.connect(connectOptions) + .waitForCompletion(parameters.getConnectTimeout() * 1000L); + + logger.info(() -> "Connected client: " + parameters.getClientId()); + return new PahoMQTT5Connection(client); + } catch (MqttException e) { + throw new MQTTClientException("Connection error " + client, e); + } + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java new file mode 100644 index 0000000..a8bc0f4 --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Connection.java @@ -0,0 +1,185 @@ +package net.xmeter.samplers.mqtt.paho; + + +import net.xmeter.samplers.mqtt.*; +import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.MqttSubscription; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.List; +import java.util.function.Consumer; +import java.util.logging.Logger; + +class PahoMQTT5Connection implements MQTTConnection { + + private static final Logger logger = Logger.getLogger(PahoMQTT5Connection.class.getCanonicalName()); + + private final MqttAsyncClient client; + + private MQTTSubListener listener; + + PahoMQTT5Connection(MqttAsyncClient client) { + this.client = client; + } + + @Override + public boolean isConnectionSucc() { + return client.isConnected(); + } + + @Override + public String getClientId() { + return client.getClientId(); + } + + @Override + public void disconnect() throws Exception { + client.disconnect(); + } + + @Override + public MQTTPubResult publish(String topicName, byte[] message, MQTTQoS qoS, boolean retained, MQTT5PublishReq req) { + try { + MqttMessage msg = new MqttMessage(); + msg.setPayload(message); + msg.setQos((PahoUtil.map(qoS))); + msg.setRetained(retained); + msg.setProperties(buildMqtt5MessageFeature(req)); + + client.publish(topicName, msg); + return new MQTTPubResult(true); + } catch (Exception error) { + return new MQTTPubResult(false, error.getMessage()); + } + } + + @Override + public void subscribe(String[] topicNames, MQTTQoS qos, Runnable onSuccess, Consumer onFailure) { + int pahoQos = PahoUtil.map(qos); + + for (String topicName : topicNames) { + try { + + MqttActionListener subListener = new MqttActionListener() { + @Override + public void onSuccess(IMqttToken iMqttToken) { + onSuccess.run(); + } + + @Override + public void onFailure(IMqttToken iMqttToken, Throwable throwable) { + onFailure.accept(throwable); + } + }; + + // the subscribe method has issue when pass MqttActionListener and IMqttMessageListener + // https://github.com/eclipse/paho.mqtt.java/issues/1019 + // https://github.com/eclipse/paho.mqtt.java/issues/826 + // for `messageArrived` use `client.setCallback` to replace at this time. + client.setCallback(new MqttCallback() { + @Override + public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) { + + } + + @Override + public void mqttErrorOccurred(MqttException e) { + + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + try { + handleMessageArrived(topic, message); + } catch (Exception e) { + logger.severe(e.getMessage()); + } + } + + @Override + public void deliveryComplete(IMqttToken iMqttToken) { + + } + + @Override + public void connectComplete(boolean b, String s) { + + } + + @Override + public void authPacketArrived(int i, MqttProperties mqttProperties) { + + } + }); + +// MqttSubscription mqttSubscription = new MqttSubscription(topicName); +// mqttSubscription.setQos(pahoQos); +// mqttSubscription.setNoLocal(false); +// mqttSubscription.setRetainHandling(0); +// mqttSubscription.setRetainAsPublished(false); + + client.subscribe(topicName, pahoQos, "", subListener); + } catch (MqttException e) { + logger.warning("Failed to subscribe " + topicName + ", error: " + e.getMessage()); + } + } + } + + @Override + public void setSubListener(MQTTSubListener listener) { + this.listener = listener; + } + + private void handleMessageArrived(String topic, MqttMessage message) { + this.listener.accept(topic, new String(message.getPayload()), () -> { + }); + } + + private MqttProperties buildMqtt5MessageFeature(MQTT5PublishReq req){ + MqttProperties properties = new MqttProperties(); + properties.setMessageExpiryInterval(req.getMessageExpiryInterval()); + + if (req.getCorrelationData() != null && !req.getCorrelationData().trim().isEmpty()) { + properties.setCorrelationData(req.getCorrelationData().getBytes()); + } + + List userProperty = PahoUtil.ConvertUserProperty(req.getUserProperties()); + if (!userProperty.isEmpty()) { + properties.setUserProperties(userProperty); + } + + if (req.getResponseTopic() != null && !req.getResponseTopic().isEmpty()) { + properties.setResponseTopic(req.getResponseTopic()); + } + + if (req.getContentType() != null && !req.getContentType().isEmpty()){ + properties.setContentType(req.getContentType()); + } + + if (!req.getPayloadFormatIndicator().isEmpty()) { + properties.setPayloadFormat(req.getPayloadFormatIndicator().equals("UTF_8")); + } + + if (req.getTopicAlias() != null && !req.getTopicAlias().isEmpty()) { + properties.setTopicAlias(Integer.parseInt(req.getTopicAlias())); + } + + if (req.getSubscriptionIdentifier() != null && !req.getSubscriptionIdentifier().isEmpty()) { + properties.setSubscriptionIdentifier(Integer.parseInt(req.getSubscriptionIdentifier())); + } + +// logger.severe(properties.toString()); + + return properties; + } + + @Override + public String toString() { + return "PahoMQTT5Connection{" + + "clientId='" + client.getClientId() + '\'' + + '}'; + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java new file mode 100644 index 0000000..e3743ab --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTFactory.java @@ -0,0 +1,46 @@ +package net.xmeter.samplers.mqtt.paho; + +import net.xmeter.Constants; +import net.xmeter.Util; +import net.xmeter.samplers.AbstractMQTTSampler; +import net.xmeter.samplers.mqtt.ConnectionParameters; +import net.xmeter.samplers.mqtt.MQTTClient; +import net.xmeter.samplers.mqtt.MQTTFactory; +import net.xmeter.samplers.mqtt.MQTTSsl; + +import java.util.List; + +class PahoMQTTFactory implements MQTTFactory { + @Override + public String getName() { + return Constants.PAHO_MQTT_CLIENT_NAME; + } + + @Override + public List getSupportedProtocols() { + return PahoUtil.ALLOWED_PROTOCOLS; + } + + @Override + public MQTTClient createClient(ConnectionParameters parameters) throws Exception { + String mqttVersion = parameters.getVersion(); + if (Constants.MQTT_VERSION_3_1.equals(mqttVersion) || Constants.MQTT_VERSION_3_1_1.equals(mqttVersion)){ + return new PahoMQTT3Client(parameters); + } + else { + return new PahoMQTT5Client(parameters); + } + } + + @Override + public MQTTSsl createSsl(AbstractMQTTSampler sampler) throws Exception { + if (!sampler.isDualSSLAuth()){ + return new PahoMQTTSsl("","",""); + } else { + return new PahoMQTTSsl( + sampler.getCAFilePath(), + sampler.getClientCert2FilePath(), + sampler.getClientPrivateKeyFilePath()); + } + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java new file mode 100644 index 0000000..35237c9 --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSpi.java @@ -0,0 +1,11 @@ +package net.xmeter.samplers.mqtt.paho; + +import net.xmeter.samplers.mqtt.*; + + +public class PahoMQTTSpi implements MQTTSpi { + @Override + public MQTTFactory factory() { + return new PahoMQTTFactory(); + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java new file mode 100644 index 0000000..5d9748f --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTTSsl.java @@ -0,0 +1,38 @@ +package net.xmeter.samplers.mqtt.paho; + +import net.xmeter.samplers.mqtt.MQTTSsl; + +import javax.net.ssl.SSLContext; + +class PahoMQTTSsl implements MQTTSsl { + private final String caFilePath; + private final String clientCertFilePath; + private final String clientKeyFilePath; + + PahoMQTTSsl(String ca, String cc, String ck) { + this.caFilePath = ca; + this.clientCertFilePath = cc; + this.clientKeyFilePath = ck; + } + + String getCaFilePath(){ + return caFilePath; + } + + String getClientCertFilePath(){ + return clientCertFilePath; + } + + String getClientKeyFilePath(){ + return clientKeyFilePath; + } + + @Override + public String toString() { + return "PahoMQTTSsl{" + + "ca='" + caFilePath + '\'' + + "cc='" + clientCertFilePath + '\'' + + "ck='" + clientKeyFilePath + '\'' + + '}'; + } +} diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java new file mode 100644 index 0000000..d87f46e --- /dev/null +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoUtil.java @@ -0,0 +1,146 @@ +package net.xmeter.samplers.mqtt.paho; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.FileReader; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.Security; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManagerFactory; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import net.xmeter.Constants; +import net.xmeter.samplers.mqtt.ConnectionParameters; +import net.xmeter.samplers.mqtt.MQTTQoS; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PEMDecryptorProvider; +import org.bouncycastle.openssl.PEMEncryptedKeyPair; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +class PahoUtil { + static final List ALLOWED_PROTOCOLS; + + static { + ALLOWED_PROTOCOLS = new ArrayList<>(); + ALLOWED_PROTOCOLS.add(Constants.TCP_PROTOCOL); + ALLOWED_PROTOCOLS.add(Constants.SSL_PROTOCOL); + ALLOWED_PROTOCOLS.add(Constants.WS_PROTOCOL); + ALLOWED_PROTOCOLS.add(Constants.WSS_PROTOCOL); + } + + static int map(MQTTQoS qos) { + switch (qos) { + case AT_MOST_ONCE: + return 0; + case AT_LEAST_ONCE: + return 1; + case EXACTLY_ONCE: + return 2; + default: + throw new IllegalArgumentException("Unknown QoS: " + qos); + } + } + + static String createHostAddress(ConnectionParameters parameters) { + return String.format("%s://%s:%d", + parameters.getProtocol().toLowerCase(), + parameters.getHost(), + parameters.getPort()); + } + + static List ConvertUserProperty(Map userProperty) { + if (!userProperty.isEmpty()) { + ArrayList userDefinedProperties = new ArrayList<>(); + for (Map.Entry entry : userProperty.entrySet()) { + userDefinedProperties.add(new UserProperty(entry.getKey(), entry.getValue())); + } + return userDefinedProperties; + } else { + return new ArrayList(); + } + } + + static SSLSocketFactory getSingleSocketFactory(final String caCrtFile) throws Exception { + Security.addProvider(new BouncyCastleProvider()); + + // Load CA certificates + KeyStore caKs = loadCAKeyStore(caCrtFile); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(caKs); + SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + sslContext.init(null, tmf.getTrustManagers(), null); + return sslContext.getSocketFactory(); + } + + static SSLSocketFactory getSocketFactory(final String caCrtFile, + final String crtFile, final String keyFile, final String password) + throws Exception { + Security.addProvider(new BouncyCastleProvider()); + + // Load CA certificates + KeyStore caKs = loadCAKeyStore(caCrtFile); + + // Load client certificate chain and key + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, null); + + // Load the entire client certificate chain + Certificate[] chain; + try (FileInputStream fis = new FileInputStream(crtFile)) { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Collection certs = cf.generateCertificates(fis); + chain = certs.toArray(new Certificate[0]); + } + + // Load client private key + try (PEMParser pemParser = new PEMParser(new FileReader(keyFile))) { + Object object = pemParser.readObject(); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); + KeyPair key = converter.getKeyPair((PEMKeyPair) object); + ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), chain); + } + + // Set up key managers and trust managers + TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); + tmf.init(caKs); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, password.toCharArray()); + + // finally, create SSL socket factory + SSLContext context = SSLContext.getInstance("TLSv1.2"); + context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + return context.getSocketFactory(); + } + + private static KeyStore loadCAKeyStore(String caCrtFile) throws Exception { + KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType()); + caKs.load(null, null); + try (FileInputStream fis = new FileInputStream(caCrtFile)) { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Collection caCerts = cf.generateCertificates(fis); + int certIndex = 1; + for (Certificate caCert : caCerts) { + String alias = "ca-certificate-" + certIndex++; + caKs.setCertificateEntry(alias, caCert); + } + } + return caKs; + } +} diff --git a/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi b/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi index 4ccdcc5..b1500f3 100644 --- a/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi +++ b/mqtt_jmeter/src/main/resources/META-INF/services/net.xmeter.samplers.mqtt.MQTTSpi @@ -1,2 +1,3 @@ net.xmeter.samplers.mqtt.fuse.FuseMQTTSpi net.xmeter.samplers.mqtt.hivemq.HiveMQTTSpi +net.xmeter.samplers.mqtt.paho.PahoMQTTSpi From e1f1f94f2bb70001c1d81acbbfaeedc4f3e7b4e5 Mon Sep 17 00:00:00 2001 From: catcherwong Date: Sun, 24 Mar 2024 08:36:22 +0000 Subject: [PATCH 2/2] clean useless code Signed-off-by: catcherwong --- .../main/java/net/xmeter/gui/CommonConnUI.java | 17 ----------------- .../xmeter/samplers/AbstractMQTTSampler.java | 16 ---------------- .../net/xmeter/samplers/ConnectSampler.java | 2 -- .../samplers/mqtt/ConnectionParameters.java | 18 ------------------ .../samplers/mqtt/paho/PahoMQTT5Client.java | 8 -------- 5 files changed, 61 deletions(-) diff --git a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java index 3a9e029..1a89a45 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/gui/CommonConnUI.java @@ -35,8 +35,6 @@ public class CommonConnUI implements ChangeListener, ActionListener, Constants{ private final JLabeledTextField userNameAuth = new JLabeledTextField("User name:"); private final JLabeledTextField passwordAuth = new JLabeledTextField("Password:"); - private final JLabeledTextField authMethod = new JLabeledTextField("Auth Method:"); - private final JLabeledTextField authData = new JLabeledTextField("Auth Data:"); private JLabeledChoice protocols; // private JLabeledChoice clientNames; @@ -142,13 +140,6 @@ public JPanel createAuthentication() { optsPanel.add(userNameAuth); optsPanel.add(passwordAuth); optsPanelCon.add(optsPanel); - -// JPanel optsPanel1 = new HorizontalPanel(); -// optsPanel1.add(authMethod); -// optsPanel1.add(authData); -// authMethod.setVisible(false); -// authData.setVisible(false); -// optsPanelCon.add(optsPanel1); return optsPanelCon; } @@ -350,8 +341,6 @@ public void stateChanged(ChangeEvent e) { connCleanStart.setVisible(isMqtt5); connSessionExpiryInterval.setVisible(isMqtt5); connCleanSession.setVisible(!isMqtt5); -// authMethod.setVisible(isMqtt5); -// authData.setVisible(isMqtt5); } } @@ -403,8 +392,6 @@ public void configure(AbstractMQTTSampler sampler) { userNameAuth.setText(sampler.getUserNameAuth()); passwordAuth.setText(sampler.getPasswordAuth()); -// authMethod.setText(sampler.getAuthMethod()); -// authData.setText(sampler.getAuthData()); connNamePrefix.setText(sampler.getConnPrefix()); if (sampler.isClientIdSuffix()) { @@ -447,8 +434,6 @@ public void setupSamplerProperties(AbstractMQTTSampler sampler) { sampler.setUserNameAuth(userNameAuth.getText()); sampler.setPasswordAuth(passwordAuth.getText()); -// sampler.setAuthMethod(authMethod.getText()); -// sampler.setAuthData(authData.getText()); sampler.setConnPrefix(connNamePrefix.getText()); sampler.setClientIdSuffix(connNameSuffix.isSelected()); @@ -495,8 +480,6 @@ public void clearUI() { userNameAuth.setText(""); passwordAuth.setText(""); -// authMethod.setText(""); -// authData.setText(""); connNamePrefix.setText(DEFAULT_CONN_PREFIX_FOR_CONN); connNameSuffix.setSelected(true); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java index 4584a4c..137bcc7 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/AbstractMQTTSampler.java @@ -257,22 +257,6 @@ public void setClientPrivateKeyFilePath(String key) { setProperty("mqtt.client_key_file_path", key); } - public String getAuthMethod() { - return getPropertyAsString(AUTH_METHOD, ""); - } - - public void setAuthMethod(String authMethod) { - setProperty(AUTH_METHOD, authMethod); - } - - public String getAuthData() { - return getPropertyAsString(AUTH_DATA, ""); - } - - public void setAuthData(String authData) { - setProperty(AUTH_DATA, authData); - } - // public int getConCapacity() { // return conCapacity; // } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java index 71358ef..2d26463 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java @@ -80,8 +80,6 @@ public SampleResult sample(Entry entry) { parameters.setCleanStart(Boolean.parseBoolean(getConnCleanStart())); parameters.setSessionExpiryInterval(Long.parseLong(getConnSessionExpiryInterval())); parameters.setConnWsHeader(getWsHeader()); - parameters.setAuthMethod(getAuthMethod()); - parameters.setAuthData(getAuthData()); } catch (Exception e) { logger.log(Level.SEVERE, "Failed to establish Connection " + connection , e); result.setSuccessful(false); diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java index 41c81d1..ac40a7b 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/ConnectionParameters.java @@ -27,8 +27,6 @@ public class ConnectionParameters { private long sessionExpiryInterval; private Map connUserProperty; private Map connWsHeader; - private String authMethod; - private String authData; public MQTTSsl getSsl() { return ssl; @@ -220,20 +218,4 @@ public String toString() { "sessionExpiryInterval='" + sessionExpiryInterval + '\'' + '}'; } - - public String getAuthMethod() { - return authMethod; - } - - public void setAuthMethod(String authMethod) { - this.authMethod = authMethod; - } - - public String getAuthData() { - return authData; - } - - public void setAuthData(String authData) { - this.authData = authData; - } } diff --git a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java index 6124434..18c7ff3 100644 --- a/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java +++ b/mqtt_jmeter/src/main/java/net/xmeter/samplers/mqtt/paho/PahoMQTT5Client.java @@ -73,14 +73,6 @@ public MQTTConnection connect() throws Exception { connectOptions.setCustomWebSocketHeaders(parameters.getConnWsHeader()); } -// if (parameters.getAuthMethod() != null && !parameters.getAuthMethod().isEmpty()) { -// connectOptions.setAuthMethod(parameters.getAuthMethod()); -// } - -// if (parameters.getAuthData() != null && !parameters.getAuthData().isEmpty()) { -// connectOptions.setAuthData(parameters.getAuthData().getBytes()); -// } - try { client.connect(connectOptions) .waitForCompletion(parameters.getConnectTimeout() * 1000L);