Skip to content

feat: Expose one config adjust the blocking behavior [QP-466] #149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 30, 2025

Conversation

Joel-hanson
Copy link
Contributor

Description

  • Enhancement to the IBM MQ Source Connector to expose two configurable values to adjust the blocking behavior of the JMS receive methods when retrieving messages from MQ
  • Added one new config and update one existing config:
    mq.message.receive.timeout -> mq.receive.timeout.ms: Timeout (ms) for the initial receive call.
    mq.receive.subsequent.timeout.ms: Timeout (ms) for subsequent receives.
  • If timeout > 0 → uses receive(timeout), else falls back to receiveNoWait().

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

  • Integration test to see if the config is taking effect
  • Manual testing

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules

@Joel-hanson Joel-hanson force-pushed the qp-466-subcall-config branch from 3225272 to 9dc0948 Compare April 29, 2025 08:56
@Joel-hanson Joel-hanson requested a review from dalelane April 29, 2025 08:56
- Enhancement to the IBM MQ Source Connector to
  expose two configurable values to adjust the
  blocking behavior of the JMS receive methods when
  retrieving messages from MQ

Signed-off-by: Joel Hanson <[email protected]>
Copy link
Contributor

@dalelane dalelane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is looking very good - the config option rename is a bit of a blocker IMO, but otherwise I had only minor/cosmetic comments

@@ -71,12 +71,17 @@ public class JMSWorker {
private boolean connected = false; // Whether connected to MQ
private AtomicBoolean closeNow; // Whether close has been requested
private AbstractConfig config;
private long receiveTimeout; // Receive timeout for the jms consumer
private long initialReceiveTimeoutMs; // Receive timeout for the jms consumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private long initialReceiveTimeoutMs; // Receive timeout for the jms consumer
private long initialReceiveTimeoutMs; // Receive timeout for the jms consumer on the first call in each Connect poll

CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT,
CONFIG_GROUP_MQ, 26,
Width.MEDIUM,
"MQ",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"MQ",
CONFIG_GROUP_MQ,

CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT,
ConfigDef.Importance.LOW,
CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT,
"MQ",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"MQ",
CONFIG_GROUP_MQ,

@@ -163,12 +163,18 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";

public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.receive.timeout.ms";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.receive.timeout.ms";
public static final String CONFIG_MAX_RECEIVE_TIMEOUT = "mq.message.receive.timeout";

While I think your proposed name would be an improvement, I'm afraid it's too late to change this now - some existing deployments of the connector could potentially be using the existing mq.message.receive.timeout config to modify the timeout, which would stop having an impact if we made this change.

It's okay for us to add new config options (with defaults that maintain existing behaviour) but we can't remove or rename existing config options (without a major version bump at least)

public static final String CONFIG_DOCUMENTATION_MAX_RECEIVE_TIMEOUT = "How long the connector should wait (in milliseconds) for a message to arrive if no message is available immediately";
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "message receive timeout";
public static final String CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT = "Initial receive timeout (ms)";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating the description is fine, though

README.md Outdated
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| mq.receive.timeout.ms | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| mq.receive.timeout.ms | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| mq.receive.timeout.ms | The timeout (in milliseconds) for the first request to receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |

@Joel-hanson Joel-hanson force-pushed the qp-466-subcall-config branch 2 times, most recently from b662a8e to 5c4aab4 Compare April 29, 2025 11:25
@Joel-hanson Joel-hanson force-pushed the qp-466-subcall-config branch from 5c4aab4 to 886a53c Compare April 29, 2025 11:26
@Joel-hanson Joel-hanson requested a review from dalelane April 29, 2025 11:38
@Joel-hanson Joel-hanson merged commit e2b103d into main Apr 30, 2025
2 checks passed
@Joel-hanson Joel-hanson deleted the qp-466-subcall-config branch April 30, 2025 04:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants