From 3772f25840e252047471c0838f5b9713c41cc220 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 18:02:32 +0800 Subject: [PATCH 1/5] fix(stream): check return value. --- source/common/src/tdatablock.c | 1 + source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 5 ++++- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/mergeoperator.c | 6 ++++++ source/libs/executor/src/querytask.c | 20 ++++++++++++++++++- source/libs/executor/src/tsort.c | 25 +++++++++++++++--------- source/util/src/tarray.c | 5 +++-- source/util/src/tlosertree.c | 2 +- 9 files changed, 52 insertions(+), 16 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b489314e21..f50a624ea7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1620,6 +1620,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) { if (pBlock == NULL){ return; } + int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 2adc863baf..6c4fd43aa1 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != 1); \ + ASSERT((_c) != 1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0ca9908fb1..801bdcb75a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1692,8 +1692,11 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName); + pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType); + QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; } else if (type == QUERY_NODE_VALUE) { pExp->pExpr->nodeType = QUERY_NODE_VALUE; @@ -1761,6 +1764,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType); + QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno); } else if (p1->type == QUERY_NODE_VALUE) { SValueNode* pvn = (SValueNode*)p1; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; @@ -2437,7 +2441,6 @@ void tableListDestroy(STableListInfo* pTableListInfo) { } taosArrayDestroy(pTableListInfo->pTableList); - pTableListInfo->pTableList = NULL; taosMemoryFreeClear(pTableListInfo->groupOffset); taosHashCleanup(pTableListInfo->map); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 502f9ff0f7..f48571c555 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1139,8 +1139,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; - code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); pInfo->pGroupCols = 
makeColumnArrayFromList(pPartNode->pPartitionKeys); diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 3b390c8719..8bc7c2db50 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -594,9 +594,11 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); + TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno); initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -620,6 +622,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS case MERGE_TYPE_NON_SORT: { SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); + initResultSizeInfo(&pOperator->resultInfo, 1024); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); @@ -629,6 +633,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS case MERGE_TYPE_COLUMNS: { SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno); + initResultSizeInfo(&pOperator->resultInfo, 1); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); TSDB_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 9f4d9c4405..6f0bc40d29 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -64,6 +64,10 @@ int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC p->id.queryId = queryId; p->id.taskId = taskId; p->id.str = taosMemoryMalloc(64); + if (p->id.str == NULL) { + return terrno; + } + buildTaskId(taskId, queryId, p->id.str); p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo)); if (p->id.str == NULL || p->schemaInfos == NULL) { @@ -174,9 +178,16 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } + if (schemaInfo.sw == NULL) { + return terrno; + } + pAPI->metaReaderFn.clearReader(&mr); schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); - + if (schemaInfo.qsw == NULL) { + return terrno; + } + void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); return (p != NULL)? 
TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY; } @@ -186,7 +197,14 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols); SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pqSw == NULL) { + return NULL; + } + pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema)); + if (pqSw->pSchema == NULL) { + return NULL; + } for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 896b4db7cd..8181ad3069 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -182,8 +182,8 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { } int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { + *pBlock = NULL; if (pSortHandle->pDataBlock == NULL) { - *pBlock = NULL; return TSDB_CODE_SUCCESS; } return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); @@ -2478,7 +2478,9 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { return code; } - return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + ASSERT(code != 1); + return code; } void tsortClose(SSortHandle* pHandle) { @@ -2808,19 +2810,24 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle } int32_t tsortOpen(SSortHandle* pHandle) { + int32_t code = 0; if (pHandle->opened) { - return 0; + return code; } - if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - return TSDB_CODE_INVALID_PARA; + if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { + code = TSDB_CODE_INVALID_PARA; + return code; } pHandle->opened = true; - if (tsortIsPQSortApplicable(pHandle)) - return tsortOpenForPQSort(pHandle); - else - return tsortOpenForBufMergeSort(pHandle); + if (tsortIsPQSortApplicable(pHandle)) { + code = tsortOpenForPQSort(pHandle); + } else { + code = tsortOpenForBufMergeSort(pHandle); + } + + return code; } int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index e0d38df5c8..37667e2975 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -95,11 +95,12 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) { tsize = (newSize == tsize) ? (tsize + 2) : newSize; } - pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); - if (pArray->pData == NULL) { + char* p = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); + if (p == NULL) { return terrno; } + pArray->pData = p; pArray->capacity = tsize; } return 0; diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index 0e24d54565..7973a84593 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -39,7 +39,7 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, (SMultiwayMergeTreeInfo*)taosMemoryCalloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries); if (pTreeInfo == NULL) { uError("allocate memory for loser-tree failed. 
reason:%s", strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + return terrno; } pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo)); From 9b02d103800de03c776703ca05a08c8d1aea5408 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Mon, 5 Aug 2024 18:13:43 +0800 Subject: [PATCH 2/5] fix: replace sleep with ClusterComCheck --- tests/army/cluster/incSnapshot.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/army/cluster/incSnapshot.py b/tests/army/cluster/incSnapshot.py index dfd8d95c9c..a4ddddcee5 100644 --- a/tests/army/cluster/incSnapshot.py +++ b/tests/army/cluster/incSnapshot.py @@ -15,6 +15,10 @@ from frame import * from frame.autogen import * # from frame.server.dnodes import * # from frame.server.cluster import * +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck class TDTestCase(TBase): @@ -61,7 +65,7 @@ class TDTestCase(TBase): # if bFinish: # break self.snapshotAgg() - time.sleep(10) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=3,db_name=f"{self.db}",count_number=60) sc.dnodeStopAll() for i in range(1, 4): path = clusterDnodes.getDnodeDir(i) @@ -75,7 +79,7 @@ class TDTestCase(TBase): sc.dnodeStart(2) sc.dnodeStart(3) sql = "show vnodes;" - time.sleep(10) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=3,db_name=f"{self.db}",count_number=60) while True: bFinish = True param_list = tdSql.query(sql, row_tag=True) From 6297d238e0abfc3fb84b83c8865290593b22ea7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 18:34:56 +0800 Subject: [PATCH 3/5] fix: check return values. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 9 ++++--- source/libs/executor/src/operator.c | 21 ++++++++++++++++ source/libs/executor/src/tsort.c | 3 +-- source/libs/stream/src/streamCheckStatus.c | 7 ------ source/libs/stream/src/streamCheckpoint.c | 28 ++++++++++++++-------- source/util/src/tpagedbuf.c | 10 +++++--- 7 files changed, 54 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 6c4fd43aa1..a11d07c518 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -26,7 +26,7 @@ #define T_LONG_JMP(_obj, _c) \ do { \ - ASSERT((_c) != 1); \ + ASSERT((_c) != -1); \ longjmp((_obj), (_c)); \ } while (0) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 801bdcb75a..fab91e9856 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1434,7 +1434,12 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList pNode.suid = suid; pNode.uid = suid; pNode.tableType = TSDB_SUPER_TABLE; + STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + return terrno; + } + uint8_t digest[17] = {0}; int code = getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? 
pSubplan->pTagIndexCond : NULL, @@ -2411,11 +2416,10 @@ bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList- STableListInfo* tableListCreate() { STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo)); if (pListInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pListInfo->remainGroups = NULL; + pListInfo->remainGroups = NULL; pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pListInfo->pTableList == NULL) { goto _error; @@ -2431,7 +2435,6 @@ STableListInfo* tableListCreate() { _error: tableListDestroy(pListInfo); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 701ed0ddbc..c204494710 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -295,6 +295,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); if (code) { @@ -362,6 +366,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL){ + pTaskInfo->code = terrno; + return terrno; + } if (pHandle->vnode) { code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, @@ -385,6 +393,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } + if (!pTagScanPhyNode->onlyMetaCtbIdx) { code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -398,6 +411,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(uint64_t)); @@ -436,6 +453,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; STableListInfo* pTableListInfo = tableListCreate(); + if (pTableListInfo == NULL) { + pTaskInfo->code = terrno; + return terrno; + } code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 8181ad3069..300d7576d9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2452,7 +2452,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { return code; } -static bool tsortOpenForBufMergeSort(SSortHandle* 
pHandle) { +static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) { int32_t code = createInitialSources(pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2479,7 +2479,6 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { } code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); - ASSERT(code != 1); return code; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 9a2323582c..2de86b8794 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } if (pInfo->stage != stage) { - streamMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__CK) { - streamTaskSetFailedCheckpointId(pTask); - } - streamMutexUnlock(&pTask->lock); - return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index acac5dfc9e..7c41c380a6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -453,8 +453,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + continue; } if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { @@ -468,8 +468,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + continue; } if (pReadyInfo->sendCompleted == 1) { @@ -601,9 +601,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { - pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; - stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, - pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId); + struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + if (pInfo->activeId <= 0) { + stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr); + } else { + pInfo->failedId = pInfo->activeId; + stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId, + pInfo->transId); + } } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { @@ -960,6 +966,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) const char* id = pTask->id.idStr; SActiveCheckpointInfo* pInfo = 
pTask->chkInfo.pActiveInfo; SStreamTaskState pStatus = streamTaskGetStatus(pTask); + bool alreadySend = false; if (pStatus.state != TASK_STATUS__CK) { return false; @@ -971,11 +978,12 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) return false; } - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { + int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList); + for (int32_t i = 0; i < num; ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo == NULL) { - streamMutexUnlock(&pInfo->lock); - return TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num); + continue; } if (pSendInfo->nodeId != downstreamNodeId) { diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 76cf067842..539959126c 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -352,9 +352,8 @@ static SPageInfo* getPageInfoFromPayload(void* page) { int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir) { - *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); - - SDiskbasedBuf* pPBuf = *pBuf; + *pBuf = NULL; + SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); if (pPBuf == NULL) { goto _error; } @@ -394,11 +393,16 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->prefix = (char*)dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); + if (pPBuf->emptyDummyIdList == NULL) { + goto _error; + } // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); + *pBuf = pPBuf; return TSDB_CODE_SUCCESS; + _error: destroyDiskbasedBuf(pPBuf); return TSDB_CODE_OUT_OF_MEMORY; From 6645192ff30ce66887f9dbb3af6b25fc253ce2e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 19:12:42 +0800 Subject: [PATCH 4/5] fix(stream): fix syntax error. 
--- source/libs/executor/src/executil.c | 4 +++- source/libs/stream/src/streamCheckpoint.c | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fab91e9856..ec144596fd 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2333,7 +2333,9 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); - ASSERT(pTableList->map != NULL && slot != NULL); + if (slot == NULL) { + return -1; + } STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); ASSERT(pKeyInfo->uid == tableUid); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7c41c380a6..9da7a5d9c8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -453,7 +453,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", + pTask->id.idStr, i); continue; } @@ -468,7 +469,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { - stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i); + stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", + pTask->id.idStr, i); continue; } From 6557b6c7aedae8f40ca639eb52036c76189acafd Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Mon, 5 Aug 2024 19:53:21 +0800 Subject: [PATCH 5/5] fix: add clusterCommonCheck.py to army --- tests/army/cluster/incSnapshot.py | 37 +--- tests/army/frame/clusterCommonCheck.py | 277 +++++++++++++++++++++++++ 2 files changed, 279 insertions(+), 35 deletions(-) create mode 100644 tests/army/frame/clusterCommonCheck.py diff --git a/tests/army/cluster/incSnapshot.py b/tests/army/cluster/incSnapshot.py index a4ddddcee5..1028781fcb 100644 --- a/tests/army/cluster/incSnapshot.py +++ b/tests/army/cluster/incSnapshot.py @@ -15,11 +15,7 @@ from frame import * from frame.autogen import * # from frame.server.dnodes import * # from frame.server.cluster import * -from util.cluster import * -sys.path.append("./6-cluster") -from clusterCommonCreate import * -from clusterCommonCheck import clusterComCheck - +from frame.clusterCommonCheck import * class TDTestCase(TBase): updatecfgDict = { @@ -43,29 +39,12 @@ class TDTestCase(TBase): autoGen.insert_data(1000) tdSql.execute(f"flush database {self.db}") sc.dnodeStop(3) - # clusterDnodes.stoptaosd(1) - # clusterDnodes.starttaosd(3) - # time.sleep(5) - # clusterDnodes.stoptaosd(2) - # clusterDnodes.starttaosd(1) - # time.sleep(5) autoGen.insert_data(5000, True) self.flushDb(True) # wait flush operation over time.sleep(5) - # sql = 'show vnodes;' - # while 
True: - # bFinish = True - # param_list = tdSql.query(sql, row_tag=True) - # for param in param_list: - # if param[3] == 'leading' or param[3] == 'following': - # bFinish = False - # break - # if bFinish: - # break - self.snapshotAgg() - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=3,db_name=f"{self.db}",count_number=60) + self.snapshotAgg() sc.dnodeStopAll() for i in range(1, 4): path = clusterDnodes.getDnodeDir(i) @@ -80,18 +59,6 @@ class TDTestCase(TBase): sc.dnodeStart(3) sql = "show vnodes;" clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=3,db_name=f"{self.db}",count_number=60) - while True: - bFinish = True - param_list = tdSql.query(sql, row_tag=True) - for param in param_list: - if param[3] == 'offline': - tdLog.exit( - "dnode synchronous fail dnode id: %d, vgroup id:%d status offline" % (param[0], param[1])) - if param[3] == 'leading' or param[3] == 'following': - bFinish = False - break - if bFinish: - break self.timestamp_step = 1000 self.insert_rows = 6000 diff --git a/tests/army/frame/clusterCommonCheck.py b/tests/army/frame/clusterCommonCheck.py new file mode 100644 index 0000000000..cca70a88b7 --- /dev/null +++ b/tests/army/frame/clusterCommonCheck.py @@ -0,0 +1,277 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from collections import defaultdict +import random +import string +import threading +import requests +import time +# import socketfrom + +import taos +from frame.log import * +from frame.sql import * +from frame.cases import * +from frame.server.dnodes import * +from frame.common import * + +# class actionType(Enum): +# CREATE_DATABASE = 0 +# CREATE_STABLE = 1 +# CREATE_CTABLE = 2 +# INSERT_DATA = 3 + +class ClusterComCheck: + def init(self, conn, logSql=False): + tdSql.init(conn.cursor()) + # tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def checkDnodes(self,dnodeNumbers, timeout=100): + count=0 + # print(tdSql) + while count < timeout: + tdSql.query("select * from information_schema.ins_dnodes") + # tdLog.debug(tdSql.res) + status=0 + for i in range(dnodeNumbers): + if tdSql.res[i][4] == "ready": + status+=1 + # tdLog.info(status) + + if status == dnodeNumbers: + tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! " % (dnodeNumbers, count+1)) + return True + time.sleep(1) + count+=1 + + else: + tdSql.query("select * from information_schema.ins_dnodes") + tdLog.debug(tdSql.res) + tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within %ds ! "% (dnodeNumbers, timeout)) + + def checkDbRows(self,dbNumbers): + dbNumbers=int(dbNumbers) + count=0 + while count < 5: + tdSql.query("select * from information_schema.ins_databases where name!='collectd' ;") + count+=1 + if tdSql.checkRows(dbNumbers+2): + tdLog.success("we find %d databases and expect %d in clusters! " %(tdSql.queryRows,dbNumbers+2)) + return True + else: + continue + else : + tdLog.debug(tdSql.res) + tdLog.exit("we find %d databases but expect %d in clusters! 
" %(tdSql.queryRows,dbNumbers)) + + def checkDb(self,dbNumbers,restartNumber,dbNameIndex, timeout=100): + count=0 + alldbNumbers=(dbNumbers*restartNumber)+2 + while count < timeout: + query_status=0 + for j in range(dbNumbers): + for i in range(alldbNumbers): + tdSql.query("select * from information_schema.ins_databases;") + if "%s_%d"%(dbNameIndex,j) == tdSql.res[i][0] : + if tdSql.res[i][15] == "ready": + query_status+=1 + tdLog.debug("check %s_%d that status is ready "%(dbNameIndex,j)) + else: + sleep(1) + continue + # print(query_status) + if query_status == dbNumbers: + tdLog.success(" check %d database and all databases are ready within %ds! " %(dbNumbers,count+1)) + return True + count+=1 + + else: + tdLog.debug(tdSql.res) + tdLog.debug("query status is %d"%query_status) + tdLog.exit("database is not ready within %ds"%(timeout+1)) + + def checkData(self,dbname,stbname,stableCount,CtableCount,rowsPerSTable,): + tdSql.execute("use %s"%dbname) + tdSql.query("show %s.stables"%dbname) + tdSql.checkRows(stableCount) + tdSql.query("show %s.tables"%dbname) + tdSql.checkRows(CtableCount) + for i in range(stableCount): + tdSql.query("select count(*) from %s%d"%(stbname,i)) + tdSql.checkData(0,0,rowsPerSTable) + return + + def checkMnodeStatus(self,mnodeNums): + self.mnodeNums=int(mnodeNums) + # self.leaderDnode=int(leaderDnode) + tdLog.debug("start to check status of mnodes") + count=0 + + while count < 10: + time.sleep(1) + tdSql.query("select * from information_schema.ins_mnodes;") + if tdSql.checkRows(self.mnodeNums) : + tdLog.success("cluster has %d mnodes" %self.mnodeNums ) + + if self.mnodeNums == 1: + if tdSql.res[0][2]== 'leader' and tdSql.res[0][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + count+=1 + elif self.mnodeNums == 3 : + if tdSql.res[0][2]=='leader' and tdSql.res[0][3]== 'ready' : + if tdSql.res[1][2]=='follower' and tdSql.res[1][3]== 'ready' : + if tdSql.res[2][2]=='follower' and tdSql.res[2][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + elif tdSql.res[1][2]=='leader' and tdSql.res[1][3]== 'ready' : + if tdSql.res[0][2]=='follower' and tdSql.res[0][3]== 'ready' : + if tdSql.res[2][2]=='follower' and tdSql.res[2][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + elif tdSql.res[2][2]=='leader' and tdSql.res[2][3]== 'ready' : + if tdSql.res[0][2]=='follower' and tdSql.res[0][3]== 'ready' : + if tdSql.res[1][2]=='follower' and tdSql.res[1][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + count+=1 + elif self.mnodeNums == 2 : + if tdSql.res[0][2]=='leader' and tdSql.res[0][3]== 'ready' : + if tdSql.res[1][2]=='follower' and tdSql.res[1][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + elif tdSql.res[1][2]=='leader' and tdSql.res[1][3]== 'ready' : + if tdSql.res[0][2]=='follower' and tdSql.res[0][3]== 'ready' : + tdLog.success("%d mnodes is ready in 10s"%self.mnodeNums) + return True + count+=1 + else: + tdLog.debug(tdSql.res) + tdLog.exit("cluster of %d mnodes is not ready in 10s " %self.mnodeNums) + + + + + def check3mnodeoff(self,offlineDnodeNo,mnodeNums=3): + count=0 + while count < 10: + time.sleep(1) + tdSql.query("select * from information_schema.ins_mnodes;") + if tdSql.checkRows(mnodeNums) : + tdLog.success("cluster has %d mnodes" %self.mnodeNums ) + else: + tdLog.exit("mnode number is correct") + if offlineDnodeNo == 1: + if tdSql.res[0][2]=='offline' : + if 
tdSql.res[1][2]=='leader': + if tdSql.res[2][2]=='follower': + tdLog.success("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + elif tdSql.res[1][2]=='follower': + if tdSql.res[2][2]=='leader': + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + count+=1 + elif offlineDnodeNo == 2: + if tdSql.res[1][2]=='offline' : + if tdSql.res[0][2]=='leader': + if tdSql.res[2][2]=='follower': + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + elif tdSql.res[0][2]=='follower': + if tdSql.res[2][2]=='leader': + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + count+=1 + elif offlineDnodeNo == 3: + if tdSql.res[2][2]=='offline' : + if tdSql.res[0][2]=='leader': + if tdSql.res[1][2]=='follower': + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + elif tdSql.res[0][2]=='follower': + if tdSql.res[1][2]=='leader': + tdLog.debug("stop mnodes on dnode %d successfully in 10s" %offlineDnodeNo) + return True + count+=1 + else: + tdLog.debug(tdSql.res) + tdLog.exit(f"stop mnodes on dnode {offlineDnodeNo} failed in 10s ") + + def check3mnode2off(self,mnodeNums=3): + count=0 + while count < 10: + time.sleep(1) + tdSql.query("select * from information_schema.ins_mnodes;") + if tdSql.checkRows(mnodeNums) : + tdLog.success("cluster has %d mnodes" %self.mnodeNums ) + else: + tdLog.exit("mnode number is correct") + if tdSql.res[0][2]=='leader' : + if tdSql.res[1][2]=='offline': + if tdSql.res[2][2]=='offline': + tdLog.success("stop mnodes of follower on dnode successfully in 10s") + return True + count+=1 + else: + tdLog.debug(tdSql.res) + tdLog.exit("stop mnodes on dnode 2 or 3 failed in 10s") + + def check_vgroups_status(self,vgroup_numbers=2,db_replica=3,count_number=10,db_name="db"): + """ check vgroups status in 10s after db vgroups status is changed """ + vgroup_numbers = int(vgroup_numbers) + self.db_replica = int(db_replica) + tdLog.debug("start to check status of vgroups") + count=0 + last_number=vgroup_numbers-1 + while count < count_number: + time.sleep(1) + count+=1 + print("check vgroup count :", count) + tdSql.query(f"show {db_name}.vgroups;") + if tdSql.getRows() != vgroup_numbers : + continue + if self.db_replica == 1 : + if tdSql.res[0][4] == 'leader' and tdSql.res[last_number][4] == 'leader': + tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") + print("db replica :",tdSql.res[0][0]) + if tdSql.res[0][0] == db_replica: + tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count} s") + return True + + elif self.db_replica == 3 : + vgroup_status_first=[tdSql.res[0][4],tdSql.res[0][6],tdSql.res[0][8]] + + vgroup_status_last=[tdSql.res[last_number][4],tdSql.res[last_number][6],tdSql.res[last_number][8]] + if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2: + if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2: + tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") + print("db replica :",tdSql.res[0][0]) + if tdSql.res[0][0] == db_replica: + tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {count} s") + return True + else: + tdLog.debug(tdSql.res) + tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ") + caller = 
inspect.getframeinfo(inspect.stack()[1][0])
                args = (caller.filename, caller.lineno)
                tdLog.exit("%s(%d) failed " % args)

    def close(self):
        # Nothing to release here: this helper never opens a cursor of its
        # own, it only drives the shared tdSql instance. (self.cursor is never
        # assigned, so the previous self.cursor.close() raised AttributeError.)
        pass

clusterComCheck = ClusterComCheck()
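The recurring idiom across patches 1 and 3 above is defensive allocation: capture the allocator's result in a temporary, check it before overwriting any live state, and propagate terrno (or jump to the _error label) rather than asserting. Below is a minimal standalone sketch of that idiom — not part of the patches — using plain libc realloc and invented names (SArrayLike, ensureCap) in place of the real TDengine helpers (taosMemoryRealloc, taosArrayEnsureCap, QUERY_CHECK_NULL):

    #include <stdio.h>
    #include <stdlib.h>

    /* Simplified stand-in for SArray; the names are illustrative only. */
    typedef struct {
      void   *pData;
      size_t  elemSize;
      size_t  capacity;
    } SArrayLike;

    /* Grow the buffer without losing it on failure: realloc into a temporary
     * first, so pData still points at valid memory if the allocation fails. */
    static int ensureCap(SArrayLike *a, size_t newCap) {
      void *p = realloc(a->pData, newCap * a->elemSize);
      if (p == NULL) {
        return -1;            /* the patched code returns terrno here */
      }
      a->pData = p;           /* overwrite state only after the check */
      a->capacity = newCap;
      return 0;
    }

    int main(void) {
      SArrayLike a = {NULL, sizeof(int), 0};
      if (ensureCap(&a, 16) != 0) {
        fprintf(stderr, "allocation failed\n");
        return 1;
      }
      printf("capacity: %zu\n", a.capacity);
      free(a.pData);
      return 0;
    }

The createDiskbasedBuf change in patch 3 applies the same rule at the API boundary: assemble the object in a local pPBuf and publish it through the out-parameter (*pBuf) only on the success path, so a caller never observes a half-initialized buffer.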