rocketmq-externals
[rocketmq-connect-runtime : removeConnectorConfig not triggering listeners]
I noticed that removeConnectorConfig() does not trigger the listeners after it updates the config:
@Override
public void removeConnectorConfig(String connectorName) {
    ConnectKeyValue config = new ConnectKeyValue();
    config.put(RuntimeConfigDefine.UPDATE_TIMESATMP, System.currentTimeMillis());
    config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
    Map<String, ConnectKeyValue> connectorConfig = new HashMap<>();
    connectorConfig.put(connectorName, config);
    List<ConnectKeyValue> taskConfigList = new ArrayList<>();
    taskConfigList.add(config);
    connectorKeyValueStore.put(connectorName, config);
    putTaskConfigs(connectorName, taskConfigList);
    sendSynchronizeConfig();
    // TODO why not trigger listener ?
}
However, recomputeTaskConfigs(), which is called by putConnectorConfig(), does trigger the listeners after writing to the KeyValueStore. The listener is the RebalanceService, so a new doRebalance is executed on every config update:
public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp) {
    // TODO taskConfigs could be incorrect
    List<KeyValue> taskConfigs = connector.taskConfigs();
    List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
    for (KeyValue keyValue : taskConfigs) {
        ConnectKeyValue newKeyValue = new ConnectKeyValue();
        for (String key : keyValue.keySet()) {
            newKeyValue.put(key, keyValue.getString(key));
        }
        newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
        newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESATMP, currentTimestamp);
        converterdConfigs.add(newKeyValue);
    }
    putTaskConfigs(connectorName, converterdConfigs);
    sendSynchronizeConfig();
    triggerListener();
}
What is the design consideration behind not triggering the listeners when a connector is removed?
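
To make the difference concrete, here is a minimal, self-contained sketch of the two behaviours. It is not the runtime's actual code: `ListenerSketch`, `SketchConfigService`, `ConfigListener`, the `"config-deleted"` key and the `notifyOnRemove` flag are all hypothetical stand-ins I made up for illustration. It only assumes that listeners are plain callbacks invoked by something like triggerListener().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListenerSketch {

    /** Hypothetical stand-in for the listener that the RebalanceService registers. */
    public interface ConfigListener {
        void onConfigUpdate();
    }

    /** Hypothetical, heavily simplified config service; NOT the real implementation. */
    public static class SketchConfigService {
        private final Map<String, Map<String, String>> store = new HashMap<>();
        private final List<ConfigListener> listeners = new ArrayList<>();

        public void registerListener(ConfigListener listener) {
            listeners.add(listener);
        }

        /** Mirrors the put path: write the config, then notify the listeners. */
        public void putConnectorConfig(String name, Map<String, String> config) {
            store.put(name, new HashMap<>(config));
            triggerListener();
        }

        /**
         * Mirrors the remove path: write a tombstone entry for the connector.
         * The notifyOnRemove flag (an assumption of this sketch) switches
         * between today's behaviour (false) and the behaviour I am asking about (true).
         */
        public void removeConnectorConfig(String name, boolean notifyOnRemove) {
            Map<String, String> tombstone = new HashMap<>();
            tombstone.put("config-deleted", "1"); // hypothetical key name
            store.put(name, tombstone);
            if (notifyOnRemove) {
                triggerListener();
            }
        }

        private void triggerListener() {
            for (ConfigListener listener : listeners) {
                listener.onConfigUpdate();
            }
        }
    }

    /** Counts how often the listener fires for one put followed by one remove. */
    public static int countNotifications(boolean notifyOnRemove) {
        SketchConfigService service = new SketchConfigService();
        int[] notifications = {0};
        service.registerListener(() -> notifications[0]++);

        Map<String, String> config = new HashMap<>();
        config.put("connector-class", "example.Connector"); // hypothetical values
        service.putConnectorConfig("demo-connector", config);
        service.removeConnectorConfig("demo-connector", notifyOnRemove);
        return notifications[0];
    }

    public static void main(String[] args) {
        System.out.println("without trigger on remove: " + countNotifications(false));
        System.out.println("with trigger on remove:    " + countNotifications(true));
    }
}
```

In this sketch the listener fires once when the removal does not notify (only the put is seen) and twice when it does. If the real runtime behaves the same way, the local worker would only pick up the deletion on a later rebalance rather than immediately, which is what prompted the question.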