From 450c5b447c7a08e2326654a804b36710b33889f5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 10:35:13 +0800 Subject: [PATCH 01/11] fix(tmq): quit the scan asap. --- source/libs/executor/src/executor.c | 4 ++-- source/libs/executor/src/scanoperator.c | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 656ffda0ca..b8b58cc9a0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -755,8 +755,8 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); - setTaskKilled(pTaskInfo, rspCode); + qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); while(qTaskIsExecuting(pTaskInfo)) { taosMsleep(10); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 84317c825b..dfbd354bd3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -772,7 +772,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { while (1) { SSDataBlock* result = doGroupedTableScan(pOperator); - if (result || (pOperator->status == OP_EXEC_DONE)) { + if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) { return result; } @@ -1666,6 +1666,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { SFetchRet ret = {0}; + terrno = 0; + if (tqNextBlock(pInfo->tqReader, &ret) < 0) { // if the end is reached, terrno is 0 if (terrno != 0) { From b0fb912da8e26e46d858e5923231be7ce1bb0086 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 11:48:52 +0800 Subject: [PATCH 02/11] fix(mnd): fix the invalid access of NULL ptr. --- source/dnode/mnode/impl/src/mndTrans.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 39b4252618..65ba76b21f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1086,15 +1086,18 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { goto _OVER; } + pTrans->lastErrorNo = pRsp->code; + STransAction *pAction = taosArrayGet(pArray, action); if (pAction != NULL) { pAction->msgReceived = 1; pAction->errCode = pRsp->code; - pTrans->lastErrorNo = pRsp->code; + mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, + mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode); + } else { + mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code); } - mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage), - action, pRsp->code, pAction->acceptableCode, pAction->retryCode); mndTransExecute(pMnode, pTrans, true); _OVER: From a88de7e18bddd517d04bf612fefaf9e8668157bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 15:55:16 +0800 Subject: [PATCH 03/11] fix(tmq): build sync api based on async api. --- source/client/src/clientTmq.c | 499 +++++++++++++++++----------------- 1 file changed, 243 insertions(+), 256 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a16ddce0aa..0a4e85068d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -27,6 +27,8 @@ #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 +typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam); + struct SMqMgmt { int8_t inited; tmr_h timer; @@ -109,6 +111,10 @@ struct tmq_t { tsem_t rspSem; }; +typedef struct SAskEpInfo { + int32_t code; +} SAskEpInfo; + enum { TMQ_VG_STATUS__IDLE = 0, TMQ_VG_STATUS__WAIT, @@ -169,11 +175,10 @@ typedef struct { } SMqSubscribeCbParam; typedef struct { - int64_t refId; - int32_t epoch; - int32_t code; - int32_t async; - tsem_t rspSem; + int64_t refId; + int32_t epoch; + void* pParam; + __tmq_askep_fn_t pUserFn; } SMqAskEpCbParam; typedef struct { @@ -189,16 +194,13 @@ typedef struct { typedef struct { int64_t refId; int32_t epoch; - int8_t automatic; - int8_t async; int32_t waitingRspNum; int32_t totalRspNum; - int32_t rspErr; + int32_t code; tmq_commit_cb* userCb; /*SArray* successfulOffsets;*/ /*SArray* failedOffsets;*/ - void* userParam; - tsem_t rspSem; + void* userParam; } SMqCommitCbParamSet; typedef struct { @@ -209,12 +211,14 @@ typedef struct { tmq_t* pTmq; } SMqCommitCbParam; -static int32_t tmqAskEp(tmq_t* tmq, bool async); +static int32_t doAskEp(tmq_t* tmq); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, int32_t index, int32_t totalVgroups); -static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); +static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); +static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param); +static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param); tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); @@ -443,7 +447,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { // taosMemoryFree(pBuf->pData); // taosMemoryFree(pBuf->pEpSet); // -// tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); +// commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); // return 0; // } // @@ -453,7 +457,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); - tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); + commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId); return 0; } @@ -461,8 +465,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN int32_t index, int32_t totalVgroups) { STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); if (pOffset == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } pOffset->val = pVg->currentOffset; @@ -476,13 +479,13 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN int32_t code = 0; tEncodeSize(tEncodeSTqOffset, pOffset, len, code); if (code < 0) { - return -1; + return TSDB_CODE_INVALID_PARA; } void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); if (buf == NULL) { taosMemoryFree(pOffset); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); @@ -499,7 +502,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN if (pParam == NULL) { taosMemoryFree(pOffset); taosMemoryFree(buf); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } pParam->params = pParamSet; @@ -515,7 +518,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN taosMemoryFree(pOffset); taosMemoryFree(buf); taosMemoryFree(pParam); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } pMsgSendInfo->msgInfo = (SDataBuf){ @@ -546,129 +549,117 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); - return 0; + + return TSDB_CODE_SUCCESS; } -static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) { - char* topic; - int32_t vgId; - if (TD_RES_TMQ(msg)) { - SMqRspObj* pRspObj = (SMqRspObj*)msg; - topic = pRspObj->topic; +static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) { + char* pTopicName = NULL; + int32_t vgId = 0; + int32_t code = 0; + + if (pRes == NULL || tmq == NULL) { + pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam); + return; + } + + if (TD_RES_TMQ(pRes)) { + SMqRspObj* pRspObj = (SMqRspObj*)pRes; + pTopicName = pRspObj->topic; vgId = pRspObj->vgId; - } else if (TD_RES_TMQ_META(msg)) { - SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; - topic = pMetaRspObj->topic; + } else if (TD_RES_TMQ_META(pRes)) { + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes; + pTopicName = pMetaRspObj->topic; vgId = pMetaRspObj->vgId; - } else if (TD_RES_TMQ_METADATA(msg)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg; - topic = pRspObj->topic; + } else if (TD_RES_TMQ_METADATA(pRes)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes; + pTopicName = pRspObj->topic; vgId = pRspObj->vgId; } else { - return TSDB_CODE_TMQ_INVALID_MSG; + pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam); + return; } SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); + return; } pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; - pParamSet->automatic = 0; - pParamSet->async = async; - pParamSet->userCb = userCb; + pParamSet->userCb = pCommitFp; pParamSet->userParam = userParam; - tsem_init(&pParamSet->rspSem, 0, 0); - int32_t code = -1; - - taosThreadMutexLock(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics); - for (int32_t i = 0; i < numOfTopics; i++) { + tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + + int32_t i = 0; + for (; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - if (strcmp(pTopic->topicName, topic) != 0) { - continue; - } - - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - for (int32_t j = 0; j < numOfVgroups; j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->vgId != vgId) { - continue; - } - - if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - if (doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups) < 0) { - tsem_destroy(&pParamSet->rspSem); - taosMemoryFree(pParamSet); - goto FAIL; - } - goto HANDLE_RSP; - } + if (strcmp(pTopic->topicName, pTopicName) == 0) { + break; } } -HANDLE_RSP: - if (pParamSet->totalRspNum == 0) { - tsem_destroy(&pParamSet->rspSem); + if (i == numOfTopics) { + tscWarn("consumer:0x" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName, + numOfTopics); taosMemoryFree(pParamSet); - taosThreadMutexUnlock(&tmq->lock); - return 0; + pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + return; } - if (!async) { - taosThreadMutexUnlock(&tmq->lock); - tsem_wait(&pParamSet->rspSem); - code = pParamSet->rspErr; - tsem_destroy(&pParamSet->rspSem); + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + + int32_t j = 0; + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + for (j = 0; j < numOfVgroups; j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg->vgId == vgId) { + break; + } + } + + if (j == numOfVgroups) { + tscWarn("consumer:0x" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId, + numOfVgroups, pTopicName); taosMemoryFree(pParamSet); - return code; - } else { - code = 0; + pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + return; } -FAIL: - taosThreadMutexUnlock(&tmq->lock); - if (code != 0 && async) { - userCb(tmq, code, userParam); - } + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { + code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); - return 0; + // failed to commit, callback user function directly. + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pParamSet); + pCommitFp(tmq, code, userParam); + } + } else { // do not perform commit, callback user function directly. + taosMemoryFree(pParamSet); + pCommitFp(tmq, code, userParam); + } } -static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { - int32_t code = -1; - +static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - if (async) { - if (automatic) { - tmq->commitCb(tmq, code, tmq->commitCbUserParam); - } else { - userCb(tmq, code, userParam); - } - } - return -1; + pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam); + return; } pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; - - pParamSet->automatic = automatic; - pParamSet->async = async; - pParamSet->userCb = userCb; + pParamSet->userCb = pCommitFp; pParamSet->userParam = userParam; - tsem_init(&pParamSet->rspSem, 0, 0); // init as 1 to prevent concurrency issue pParamSet->waitingRspNum = 1; - taosThreadMutexLock(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); @@ -682,7 +673,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); + int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno), @@ -693,7 +684,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm // update the offset value. pVg->committedOffset = pVg->currentOffset; } else { - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups); } } @@ -701,39 +692,16 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); - taosThreadMutexUnlock(&tmq->lock); // no request is sent if (pParamSet->totalRspNum == 0) { - tsem_destroy(&pParamSet->rspSem); taosMemoryFree(pParamSet); - return 0; + pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + return; } // count down since waiting rsp num init as 1 - tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0); - - if (!async) { - tsem_wait(&pParamSet->rspSem); - code = pParamSet->rspErr; - tsem_destroy(&pParamSet->rspSem); - taosMemoryFree(pParamSet); -#if 0 - taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); - taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); -#endif - } - - return code; -} - -static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, - void* userParam) { - if (msg) { // user invoked commit - return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); - } else { // this for auto commit - return doAutoCommit(tmq, automatic, async, userCb, userParam); - } + commitRspCountDown(pParamSet, tmq->consumerId, "", 0); } static void generateTimedTask(int64_t refId, int32_t type) { @@ -855,7 +823,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { - tmqAskEp(pTmq, true); + asyncAskEp(pTmq, addToQueueCallbackFn, NULL); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; @@ -863,12 +831,11 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); - + asyncCommitAllOffsets(pTmq, pTmq->commitCb, pTmq->commitCbUserParam); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId, + tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { @@ -1213,7 +1180,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } int32_t retryCnt = 0; - while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { + while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { goto FAIL; } @@ -1513,28 +1480,30 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { return set; } -static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { +int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; - int8_t async = pParam->async; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { - if (!async) { - tsem_destroy(&pParam->rspSem); - } else { - taosMemoryFree(pParam); - } + terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; + pParam->pUserFn(tmq, terrno, NULL, pParam->pParam); + taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return -1; + taosMemoryFree(pParam); + return terrno; } - pParam->code = code; if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async, - tstrerror(code)); - goto END; + tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); + pParam->pUserFn(tmq, code, NULL, pParam->pParam); + + taosReleaseRef(tmqMgmt.rsetId, pParam->refId); + + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pParam); + return code; } // tmq's epoch is monotonically increase, @@ -1545,6 +1514,7 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (head->epoch <= epoch) { tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", tmq->consumerId, head->epoch, epoch); + if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) { SMqAskEpRsp rsp; tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); @@ -1553,45 +1523,17 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { tDeleteSMqAskEpRsp(&rsp); } - goto END; - } - - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, - head->epoch, epoch); - - if (!async) { - SMqAskEpRsp rsp; - tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); - tmqUpdateEp(tmq, head->epoch, &rsp); - tDeleteSMqAskEpRsp(&rsp); } else { - SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); - if (pWrapper == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = -1; - goto END; - } - - pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; - pWrapper->epoch = head->epoch; - memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); - - taosWriteQitem(tmq->mqueue, pWrapper); - tsem_post(&tmq->rspSem); + tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, + head->epoch, epoch); + pParam->pUserFn(tmq, code, pMsg, pParam->pParam); } -END: taosReleaseRef(tmqMgmt.rsetId, pParam->refId); - if (!async) { - tsem_post(&pParam->rspSem); - } else { - taosMemoryFree(pParam); - } - taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); + taosMemoryFree(pParam); return code; } @@ -1980,7 +1922,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { int32_t retryCnt = 0; - while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { + while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > 40) { return NULL; } @@ -2162,96 +2104,159 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { - tmqCommitInner(tmq, msg, 0, 1, cb, param); -} - -int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { - return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL); -} - -int32_t tmqAskEp(tmq_t* tmq, bool async) { - int32_t code = TSDB_CODE_SUCCESS; -#if 0 - int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); - if (epStatus == 1) { - int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); - tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt); - if (epSkipCnt < 5000) return 0; +void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { + if (pRes == NULL) { // here needs to commit all offsets. + asyncCommitAllOffsets(tmq, cb, param); + } else { // only commit one offset + asyncCommitOffset(tmq, pRes, cb, param); } - atomic_store_32(&tmq->epSkipCnt, 0); -#endif +} +typedef struct SSyncCommitInfo { + tsem_t sem; + int32_t code; +} SSyncCommitInfo; + +static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) { + SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param; + pInfo->code = code; + tsem_post(&pInfo->sem); +} + +int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { + int32_t code = 0; + + SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); + tsem_init(&pInfo->sem, 0, 0); + pInfo->code = 0; + + if (pRes == NULL) { + asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo); + } else { + asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo); + } + + tsem_wait(&pInfo->sem); + + code = pInfo->code; + taosMemoryFree(pInfo); + + tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code)); + return code; +} + +void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { + SAskEpInfo* pInfo = param; + pInfo->code = code; + + if (code == TSDB_CODE_SUCCESS) { + SMqRspHead* head = pDataBuf->pData; + + SMqAskEpRsp rsp; + tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); + tmqUpdateEp(pTmq, head->epoch, &rsp); + tDeleteSMqAskEpRsp(&rsp); + } + + tsem_post(&pTmq->rspSem); +} + +void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return; + } + + SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); + if (pWrapper == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return; + } + + SMqRspHead* head = pDataBuf->pData; + + pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; + pWrapper->epoch = head->epoch; + memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead)); + tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg); + + taosWriteQitem(pTmq->mqueue, pWrapper); +} + +int32_t doAskEp(tmq_t* pTmq) { + SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); + + asyncAskEp(pTmq, updateEpCallbackFn, pInfo); + tsem_wait(&pTmq->rspSem); + + int32_t code = pInfo->code; + taosMemoryFree(pInfo); + return code; +} + +void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { SMqAskEpReq req = {0}; - req.consumerId = tmq->consumerId; - req.epoch = tmq->epoch; - strcpy(req.cgroup, tmq->groupId); + req.consumerId = pTmq->consumerId; + req.epoch = pTmq->epoch; + strcpy(req.cgroup, pTmq->groupId); int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { - tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId); - return -1; + tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); + askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param); + return; } void* pReq = taosMemoryCalloc(1, tlen); if (pReq == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); + askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); + return; } if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { - tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen); + tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); taosMemoryFree(pReq); - return -1; + + askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param); + return; } SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId); + tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); taosMemoryFree(pReq); - return -1; + + askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); + return; } - pParam->refId = tmq->refId; - pParam->epoch = tmq->epoch; - pParam->async = async; - tsem_init(&pParam->rspSem, 0, 0); + pParam->refId = pTmq->refId; + pParam->epoch = pTmq->epoch; + pParam->pUserFn = askEpFn; + pParam->pParam = param; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(pReq); - return -1; + askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); + return; } - sendInfo->msgInfo = (SDataBuf){ - .pData = pReq, - .len = tlen, - .handle = NULL, - }; + sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL }; sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; - sendInfo->fp = tmqAskEpCb; + sendInfo->fp = askEpCallbackFn; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; - SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async, - sendInfo->requestId); + SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); + tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - - if (!async) { - tsem_wait(&pParam->rspSem); - code = pParam->code; - taosMemoryFree(pParam); - } - - return code; + asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); } int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { @@ -2263,38 +2268,20 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - if (!pParamSet->async) { - tsem_destroy(&pParamSet->rspSem); - } taosMemoryFree(pParamSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; return -1; } // if no more waiting rsp - if (pParamSet->async) { - // call async cb func - if (pParamSet->automatic && tmq->commitCb) { - tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam); - } else if (!pParamSet->automatic && pParamSet->userCb) { // sem post - pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam); - } - - taosMemoryFree(pParamSet); - } else { - tsem_post(&pParamSet->rspSem); - } - -#if 0 - taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); - taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); -#endif + pParamSet->userCb(tmq, pParamSet->code, pParamSet->userParam); + taosMemoryFree(pParamSet); taosReleaseRef(tmqMgmt.rsetId, refId); return 0; } -void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { +void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); if (waitingRspNum == 0) { tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, From f57899e0701a1ca7d6fab203a897d6b763497866 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 16:06:38 +0800 Subject: [PATCH 04/11] refactor: do some internal refactor. --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dfbd354bd3..468b535dc3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1639,7 +1639,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows, + qDebug("queue scan tsdb return %d rows brange:%" PRId64 " - %" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id); pTaskInfo->streamInfo.returned = 1; return pResult; From 05b97544b072ed459d515c5a0f8b790cdb3f0bfb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 16:12:30 +0800 Subject: [PATCH 05/11] fix(tmq): fix the syntax error. --- source/client/src/clientTmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0a4e85068d..4558a57609 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -623,7 +623,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p } if (j == numOfVgroups) { - tscWarn("consumer:0x" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId, + tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId, numOfVgroups, pTopicName); taosMemoryFree(pParamSet); pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); From 32109e7322242ab1e0432dc53e8a903762f317f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 16:21:46 +0800 Subject: [PATCH 06/11] fix(tmq): fix the syntax error. --- source/client/src/clientTmq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c521501ec6..86d4aee8ed 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -605,7 +605,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p } if (i == numOfTopics) { - tscWarn("consumer:0x" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName, + tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName, numOfTopics); taosMemoryFree(pParamSet); pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); From ed21ef04201eebeda89a283d8bbdb00c352237cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 17:06:33 +0800 Subject: [PATCH 07/11] fix(tmq): fix sync commit error. --- source/client/src/clientTmq.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 86d4aee8ed..a627d5f190 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1315,7 +1315,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { char buf[80]; tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset); - tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, + tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; @@ -2132,9 +2132,9 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { pInfo->code = 0; if (pRes == NULL) { - asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo); - } else { asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo); + } else { + asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo); } tsem_wait(&pInfo->sem); From 066a173d28e01ad8a10db36b8bf2da673c1c3440 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 18:39:45 +0800 Subject: [PATCH 08/11] fix(tmq): fix the invalid write. --- source/client/src/clientTmq.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a627d5f190..ade16b7f1a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -113,6 +113,7 @@ struct tmq_t { typedef struct SAskEpInfo { int32_t code; + tsem_t sem; } SAskEpInfo; enum { @@ -2138,8 +2139,9 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { } tsem_wait(&pInfo->sem); - code = pInfo->code; + + tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code)); @@ -2159,7 +2161,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par tDeleteSMqAskEpRsp(&rsp); } - tsem_post(&pTmq->rspSem); + tsem_post(&pInfo->sem); } void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { @@ -2186,11 +2188,13 @@ void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* p int32_t doAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); + tsem_init(&pInfo->sem, 0, 0); asyncAskEp(pTmq, updateEpCallbackFn, pInfo); - tsem_wait(&pTmq->rspSem); + tsem_wait(&pInfo->sem); int32_t code = pInfo->code; + tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); return code; } From ae4234cce07ab985ad5a79d57b1610fc95e7ab66 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 19:36:27 +0800 Subject: [PATCH 09/11] fix(tmq): add lock when check table list. --- source/libs/executor/src/scanoperator.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 667a6d6896..2907c8d056 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -771,7 +771,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // scan table one by one sequentially if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); + int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList); + STableKeyInfo tInfo = {0}; while (1) { SSDataBlock* result = doGroupedTableScan(pOperator); @@ -781,14 +782,21 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // if no data, switch to next table and continue scan pInfo->currentTable++; + + taosRLockLatch(&pTaskInfo->lock); + numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); + if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); + taosRUnLockLatch(&pTaskInfo->lock); return NULL; } - STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); - tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); - qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables, + tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); + taosRUnLockLatch(&pTaskInfo->lock); + + tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1); + qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); From f1384206f387192c6ca9c9f6415414abcac9c7b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Mar 2023 19:46:10 +0800 Subject: [PATCH 10/11] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndTrans.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 65ba76b21f..9d8a662b6b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1086,12 +1086,12 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { goto _OVER; } - pTrans->lastErrorNo = pRsp->code; - STransAction *pAction = taosArrayGet(pArray, action); if (pAction != NULL) { pAction->msgReceived = 1; pAction->errCode = pRsp->code; + pTrans->lastErrorNo = pRsp->code; + mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode); } else { From 23e8edd1481ba548ff61a860c47dc34ee42d3687 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 1 Apr 2023 00:14:01 +0800 Subject: [PATCH 11/11] fix(tmq): set the default commit callback fn. --- source/client/src/clientTmq.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ade16b7f1a..59a407656d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -198,7 +198,7 @@ typedef struct { int32_t waitingRspNum; int32_t totalRspNum; int32_t code; - tmq_commit_cb* userCb; + tmq_commit_cb* callbackFn; /*SArray* successfulOffsets;*/ /*SArray* failedOffsets;*/ void* userParam; @@ -590,7 +590,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; - pParamSet->userCb = pCommitFp; + pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); @@ -656,7 +656,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; - pParamSet->userCb = pCommitFp; + pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; // init as 1 to prevent concurrency issue @@ -810,6 +810,12 @@ OVER: taosReleaseRef(tmqMgmt.rsetId, refId); } +static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { + if (code != 0) { + tscDebug("consumer:0x%"PRIx64", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); + } +} + int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = taosAllocateQall(); taosReadAllQitems(pTmq->delayedTask, qall); @@ -833,7 +839,9 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - asyncCommitAllOffsets(pTmq, pTmq->commitCb, pTmq->commitCbUserParam); + tmq_commit_cb* pCallbackFn = pTmq->commitCb? pTmq->commitCb:defaultCommitCbFn; + + asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; @@ -1029,8 +1037,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; - /*pTmq->epStatus = 0;*/ - /*pTmq->epSkipCnt = 0;*/ // set conf strcpy(pTmq->clientId, conf->clientId); @@ -2279,7 +2285,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } // if no more waiting rsp - pParamSet->userCb(tmq, pParamSet->code, pParamSet->userParam); + pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); taosMemoryFree(pParamSet); taosReleaseRef(tmqMgmt.rsetId, refId);