diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8b9fc84334..a1ae1e429d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -33,16 +33,16 @@ extern "C" { #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }} // clang-format on -#define WAL_PROTO_VER 0 -#define WAL_NOSUFFIX_LEN 20 -#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) -#define WAL_LOG_SUFFIX "log" -#define WAL_INDEX_SUFFIX "idx" -#define WAL_REFRESH_MS 1000 -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL -#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) +#define WAL_PROTO_VER 0 +#define WAL_NOSUFFIX_LEN 20 +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) +#define WAL_LOG_SUFFIX "log" +#define WAL_INDEX_SUFFIX "idx" +#define WAL_REFRESH_MS 1000 +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL +#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) typedef enum { TAOS_WAL_WRITE = 1, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 581cd636e8..87f5e5fa40 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) { destroyRequest(pRequest); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; - if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); - if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->rsp.blockDataLen); + taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); // taosx taosArrayDestroy(pRsp->rsp.createTableLen); taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree); @@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) { taosMemoryFree(pRsp); } else if (TD_RES_TMQ(res)) { SMqRspObj *pRsp = (SMqRspObj *)res; - if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); - if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); - if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); - if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->rsp.blockDataLen); + taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); pRsp->resInfo.pRspMsg = NULL; doFreeReqResultInfo(&pRsp->resInfo); taosMemoryFree(pRsp); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index db717a4e4e..4352ec69d3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { return 0; } +static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { + // do nothing + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { + SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; + tDeleteSMqAskEpRsp(&pEpRspWrapper->msg); + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->dataRsp.blockDataLen); + taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosMemoryFree(pRsp->metaRsp.metaRsp); + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { + SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; + taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree); + taosArrayDestroy(pRsp->taosxRsp.blockDataLen); + taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree); + taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + // taosx + taosArrayDestroy(pRsp->taosxRsp.createTableLen); + taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); + } +} + void tmqClearUnhandleMsg(tmq_t* tmq) { - SMqRspWrapper* msg = NULL; + SMqRspWrapper* rspWrapper = NULL; while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else + taosGetQitem(tmq->qall, (void**)&rspWrapper); + if (rspWrapper) { + tmqFreeRspWrapper(rspWrapper); + taosFreeQitem(rspWrapper); + } else { break; + } } - msg = NULL; + rspWrapper = NULL; taosReadAllQitems(tmq->mqueue, tmq->qall); while (1) { - taosGetQitem(tmq->qall, (void**)&msg); - if (msg) - taosFreeQitem(msg); - else + taosGetQitem(tmq->qall, (void**)&rspWrapper); + if (rspWrapper) { + tmqFreeRspWrapper(rspWrapper); + taosFreeQitem(rspWrapper); + } else { break; + } } } @@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) tDeleteSMqAskEpRsp(rspMsg); *pReset = true; } else { + tmqFreeRspWrapper(rspWrapper); *pReset = false; } } else { @@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { @@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { @@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } } else { @@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (1) { tmqHandleAllDelayedTask(tmq); if (tmqPollImpl(tmq, timeout) < 0) { - tscDebug("return since poll err"); + tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId); /*return NULL;*/ } diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 0ed68dce95..37dcec4f85 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -32,9 +32,9 @@ typedef struct SMetaStbStatsEntry { } SMetaStbStatsEntry; typedef struct STagFilterResEntry { - uint64_t suid; // uid for super table - SList list; // the linked list of md5 digest, extracted from the serialized tag query condition - uint32_t qTimes;// queried times for current super table + uint64_t suid; // uid for super table + SList list; // the linked list of md5 digest, extracted from the serialized tag query condition + uint32_t qTimes; // queried times for current super table } STagFilterResEntry; struct SMetaCache { @@ -126,13 +126,14 @@ int32_t metaCacheOpen(SMeta* pMeta) { goto _err2; } - pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5*1024*1024, -1, 0.5); + pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5); if (pCache->sTagFilterResCache.pUidResCache == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err2; } - pCache->sTagFilterResCache.pTableEntry = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK); + pCache->sTagFilterResCache.pTableEntry = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK); if (pCache->sTagFilterResCache.pTableEntry == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err2; @@ -419,7 +420,8 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) { return code; } -int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { +int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, + bool* acquireRes) { uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf; // generate the composed key for LRU cache @@ -428,8 +430,8 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK pBuf[0] = suid; memcpy(&pBuf[1], pKey, keyLen); - int32_t len = keyLen + sizeof(uint64_t); - LRUHandle *pHandle = taosLRUCacheLookup(pCache, pBuf, len); + int32_t len = keyLen + sizeof(uint64_t); + LRUHandle* pHandle = taosLRUCacheLookup(pCache, pBuf, len); if (pHandle == NULL) { *acquireRes = 0; return TSDB_CODE_SUCCESS; @@ -439,7 +441,7 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK *acquireRes = 1; const char* p = taosLRUCacheValue(pMeta->pCache->sTagFilterResCache.pUidResCache, pHandle); - int32_t size = *(int32_t*) p; + int32_t size = *(int32_t*)p; taosArrayAddBatch(pList1, p + sizeof(int32_t), size); (*pEntry)->qTimes += 1; @@ -467,12 +469,15 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK // remove the keys, of which query uid lists have been replaced already. size_t s = taosArrayGetSize(pList); - for(int32_t i = 0; i < s; ++i) { + for (int32_t i = 0; i < s; ++i) { SListNode** p1 = taosArrayGet(pList, i); tdListPopNode(&(*pEntry)->list, *p1); + taosMemoryFree(*p1); } - (*pEntry)->qTimes = 0; // reset the query times + (*pEntry)->qTimes = 0; // reset the query times + + taosArrayDestroy(pList); } } @@ -487,7 +492,8 @@ static void freePayload(const void* key, size_t keyLen, void* value) { } // check both the payload size and selectivity ratio -int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) { +int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, + int32_t payloadLen, double selectivityRatio) { if (selectivityRatio > tsSelectivityRatio) { metaDebug("vgId:%d, suid:%" PRIu64 " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f", @@ -525,9 +531,10 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int ASSERT(sizeof(uint64_t) + keyLen == 24); // add to cache. - taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW); - metaDebug("vgId:%d, suid:%"PRIu64" list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), - suid, (int32_t) taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); + taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, + TAOS_LRU_PRIORITY_LOW); + metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid, + (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); return TSDB_CODE_SUCCESS; } @@ -539,7 +546,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { return TSDB_CODE_SUCCESS; } - int32_t keyLen = sizeof(uint64_t) * 3; + int32_t keyLen = sizeof(uint64_t) * 3; uint64_t p[3] = {0}; p[0] = suid; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 191fc1b49c..389c8013f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL } taosWUnLockLatch(&pTq->pushLock); - code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + if (pHandle) { + if (pHandle->pRef) { + walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); + } + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + } } code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); @@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL } if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { - ASSERT(0); + tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey); } return 0; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2cd1bd7dec..1a43ddecdd 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -314,7 +314,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, + pOperator->exprSupp.numOfExprs); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); @@ -331,7 +332,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, + pOperator->exprSupp.numOfExprs); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); } } @@ -469,8 +471,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* initResultRowInfo(&pInfo->binfo.resultRowInfo); setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, + optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -776,6 +778,12 @@ static void destroyPartitionOperatorInfo(void* param) { taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); + + int32_t size = taosArrayGetSize(pInfo->sortedGroupArray); + for (int32_t i = 0; i < size; i++) { + SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i); + taosArrayDestroy(pGp->pPageList); + } taosArrayDestroy(pInfo->sortedGroupArray); void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL); @@ -850,7 +858,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -1141,8 +1150,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, + destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index af361323a7..9d8fad6fea 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -31,7 +31,7 @@ extern "C" { #define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 -#define QW_DEFAULT_SCH_TASK_NUMBER 10000 +#define QW_DEFAULT_SCH_TASK_NUMBER 3000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 @@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt { #define QW_ERR_RET(c) \ do { \ - int32_t _code = (c); \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ return _code; \ @@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_RET(c) \ do { \ - int32_t _code = (c); \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ } \ @@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_ERR_JRET(c) \ do { \ - code = (c); \ + code = (c); \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ goto _return; \ diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index ce3b493638..f3d073634d 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -281,7 +281,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; - + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { @@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) { int8_t nodeType = mgmt->nodeType; int32_t nodeId = mgmt->nodeId; + int32_t taskCount = 0; + int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); taosTmrStop(mgmt->hbTimer); @@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) { uint64_t qId, tId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); @@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) { qwFreeTaskCtx(ctx); QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); + taskCount++; } taosHashCleanup(mgmt->ctxHash); @@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) { while (pIter) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); + pIter = taosHashIterate(mgmt->schHash, pIter); + schStatusCount++; } taosHashCleanup(mgmt->schHash); @@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) { qwCloseRef(); - qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); + qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt, + taskCount, schStatusCount); } int32_t qwOpenRef(void) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c5db4105d7..81f73b1226 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -8,9 +8,9 @@ #include "qwMsg.h" #include "tcommon.h" #include "tdatablock.h" +#include "tglobal.h" #include "tmsg.h" #include "tname.h" -#include "tglobal.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -117,7 +117,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (queryStop) { *queryStop = true; } - + return TSDB_CODE_SUCCESS; } @@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_ERR_RET(code); } - QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks, + QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); @@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } if (0 == ctx->level) { - QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows, + ctx->level); break; } if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { - QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, + pOutput->numOfRows); break; } } @@ -538,7 +540,7 @@ _return: SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { + if (!rsped) { qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); } } @@ -650,8 +652,8 @@ _return: code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - void *rsp = NULL; - int32_t dataLen = 0; + void *rsp = NULL; + int32_t dataLen = 0; SOutputData sOutput = {0}; if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { return TSDB_CODE_SUCCESS; @@ -671,8 +673,8 @@ _return: qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, - tstrerror(code), dataLen); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); } } @@ -689,7 +691,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { do { ctx = NULL; - + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || + 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1176,7 +1179,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); uint64_t qId, tId; - int32_t eId; + int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; @@ -1186,7 +1189,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { QW_LOCK(QW_WRITE, &ctx->lock); QW_TASK_DLOG_E("start to force stop task"); - + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1194,7 +1197,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { pIter = taosHashIterate(mgmt->ctxHash, pIter); continue; } - + if (QW_QUERY_RUNNING(ctx)) { qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index f53088fac6..702d05f576 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -223,6 +223,7 @@ void walClose(SWal *pWal) { taosMemoryFree(pRef); } taosHashCleanup(pWal->pRefHash); + pWal->pRefHash = NULL; taosThreadMutexUnlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index eae5d9f1a7..e86111109c 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) { return pRef; } -#if 1 void walCloseRef(SWal *pWal, int64_t refId) { SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); if (ppRef == NULL) return; SWalRef *pRef = *ppRef; + if (pRef) { + wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); + } else { + wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId); + } taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); taosMemoryFree(pRef); } -#endif int32_t walRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal;