diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2a40976a8b..9e928a79ac 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -25,13 +25,6 @@ extern "C" { #endif -// TODO remove it -enum { - TMQ_CONF__RESET_OFFSET__NONE = -3, - TMQ_CONF__RESET_OFFSET__EARLIEAST = -2, - TMQ_CONF__RESET_OFFSET__LATEST = -1, -}; - // clang-format off #define IS_META_MSG(x) ( \ x == TDMT_VND_CREATE_STB \ diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ba223984e3..0d3d37dc29 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -20,18 +20,10 @@ #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" -#include "tmsgtype.h" #include "tqueue.h" #include "tref.h" #include "ttimer.h" -#if 0 -#undef tsem_post -#define tsem_post(x) \ - tscInfo("call sem post at %s %d", __FUNCTION__, __LINE__); \ - sem_post(x) -#endif - struct SMqMgmt { int8_t inited; tmr_h timer; @@ -216,6 +208,7 @@ typedef struct { static int32_t tmqAskEp(tmq_t* tmq, bool async); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); +static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); @@ -227,7 +220,7 @@ tmq_conf_t* tmq_conf_new() { conf->withTbName = false; conf->autoCommit = true; conf->autoCommitInterval = 5000; - conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; + conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST; conf->hbBgEnable = true; return conf; @@ -235,29 +228,35 @@ tmq_conf_t* tmq_conf_new() { void tmq_conf_destroy(tmq_conf_t* conf) { if (conf) { - if (conf->ip) taosMemoryFree(conf->ip); - if (conf->user) taosMemoryFree(conf->user); - if (conf->pass) taosMemoryFree(conf->pass); + if (conf->ip) { + taosMemoryFree(conf->ip); + } + if (conf->user) { + taosMemoryFree(conf->user); + } + if (conf->pass) { + taosMemoryFree(conf->pass); + } taosMemoryFree(conf); } } tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { - if (strcmp(key, "group.id") == 0) { + if (strcasecmp(key, "group.id") == 0) { tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN); return TMQ_CONF_OK; } - if (strcmp(key, "client.id") == 0) { + if (strcasecmp(key, "client.id") == 0) { tstrncpy(conf->clientId, value, 256); return TMQ_CONF_OK; } - if (strcmp(key, "enable.auto.commit") == 0) { - if (strcmp(value, "true") == 0) { + if (strcasecmp(key, "enable.auto.commit") == 0) { + if (strcasecmp(value, "true") == 0) { conf->autoCommit = true; return TMQ_CONF_OK; - } else if (strcmp(value, "false") == 0) { + } else if (strcasecmp(value, "false") == 0) { conf->autoCommit = false; return TMQ_CONF_OK; } else { @@ -265,31 +264,31 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcmp(key, "auto.commit.interval.ms") == 0) { + if (strcasecmp(key, "auto.commit.interval.ms") == 0) { conf->autoCommitInterval = atoi(value); return TMQ_CONF_OK; } - if (strcmp(key, "auto.offset.reset") == 0) { - if (strcmp(value, "none") == 0) { - conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE; + if (strcasecmp(key, "auto.offset.reset") == 0) { + if (strcasecmp(value, "none") == 0) { + conf->resetOffset = TMQ_OFFSET__RESET_NONE; return TMQ_CONF_OK; - } else if (strcmp(value, "earliest") == 0) { - conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; + } else if (strcasecmp(value, "earliest") == 0) { + conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST; return TMQ_CONF_OK; - } else if (strcmp(value, "latest") == 0) { - conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST; + } else if (strcasecmp(value, "latest") == 0) { + conf->resetOffset = TMQ_OFFSET__RESET_LATEST; return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } } - if (strcmp(key, "msg.with.table.name") == 0) { - if (strcmp(value, "true") == 0) { + if (strcasecmp(key, "msg.with.table.name") == 0) { + if (strcasecmp(value, "true") == 0) { conf->withTbName = true; return TMQ_CONF_OK; - } else if (strcmp(value, "false") == 0) { + } else if (strcasecmp(value, "false") == 0) { conf->withTbName = false; return TMQ_CONF_OK; } else { @@ -297,11 +296,11 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcmp(key, "experimental.snapshot.enable") == 0) { - if (strcmp(value, "true") == 0) { + if (strcasecmp(key, "experimental.snapshot.enable") == 0) { + if (strcasecmp(value, "true") == 0) { conf->snapEnable = true; return TMQ_CONF_OK; - } else if (strcmp(value, "false") == 0) { + } else if (strcasecmp(value, "false") == 0) { conf->snapEnable = false; return TMQ_CONF_OK; } else { @@ -309,42 +308,40 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcmp(key, "experimental.snapshot.batch.size") == 0) { + if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { conf->snapBatchSize = atoi(value); return TMQ_CONF_OK; } - if (strcmp(key, "enable.heartbeat.background") == 0) { - if (strcmp(value, "true") == 0) { + if (strcasecmp(key, "enable.heartbeat.background") == 0) { + if (strcasecmp(value, "true") == 0) { conf->hbBgEnable = true; return TMQ_CONF_OK; - } else if (strcmp(value, "false") == 0) { + } else if (strcasecmp(value, "false") == 0) { conf->hbBgEnable = false; return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } - return TMQ_CONF_OK; } - if (strcmp(key, "td.connect.ip") == 0) { + if (strcasecmp(key, "td.connect.ip") == 0) { conf->ip = taosStrdup(value); return TMQ_CONF_OK; } - if (strcmp(key, "td.connect.user") == 0) { + if (strcasecmp(key, "td.connect.user") == 0) { conf->user = taosStrdup(value); return TMQ_CONF_OK; } - if (strcmp(key, "td.connect.pass") == 0) { + if (strcasecmp(key, "td.connect.pass") == 0) { conf->pass = taosStrdup(value); return TMQ_CONF_OK; } - if (strcmp(key, "td.connect.port") == 0) { + if (strcasecmp(key, "td.connect.port") == 0) { conf->port = atoi(value); return TMQ_CONF_OK; } - if (strcmp(key, "td.connect.db") == 0) { - /*conf->db = taosStrdup(value);*/ + if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; } @@ -352,7 +349,6 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } tmq_list_t* tmq_list_new() { - // return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); } @@ -382,13 +378,31 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) { - int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); - if (waitingRspNum == 0) { - tmqCommitDone(pParamSet); +static void updateVgEpset(tmq_t* pTmq, SMqCommitCbParam* pParam, SEpSet* pEpSet) { + int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); + for(int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, i); + if (strcmp(pTopic->topicName, pParam->topicName) != 0) { + continue; + } + + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for(int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); + if (pClientVg->vgId == pParam->vgId) { + SEp* pEp = GET_ACTIVE_EP(pEpSet); + SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet)); + uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId, + pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); + pClientVg->epSet = *pEpSet; + break; + } + } + + break; } } - +// todo retry to send the commit if failed static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; @@ -402,33 +416,10 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } #endif + // update the epset if needed if (pBuf->pEpSet != NULL) { - // todo extract method taosThreadMutexLock(&pParam->pTmq->lock); - - int32_t numOfTopics = taosArrayGetSize(pParam->pTmq->clientTopics); - for(int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopic = taosArrayGet(pParam->pTmq->clientTopics, i); - if (strcmp(pTopic->topicName, pParam->topicName) != 0) { - continue; - } - - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for(int32_t j = 0; j < numOfVgs; ++j) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->vgId == pParam->vgId) { - SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); - SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet)); - uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId, - pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); - pClientVg->epSet = *pBuf->pEpSet; - break; - } - } - - break; - } - + updateVgEpset(pParam->pTmq, pParam, pBuf->pEpSet); taosThreadMutexUnlock(&pParam->pTmq->lock); } @@ -436,10 +427,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); - /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, - * pOffset->version);*/ - - tmqCommitRspCountDown(pParamSet); + tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); return 0; } @@ -522,7 +510,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopic pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; - // send msg atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1); @@ -652,9 +639,10 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - // todo race condition: fix it - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, + numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg clientVg = *(SMqClientVg*)taosArrayGet(pTopic->vgs, j); if (clientVg.currentOffset.type > 0 && !tOffsetEqual(&clientVg.currentOffset, &clientVg.committedOffset)) { @@ -668,6 +656,8 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, } } + tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum, + numOfTopics); taosThreadMutexUnlock(&tmq->lock); // no request is sent @@ -678,7 +668,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, } // count down since waiting rsp num init as 1 - tmqCommitRspCountDown(pParamSet); + tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0); if (!async) { tsem_wait(&pParamSet->rspSem); @@ -696,9 +686,9 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { - if (msg) { + if (msg) { // user invoked commit? return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); - } else { + } else { // this for auto commit return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam); } } @@ -986,7 +976,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr()); + tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr()); return NULL; } @@ -1002,9 +992,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); - goto FAIL; + goto _failed; } // init status @@ -1034,22 +1024,20 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); - goto FAIL; + goto _failed; } // init connection pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { - tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), - pTmq->groupId); + tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); tsem_destroy(&pTmq->rspSem); - goto FAIL; + goto _failed; } pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq); if (pTmq->refId < 0) { - tmqFreeImpl(pTmq); - return NULL; + goto _failed; } if (pTmq->hbBgEnable) { @@ -1058,16 +1046,17 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer); } - tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s", pTmq->consumerId, pTmq->groupId); + char buf[80] = {0}; + STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; + tFormatOffset(buf, tListLen(buf), &offset); + tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d", + pTmq->consumerId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf, + pTmq->hbBgEnable); + return pTmq; -FAIL: - if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics); - if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue); - if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask); - if (pTmq->qall) taosFreeQall(pTmq->qall); - taosMemoryFree(pTmq); - +_failed: + tmqFreeImpl(pTmq); return NULL; } @@ -2123,10 +2112,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { // call async cb func if (pParamSet->automatic && tmq->commitCb) { tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam); - } else if (!pParamSet->automatic && pParamSet->userCb) { - // sem post + } else if (!pParamSet->automatic && pParamSet->userCb) { // sem post pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam); } + taosMemoryFree(pParamSet); } else { tsem_post(&pParamSet->rspSem); @@ -2137,4 +2126,14 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); #endif return 0; -} \ No newline at end of file +} + +void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { + int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, + waitingRspNum); + + if (waitingRspNum == 0) { + tmqCommitDone(pParamSet); + } +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 01a0ee7306..1f7e42e9d2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6586,8 +6586,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts); } else { - ASSERT(0); + return TSDB_CODE_INVALID_PARA; } + return 0; }