refactor: do some internal refactor and add some logs.

This commit is contained in:
Haojun Liao 2023-02-18 17:32:43 +08:00
parent 50ae5e7427
commit 06a3c1c9c0
2 changed files with 15 additions and 13 deletions

View File

@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");

View File

@ -411,19 +411,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
return (void *)buf; return (void *)buf;
} }
SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { SMqSubscribeObj *tNewSubscribeObj(const char* key) {
SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL; if (pSubObj == NULL) {
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN); return NULL;
taosInitRWLatch(&pSubNew->lock); }
pSubNew->vgNum = 0;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubObj->lock);
pSubObj->vgNum = 0;
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set hash free fp // TODO set hash free fp
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/ /*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *)); return pSubObj;
return pSubNew;
} }
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {