diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 79139a5fe5..77bbd0be1a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,7 +24,7 @@ #include "tref.h" #include "ttimer.h" -#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 +#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 struct SMqMgmt { @@ -148,7 +148,8 @@ typedef struct { typedef struct { int8_t tmqRspType; - int32_t epoch; + int32_t epoch; // epoch can be used to guard the vgHandle + int32_t vgId; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; uint64_t reqId; @@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; + pRspWrapper->vgId = pVg->vgId; pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { @@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } } else { - SMqClientVg* pVg = pollRspWrapper->vgHandle; tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout); #if 0 tmqHandleAllDelayedTask(tmq); @@ -2017,37 +2018,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } +static void displayConsumeStatistics(const tmq_t* pTmq) { + int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); + tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", + pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); + for (int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i); + + tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); + int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); + for (int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); + tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + } + } + + tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); +} + int32_t tmq_consumer_close(tmq_t* tmq) { tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); + displayConsumeStatistics(tmq); if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { - return rsp; - } - - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", - tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch); - - tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId); - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i); - - tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i); - int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); - for (int32_t j = 0; j < numOfVgs; ++j) { - SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); - tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + // if auto commit is set, commit before close consumer. Otherwise, do nothing. + if (tmq->autoCommit) { + int32_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp != 0) { + return rsp; } } - tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId); - int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); while (1) { - rsp = tmq_subscribe(tmq, lst); + int32_t rsp = tmq_subscribe(tmq, lst); if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { break; } else { @@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } tmq_list_destroy(lst); + } else { + tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId); } taosRemoveRef(tmqMgmt.rsetId, tmq->refId); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ee2bd007ce..c2b38f5cd1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 327d233d30..c4800e5051 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { -//#if 0 -// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); -// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); -// -// if (pRsp->withSchema) { -// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); -// } else { -// A(taosArrayGetSize(pRsp->blockSchema) == 0); -// } -// -// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { -// if (pRsp->blockNum > 0) { -// A(pRsp->rspOffset.version > pRsp->reqOffset.version); -// } else { -// A(pRsp->rspOffset.version >= pRsp->reqOffset.version); -// } -// } -//#endif -// -// int32_t len = 0; -// int32_t code = 0; -// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); -// if (code < 0) { -// return -1; -// } -// -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// return -1; -// } -// -// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP; -// ((SMqRspHead*)buf)->epoch = pReq->epoch; -// ((SMqRspHead*)buf)->consumerId = pReq->consumerId; -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); -// -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); -// tEncodeSTaosxRsp(&encoder, pRsp); -// tEncoderClear(&encoder); -// -// SRpcMsg rsp = { -// .info = pMsg->info, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; -// -// tmsgSendRsp(&rsp); -// -// char buf1[80] = {0}; -// char buf2[80] = {0}; -// tFormatOffset(buf1, 80, &pRsp->reqOffset); -// tFormatOffset(buf2, 80, &pRsp->rspOffset); -// -// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", -// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); -// return 0; -//} - static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && pLeft->val.version <= pRight->val.version; @@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* // NOTE: this pHandle->consumerId may have been changed already. tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64, + ", ts:%" PRId64", reqId:0x%"PRIx64, consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts); + dataRsp.rspOffset.ts, pRequest->reqId); tDeleteSMqDataRsp(&dataRsp); return code; @@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* break; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); -// if (terrno == 0) { // failed to seek to given ver, but no errors happen. -// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); -// return code; -// } else { // error happens, return to consumers - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; -// } + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; } SWalCont* pHead = &pCkHead->head; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 69fb9e4c3d..7f9563ae5f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -183,22 +183,24 @@ end: return tbSuid == realTbSuid; } -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) { int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", - pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64, + pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); *fetchOffset = offset - 1; code = -1; goto END; } - tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, - TMSG_INFO((*ppCkHead)->head.msgType)); + tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, + pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId); if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index ad93cc567c..95981c2f08 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id // all queried tables have been dropped already, return immediately. if (p->pSchema == NULL) { - taosMemoryFree(p); tsdbWarn("all queried tables has been dropped, try next group, %s", idstr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e5f3c7406d..5ad9276c6c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN pBlockNum->numOfBlocks += 1; } - if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { + if (taosArrayGetSize(pScanInfo->pBlockList) > 0) { numOfQTable += 1; } } @@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { - pBlockScanInfo = - *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { - code = TSDB_CODE_INVALID_PARA; - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, - taosHashGetSize(pReader->status.pTableMap), pReader->idStr); goto _err; } } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 29a23cd90b..ee92805d27 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1194,6 +1194,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pKey); } + + taosMemoryFree(pCur); return code; } diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index cb7b501298..69debe7ab5 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; -static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { - g_once_commit_flag = 1; - notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); +static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); + + if (0 == g_once_commit_flag) { + g_once_commit_flag = 1; + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } - char tmpString[128]; - taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); + char tmpString[128]; + taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); } void build_consumer(SThreadInfo* pInfo) {