diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 7c1ca294e7..649a7a4524 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -347,11 +347,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SExprSupp* pSup = &pInfo->pseudoExprSup; code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows, pTaskInfo, NULL); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - (*ppRes) = NULL; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid); pInfo->indexOfBufferedRes += 1; @@ -414,11 +410,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid; code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, pTaskInfo, NULL); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - (*ppRes) = NULL; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } } diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 638536349d..eb49057d89 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -938,7 +938,9 @@ _return: } if (code) { + qError("%s failed since %s", __func__, tstrerror(code)); pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); } else { seqStableJoinComposeRes(pStbJoin, *pRes); } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 2db007541b..767796977c 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -1094,6 +1094,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) { return TSDB_CODE_SUCCESS; @@ -1101,23 +1102,26 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { if (pExchangeInfo->dynamicOp) { code = addDynamicExchangeSource(pOperator); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } int64_t st = taosGetTimestampUs(); if (!pExchangeInfo->seqLoadData) { - int32_t code = prepareConcurrentlyLoad(pOperator); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = prepareConcurrentlyLoad(pOperator); + QUERY_CHECK_CODE(code, lino, _end); pExchangeInfo->openedTs = taosGetTimestampUs(); } OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3ce20dbbd9..2df235c0d9 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -450,7 +450,7 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** QRY_PARAM_CHECK(ppRes); if (pOperator->status == OP_EXEC_DONE) { - return TSDB_CODE_SUCCESS; + return code; } if (pOperator->status == OP_RES_TO_RETURN) { @@ -502,6 +502,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } else { (*ppRes) = buildGroupResultDataBlockByHash(pOperator); } @@ -1533,8 +1534,9 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } (*ppRes) = NULL; return code; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 347c48b4d1..1f43a429b3 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -993,6 +993,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p SHJoinOperatorInfo* pJoin = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSDataBlock* pRes = pJoin->finBlk; int64_t st = 0; @@ -1003,7 +1004,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p if (pOperator->status == OP_EXEC_DONE) { pRes->info.rows = 0; - goto _return; + goto _end; } if (!pJoin->keyHashBuilt) { @@ -1011,13 +1012,10 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p bool queryDone = false; code = hJoinBuildHash(pOperator, &queryDone); - if (code) { - pTaskInfo->code = code; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); if (queryDone) { - goto _return; + goto _end; } } @@ -1025,17 +1023,11 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p if (pJoin->ctx.rowRemains) { code = (*pJoin->joinFp)(pOperator); - if (code) { - pTaskInfo->code = code; - return pTaskInfo->code; - } + QUERY_CHECK_CODE(code, lino, _end); if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { code = doFilter(pRes, pJoin->pFinFilter, NULL); - if (code) { - pTaskInfo->code = code; - return pTaskInfo->code; - } + QUERY_CHECK_CODE(code, lino, _end); } if (pRes->info.rows > 0) { @@ -1055,10 +1047,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p pJoin->execInfo.probeBlkRows += pBlock->info.rows; code = hJoinPrepareStart(pOperator, pBlock); - if (code) { - pTaskInfo->code = code; - return pTaskInfo->code; - } + QUERY_CHECK_CODE(code, lino, _end); if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) { continue; @@ -1066,10 +1055,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) { code = doFilter(pRes, pJoin->pFinFilter, NULL); - if (code) { - pTaskInfo->code = code; - return pTaskInfo->code; - } + QUERY_CHECK_CODE(code, lino, _end); } if (pRes->info.rows > 0) { @@ -1077,11 +1063,15 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p } } -_return: +_end: if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } if (pRes->info.rows > 0) { *pResBlock = pRes; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index af5e4ed235..e007504ffb 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1731,6 +1731,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc if (pJoin->pFinFilter != NULL) { code = doFilter(pBlock, pJoin->pFinFilter, NULL); if (code) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pJoin->errCode = code; T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode); } diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 49973ac373..45cd755f78 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -492,6 +492,8 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + pOperator->pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, terrno); } @@ -501,6 +503,8 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pOperator->status == OP_EXEC_DONE) { return 0; @@ -509,18 +513,12 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; - int32_t code = pOperator->fpSet._openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return code; - } + code = pOperator->fpSet._openFn(pOperator); + QUERY_CHECK_CODE(code, lino, _end); if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) { code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock); - if (code) { - pTaskInfo->code = code; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } if ((*pResBlock) != NULL) { @@ -530,6 +528,12 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { setOperatorCompleted(pOperator); } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index bbd6ce39d1..8daf4695db 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -881,14 +881,17 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) { QRY_PARAM_CHECK(pRes); + int32_t lino = 0; int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM); - if (TSDB_CODE_SUCCESS != code) { + QUERY_CHECK_CODE(code, lino, _end); + code = pOperator->fpSet.getNextFn(pOperator, pRes); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pOperator->pTaskInfo->code = code; - } else { - code = pOperator->fpSet.getNextFn(pOperator, pRes); - if (code) { - pOperator->pTaskInfo->code = code; - } + T_LONG_JMP(pOperator->pTaskInfo->env, code); } return code; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 790e97b27c..5b9e531679 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -270,6 +270,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pFinalRes = pProjectInfo->pFinalRes; int32_t code = 0; + int32_t lino = 0; int64_t st = 0; int32_t order = pInfo->inputTsOrder; int32_t scanFlag = 0; @@ -290,9 +291,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { if (downstream == NULL) { code = doGenerateSourceData(pOperator); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + QUERY_CHECK_CODE(code, lino, _end); if (pProjectInfo->outputIgnoreGroup) { pRes->info.id.groupId = 0; @@ -348,20 +347,14 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { } code = setInputDataBlock(pSup, pBlock, order, scanFlag, false); - if (code) { - T_LONG_JMP(pTaskInfo->env, code); - } + QUERY_CHECK_CODE(code, lino, _end); code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + QUERY_CHECK_CODE(code, lino, _end); code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, pProjectInfo->pPseudoColInfo); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + QUERY_CHECK_CODE(code, lino, _end); status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator); if (status == PROJECT_RETRIEVE_CONTINUE) { @@ -377,11 +370,8 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { pFinalRes->info.version = pRes->info.version; // continue merge data, ignore the group id - int32_t ret = blockDataMerge(pFinalRes, pRes); - if (ret < 0) { - pTaskInfo->code = code; - return code; - } + code = blockDataMerge(pFinalRes, pRes); + QUERY_CHECK_CODE(code, lino, _end); if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) { continue; @@ -390,10 +380,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { // do apply filter code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL); - if (code) { - pTaskInfo->code = code; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { @@ -404,10 +391,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { // do apply filter if (pRes->info.rows > 0) { code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); - if (code) { - pTaskInfo->code = code; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); if (pRes->info.rows == 0) { continue; @@ -436,6 +420,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { } *pResBlock = (p->info.rows > 0)? p:NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } @@ -578,14 +569,15 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo SOptrBasicInfo* pInfo = &pIndefInfo->binfo; SExprSupp* pSup = &pOperator->exprSupp; int64_t st = 0; - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_EXEC_DONE) { - return 0; + return code; } if (pOperator->cost.openCost == 0) { @@ -637,10 +629,7 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo } code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - if (code) { - pTaskInfo->code = code; - return code; - } + QUERY_CHECK_CODE(code, lino, _end); size_t rows = pInfo->pRes->info.rows; if (rows > 0 || pOperator->status == OP_EXEC_DONE) { @@ -658,6 +647,13 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo } *pResBlock = (rows > 0) ? pInfo->pRes : NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 76e4f8ba56..a2e2dc9b98 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -951,7 +951,8 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes if (isTaskKilled(pTaskInfo)) { pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader); - return pTaskInfo->code; + code = pTaskInfo->code; + goto _end; } if (pOperator->status == OP_EXEC_DONE) { @@ -996,6 +997,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } return code; } @@ -1416,6 +1418,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } return code; @@ -2944,8 +2947,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pResult && pResult->info.rows > 0) { bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); code = processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + QUERY_CHECK_CODE(code, lino, _end); qDebug("tmqsnap doQueueScan get data utid:%" PRId64 "", pResult->info.id.uid); - if (pResult->info.rows > 0 || code != TSDB_CODE_SUCCESS) { + if (pResult->info.rows > 0) { (*ppRes) = pResult; return code; } @@ -3009,8 +3013,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } (*ppRes) = NULL; return code; @@ -3340,9 +3345,7 @@ FETCH_NEXT_BLOCK: if (pBlock->info.parTbName[0]) { code = pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + QUERY_CHECK_CODE(code, lino, _end); } // TODO move into scan @@ -3482,7 +3485,7 @@ FETCH_NEXT_BLOCK: return code; } qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, - lino); + __LINE__); blockDataCleanup(pInfo->pUpdateDataRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } break; @@ -3496,7 +3499,7 @@ FETCH_NEXT_BLOCK: return code; } qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, - lino); + __LINE__); blockDataCleanup(pInfo->pUpdateDataRes); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } break; @@ -3658,8 +3661,9 @@ FETCH_NEXT_BLOCK: _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } (*ppRes) = NULL; return code; @@ -3730,6 +3734,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pBlock && pBlock->info.rows > 0) { bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext); code = processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + QUERY_CHECK_CODE(code, lino, _end); qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); (*ppRes) = pBlock; return code; @@ -3741,7 +3746,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { QUERY_CHECK_CODE(code, lino, _end); if (code != 0) { tDeleteSchemaWrapper(mtInfo.schema); - goto _end; + QUERY_CHECK_CODE(code, lino, _end); } STqOffsetVal offset = {0}; if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal @@ -3831,6 +3836,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } (*ppRes) = NULL; @@ -4677,6 +4683,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } return code; @@ -4684,6 +4691,7 @@ _end: static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pOperator->status == OP_EXEC_DONE) { (*ppRes) = NULL; return code; @@ -4699,10 +4707,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock* int32_t size = 0; code = tableListGetSize(pInfo->pTableListInfo, &size); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - return code; - } + QUERY_CHECK_CODE(code, lino, _end); if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); @@ -4716,11 +4721,11 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock* while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); - if (code == TSDB_CODE_OUT_OF_MEMORY) { - break; - } else { + if (code != TSDB_CODE_OUT_OF_MEMORY) { // ignore other error + code = TSDB_CODE_SUCCESS; } + QUERY_CHECK_CODE(code, lino, _end); ++count; if (++pInfo->curPos >= size) { @@ -4744,6 +4749,13 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock* pOperator->resultInfo.totalRows += pRes->info.rows; (*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } @@ -5429,6 +5441,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } else { (*ppRes) = pBlock; } @@ -5945,6 +5958,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } else { (*ppRes) = pBlock; } @@ -6460,7 +6474,12 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe } code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes); - if ((pRes->info.rows > 0) && (code == 0)) { + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed since %s", __func__, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + if (pRes->info.rows > 0) { *ppRes = pRes; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index d12f6dd94c..11b3fa8c70 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -349,82 +349,84 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { int32_t doOpenSortOperator(SOperatorInfo* pOperator) { SSortOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSortSource* pSource =NULL; if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; + return code; } pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. pInfo->pSortHandle = NULL; - int32_t code = + code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); - SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); - if (pSource == NULL) { - return terrno; - } + pSource = taosMemoryCalloc(1, sizeof(SSortSource)); + QUERY_CHECK_NULL(pSource, code, lino, _end, terrno); pSource->param = pOperator->pDownstream[0]; pSource->onlyRef = true; code = tsortAddSource(pInfo->pSortHandle, pSource); - if (code) { - taosMemoryFree(pSource); - return code; - } + QUERY_CHECK_CODE(code, lino, _end); + pSource = NULL; code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - } else { - pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; - pOperator->status = OP_RES_TO_RETURN; - OPTR_SET_OPENED(pOperator); - } + QUERY_CHECK_CODE(code, lino, _end); + pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; + pOperator->status = OP_RES_TO_RETURN; + OPTR_SET_OPENED(pOperator); +_end: + if (pSource) { + taosMemoryFree(pSource); + } + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pOperator->status == OP_EXEC_DONE) { - return 0; + return code; } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortOperatorInfo* pInfo = pOperator->info; - int32_t code = pOperator->fpSet._openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = pOperator->fpSet._openFn(pOperator); + QUERY_CHECK_CODE(code, lino, _end); // multi-group case not handle here SSDataBlock* pBlock = NULL; while (1) { if (tsortIsClosed(pInfo->pSortHandle)) { code = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pTaskInfo->env, code); + QUERY_CHECK_CODE(code, lino, _end); } code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo, &pBlock); - if (pBlock == NULL || code != 0) { + QUERY_CHECK_CODE(code, lino, _end); + if (pBlock == NULL) { setOperatorCompleted(pOperator); return code; } code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo); - if (code) { - break; - } + QUERY_CHECK_CODE(code, lino, _end); if (blockDataGetNumOfRows(pBlock) == 0) { continue; @@ -443,6 +445,12 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { } *pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } @@ -692,16 +700,16 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { QRY_PARAM_CHECK(pResBlock); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupSortOperatorInfo* pInfo = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pOperator->status == OP_EXEC_DONE) { - return 0; - } - - int32_t code = pOperator->fpSet._openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { return code; } + code = pOperator->fpSet._openFn(pOperator); + QUERY_CHECK_CODE(code, lino, _end); + if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -714,30 +722,25 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId; pInfo->childOpStatus = CHILD_OP_NEW_GROUP; code = beginSortGroup(pOperator); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } SSDataBlock* pBlock = NULL; while (pInfo->pCurrSortHandle != NULL) { if (tsortIsClosed(pInfo->pCurrSortHandle)) { code = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pTaskInfo->env, code); + QUERY_CHECK_CODE(code, lino, _end); } // beginSortGroup would fetch all child blocks of pInfo->currGroupId; if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) { - pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - return code; + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); } code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo, &pBlock); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); if (pBlock != NULL) { pBlock->info.id.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -748,9 +751,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { (void) finishSortGroup(pOperator); pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId; code = beginSortGroup(pOperator); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { (void) finishSortGroup(pOperator); setOperatorCompleted(pOperator); @@ -759,6 +760,12 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { } } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return code; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 577af29bf7..2a53e7ba2e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -744,8 +744,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a46bbdace3..d6eec0a8a4 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -179,7 +179,7 @@ _end: pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); } pAggSup->stateStore.streamStateFreeCur(pCur); - qDebug("===stream===set event next win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, + qDebug("===stream===set event cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, pCurWin->winInfo.sessionWin.win.ekey); _error: @@ -233,7 +233,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW pWinInfo->pWinFlag->endFlag = ends[i]; } else if (pWin->ekey == pTsData[i]) { pWinInfo->pWinFlag->endFlag |= ends[i]; - } else { + } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) { *pRebuild = true; pWinInfo->pWinFlag->endFlag |= ends[i]; (*pWinRow) = i + 1 - start; @@ -734,8 +734,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 4d5f597ab6..291cc3b67b 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1155,8 +1155,9 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setOperatorCompleted(pOperator); resetStreamFillInfo(pInfo); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 22e462abab..a23f065657 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1802,8 +1802,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -3571,8 +3572,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -4076,8 +4078,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } clearFunctionContext(&pOperator->exprSupp); @@ -4816,8 +4819,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -5216,6 +5220,7 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p code = TSDB_CODE_SUCCESS; break; } + QUERY_CHECK_CODE(code, lino, _end); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); } @@ -5258,8 +5263,9 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -5795,8 +5801,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* _end: if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); } (*ppRes) = NULL; return code; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index dcebdf59a9..7467d391d8 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2049,7 +2049,7 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) if (isTaskKilled(pOperator->pTaskInfo)) { setOperatorCompleted(pOperator); (*ppRes) = NULL; - return pTaskInfo->code; + break; } blockDataCleanup(pInfo->pRes); @@ -2092,12 +2092,18 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) continue; } (*ppRes) = pBlock; - return pTaskInfo->code; } else { (*ppRes) = NULL; - return pTaskInfo->code; } + break; } + +_end: + if (pTaskInfo->code) { + qError("%s failed since %s", __func__, tstrerror(pTaskInfo->code)); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + return pTaskInfo->code; } static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name, diff --git a/source/libs/monitor/src/monFramework.c b/source/libs/monitor/src/monFramework.c index 76473ccbb1..1e358ac8d4 100644 --- a/source/libs/monitor/src/monFramework.c +++ b/source/libs/monitor/src/monFramework.c @@ -100,7 +100,9 @@ extern char* tsMonFwUri; #define VNODE_ROLE "taosd_vnodes_info:role" void monInitMonitorFW(){ - (void)taos_collector_registry_default_init(); + if (taos_collector_registry_default_init() != 0) { + uError("failed to init default collector registry"); + } tsMonitor.metrics = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); taos_gauge_t *gauge = NULL; @@ -115,7 +117,9 @@ void monInitMonitorFW(){ for(int32_t i = 0; i < 25; i++){ gauge= taos_gauge_new(dnodes_gauges[i], "", dnodes_label_count, dnodes_sample_labels); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to delete metric %s", dnodes_gauges[i]); + } } if (taosHashPut(tsMonitor.metrics, dnodes_gauges[i], strlen(dnodes_gauges[i]), &gauge, sizeof(taos_gauge_t *)) != 0) { @@ -129,7 +133,9 @@ void monInitMonitorFW(){ for(int32_t i = 0; i < 3; i++){ gauge= taos_gauge_new(dnodes_data_gauges[i], "", dnodes_data_label_count, dnodes_data_sample_labels); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to delete metric %s", dnodes_data_gauges[i]); + } } if (taosHashPut(tsMonitor.metrics, dnodes_data_gauges[i], strlen(dnodes_data_gauges[i]), &gauge, sizeof(taos_gauge_t *)) != 0) { @@ -143,7 +149,9 @@ void monInitMonitorFW(){ for(int32_t i = 0; i < 3; i++){ gauge= taos_gauge_new(dnodes_log_gauges[i], "", dnodes_log_label_count, dnodes_log_sample_labels); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to delete metric %s", dnodes_log_gauges[i]); + } } if (taosHashPut(tsMonitor.metrics, dnodes_log_gauges[i], strlen(dnodes_log_gauges[i]), &gauge, sizeof(taos_gauge_t *)) != 0) { @@ -154,7 +162,9 @@ void monInitMonitorFW(){ void monCleanupMonitorFW(){ taosHashCleanup(tsMonitor.metrics); - (void)taos_collector_registry_destroy(TAOS_COLLECTOR_REGISTRY_DEFAULT); + if (taos_collector_registry_destroy(TAOS_COLLECTOR_REGISTRY_DEFAULT) != 0) { + uError("failed to destroy default collector registry"); + } TAOS_COLLECTOR_REGISTRY_DEFAULT = NULL; } @@ -174,7 +184,9 @@ void monGenClusterInfoTable(SMonInfo *pMonitor){ uError("failed to delete metric %s", metric_names[i]); } - (void)taosHashRemove(tsMonitor.metrics, metric_names[i], strlen(metric_names[i])); + if (taosHashRemove(tsMonitor.metrics, metric_names[i], strlen(metric_names[i])) != 0) { + uError("failed to remove metric %s", metric_names[i]); + } } if(pBasicInfo->cluster_id == 0) { @@ -191,7 +203,9 @@ void monGenClusterInfoTable(SMonInfo *pMonitor){ for(int32_t i = 0; i < 18; i++){ gauge= taos_gauge_new(metric_names[i], "", label_count, sample_labels1); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to delete metric %s", metric_names[i]); + } } if (taosHashPut(tsMonitor.metrics, metric_names[i], strlen(metric_names[i]), &gauge, sizeof(taos_gauge_t *)) != 0) { uError("failed to add cluster gauge at%d:%s", i, metric_names[i]); @@ -317,11 +331,15 @@ void monGenVgroupInfoTable(SMonInfo *pMonitor){ const char *vgroup_sample_labels[] = {"cluster_id", "vgroup_id", "database_name"}; taos_gauge_t *tableNumGauge = taos_gauge_new(TABLES_NUM, "", vgroup_label_count, vgroup_sample_labels); if(taos_collector_registry_register_metric(tableNumGauge) == 1){ - (void)taos_counter_destroy(tableNumGauge); + if (taos_counter_destroy(tableNumGauge) != 0) { + uError("failed to delete metric " TABLES_NUM); + } } taos_gauge_t *statusGauge = taos_gauge_new(STATUS, "", vgroup_label_count, vgroup_sample_labels); if(taos_collector_registry_register_metric(statusGauge) == 1){ - (void)taos_counter_destroy(statusGauge); + if (taos_counter_destroy(statusGauge) != 0) { + uError("failed to delete metric " STATUS); + } } char cluster_id[TSDB_CLUSTER_ID_LEN] = {0}; @@ -530,7 +548,9 @@ void monGenDnodeStatusInfoTable(SMonInfo *pMonitor){ gauge= taos_gauge_new(DNODE_STATUS, "", dnodes_label_count, dnodes_sample_labels); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to delete metric " DNODE_STATUS); + } } char cluster_id[TSDB_CLUSTER_ID_LEN]; @@ -633,7 +653,9 @@ void monGenMnodeRoleTable(SMonInfo *pMonitor){ uError("failed to delete metric %s", mnodes_role_gauges[i]); } - (void)taosHashRemove(tsMonitor.metrics, mnodes_role_gauges[i], strlen(mnodes_role_gauges[i])); + if (taosHashRemove(tsMonitor.metrics, mnodes_role_gauges[i], strlen(mnodes_role_gauges[i])) != 0) { + uError("failed to remove metric %s", mnodes_role_gauges[i]); + } } SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster; @@ -647,7 +669,9 @@ void monGenMnodeRoleTable(SMonInfo *pMonitor){ for(int32_t i = 0; i < 1; i++){ gauge= taos_gauge_new(mnodes_role_gauges[i], "", mnodes_role_label_count, mnodes_role_sample_labels); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to destroy gauge"); + } } if (taosHashPut(tsMonitor.metrics, mnodes_role_gauges[i], strlen(mnodes_role_gauges[i]), &gauge, sizeof(taos_gauge_t *)) != 0) { @@ -702,7 +726,9 @@ void monGenVnodeRoleTable(SMonInfo *pMonitor){ uError("failed to delete metric %s", vnodes_role_gauges[i]); } - (void)taosHashRemove(tsMonitor.metrics, vnodes_role_gauges[i], strlen(vnodes_role_gauges[i])); + if (taosHashRemove(tsMonitor.metrics, vnodes_role_gauges[i], strlen(vnodes_role_gauges[i])) != 0) { + uError("failed to remove metric %s", vnodes_role_gauges[i]); + } } SMonVgroupInfo *pInfo = &pMonitor->mmInfo.vgroup; @@ -716,7 +742,9 @@ void monGenVnodeRoleTable(SMonInfo *pMonitor){ for(int32_t i = 0; i < 1; i++){ gauge= taos_gauge_new(vnodes_role_gauges[i], "", vnodes_role_label_count, vnodes_role_sample_labels); if(taos_collector_registry_register_metric(gauge) == 1){ - (void)taos_counter_destroy(gauge); + if (taos_counter_destroy(gauge) != 0) { + uError("failed to destroy gauge"); + } } if (taosHashPut(tsMonitor.metrics, vnodes_role_gauges[i], strlen(vnodes_role_gauges[i]), &gauge, sizeof(taos_gauge_t *)) != 0) { @@ -774,7 +802,9 @@ void monSendPromReport() { tmp) != 0) { uError("failed to send monitor msg"); } else { - (void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT); + if (taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT) != 0) { + uError("failed to clear batch"); + } } taosMemoryFreeClear(pCont); } diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index f581d8c83d..744177b7a1 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -145,7 +145,9 @@ void monInitVnode() { counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", label_count, sample_labels); uDebug("new metric:%p", counter); if (taos_collector_registry_register_metric(counter) == 1) { - (void)taos_counter_destroy(counter); + if (taos_counter_destroy(counter) != 0) { + uError("failed to destroy metric:%p", counter); + } uError("failed to register metric:%p", counter); } else { tsInsertCounter = counter; @@ -226,14 +228,17 @@ static void monGenBasicJson(SMonInfo *pMonitor) { SJson *pJson = pMonitor->pJson; char buf[40] = {0}; - (void)taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI); + if (taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI) != 0) { + uError("failed to format time"); + return; + } - (void)tjsonAddStringToObject(pJson, "ts", buf); - (void)tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id); - (void)tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep); + if (tjsonAddStringToObject(pJson, "ts", buf) != 0) uError("failed to add ts"); + if (tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id) != 0) uError("failed to add dnode_id"); + if (tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep) != 0) uError("failed to add dnode_ep"); snprintf(buf, sizeof(buf), "%" PRId64, pInfo->cluster_id); - (void)tjsonAddStringToObject(pJson, "cluster_id", buf); - (void)tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol); + if (tjsonAddStringToObject(pJson, "cluster_id", buf) != 0) uError("failed to add cluster_id"); + if (tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol) != 0) uError("failed to add protocol"); } static void monGenBasicJsonBasic(SMonInfo *pMonitor) { @@ -244,12 +249,12 @@ static void monGenBasicJsonBasic(SMonInfo *pMonitor) { char buf[40] = {0}; sprintf(buf, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); - (void)tjsonAddStringToObject(pJson, "ts", buf); - (void)tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id); - (void)tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep); + if (tjsonAddStringToObject(pJson, "ts", buf) != 0) uError("failed to add ts"); + if (tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id) != 0) uError("failed to add dnode_id"); + if (tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep) != 0) uError("failed to add dnode_ep"); snprintf(buf, sizeof(buf), "%" PRId64, pInfo->cluster_id); - (void)tjsonAddStringToObject(pJson, "cluster_id", buf); - (void)tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol); + if (tjsonAddStringToObject(pJson, "cluster_id", buf) != 0) uError("failed to add cluster_id"); + if (tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol) != 0) uError("failed to add protocol"); } static void monGenClusterJson(SMonInfo *pMonitor) { @@ -263,21 +268,24 @@ static void monGenClusterJson(SMonInfo *pMonitor) { return; } - (void)tjsonAddStringToObject(pJson, "first_ep", pInfo->first_ep); - (void)tjsonAddDoubleToObject(pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id); - (void)tjsonAddStringToObject(pJson, "version", pInfo->version); - (void)tjsonAddDoubleToObject(pJson, "master_uptime", pInfo->master_uptime); - (void)tjsonAddDoubleToObject(pJson, "monitor_interval", pInfo->monitor_interval); - (void)tjsonAddDoubleToObject(pJson, "dbs_total", pInfo->dbs_total); - (void)tjsonAddDoubleToObject(pJson, "tbs_total", pInfo->tbs_total); - (void)tjsonAddDoubleToObject(pJson, "stbs_total", pInfo->stbs_total); - (void)tjsonAddDoubleToObject(pJson, "vgroups_total", pInfo->vgroups_total); - (void)tjsonAddDoubleToObject(pJson, "vgroups_alive", pInfo->vgroups_alive); - (void)tjsonAddDoubleToObject(pJson, "vnodes_total", pInfo->vnodes_total); - (void)tjsonAddDoubleToObject(pJson, "vnodes_alive", pInfo->vnodes_alive); - (void)tjsonAddDoubleToObject(pJson, "connections_total", pInfo->connections_total); - (void)tjsonAddDoubleToObject(pJson, "topics_total", pInfo->topics_toal); - (void)tjsonAddDoubleToObject(pJson, "streams_total", pInfo->streams_total); + if (tjsonAddStringToObject(pJson, "first_ep", pInfo->first_ep) != 0) uError("failed to add first_ep"); + if (tjsonAddDoubleToObject(pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id) != 0) + uError("failed to add first_ep_dnode_id"); + if (tjsonAddStringToObject(pJson, "version", pInfo->version) != 0) uError("failed to add version"); + if (tjsonAddDoubleToObject(pJson, "master_uptime", pInfo->master_uptime) != 0) uError("failed to add master_uptime"); + if (tjsonAddDoubleToObject(pJson, "monitor_interval", pInfo->monitor_interval) != 0) + uError("failed to add monitor_interval"); + if (tjsonAddDoubleToObject(pJson, "dbs_total", pInfo->dbs_total) != 0) uError("failed to add dbs_total"); + if (tjsonAddDoubleToObject(pJson, "tbs_total", pInfo->tbs_total) != 0) uError("failed to add tbs_total"); + if (tjsonAddDoubleToObject(pJson, "stbs_total", pInfo->stbs_total) != 0) uError("failed to add stbs_total"); + if (tjsonAddDoubleToObject(pJson, "vgroups_total", pInfo->vgroups_total) != 0) uError("failed to add vgroups_total"); + if (tjsonAddDoubleToObject(pJson, "vgroups_alive", pInfo->vgroups_alive) != 0) uError("failed to add vgroups_alive"); + if (tjsonAddDoubleToObject(pJson, "vnodes_total", pInfo->vnodes_total) != 0) uError("failed to add vnodes_total"); + if (tjsonAddDoubleToObject(pJson, "vnodes_alive", pInfo->vnodes_alive) != 0) uError("failed to add vnodes_alive"); + if (tjsonAddDoubleToObject(pJson, "connections_total", pInfo->connections_total) != 0) + uError("failed to add connections_total"); + if (tjsonAddDoubleToObject(pJson, "topics_total", pInfo->topics_toal) != 0) uError("failed to add topics_total"); + if (tjsonAddDoubleToObject(pJson, "streams_total", pInfo->streams_total) != 0) uError("failed to add streams_total"); SJson *pDnodesJson = tjsonAddArrayToObject(pJson, "dnodes"); if (pDnodesJson == NULL) return; @@ -287,9 +295,9 @@ static void monGenClusterJson(SMonInfo *pMonitor) { if (pDnodeJson == NULL) continue; SMonDnodeDesc *pDnodeDesc = taosArrayGet(pInfo->dnodes, i); - (void)tjsonAddDoubleToObject(pDnodeJson, "dnode_id", pDnodeDesc->dnode_id); - (void)tjsonAddStringToObject(pDnodeJson, "dnode_ep", pDnodeDesc->dnode_ep); - (void)tjsonAddStringToObject(pDnodeJson, "status", pDnodeDesc->status); + if (tjsonAddDoubleToObject(pDnodeJson, "dnode_id", pDnodeDesc->dnode_id) != 0) uError("failed to add dnode_id"); + if (tjsonAddStringToObject(pDnodeJson, "dnode_ep", pDnodeDesc->dnode_ep) != 0) uError("failed to add dnode_ep"); + if (tjsonAddStringToObject(pDnodeJson, "status", pDnodeDesc->status) != 0) uError("failed to add status"); if (tjsonAddItemToArray(pDnodesJson, pDnodeJson) != 0) tjsonDelete(pDnodeJson); } @@ -302,9 +310,9 @@ static void monGenClusterJson(SMonInfo *pMonitor) { if (pMnodeJson == NULL) continue; SMonMnodeDesc *pMnodeDesc = taosArrayGet(pInfo->mnodes, i); - (void)tjsonAddDoubleToObject(pMnodeJson, "mnode_id", pMnodeDesc->mnode_id); - (void)tjsonAddStringToObject(pMnodeJson, "mnode_ep", pMnodeDesc->mnode_ep); - (void)tjsonAddStringToObject(pMnodeJson, "role", pMnodeDesc->role); + if (tjsonAddDoubleToObject(pMnodeJson, "mnode_id", pMnodeDesc->mnode_id) != 0) uError("failed to add mnode_id"); + if (tjsonAddStringToObject(pMnodeJson, "mnode_ep", pMnodeDesc->mnode_ep) != 0) uError("failed to add mnode_ep"); + if (tjsonAddStringToObject(pMnodeJson, "role", pMnodeDesc->role) != 0) uError("failed to add role"); if (tjsonAddItemToArray(pMnodesJson, pMnodeJson) != 0) tjsonDelete(pMnodeJson); } @@ -314,11 +322,11 @@ static void monGenClusterJsonBasic(SMonInfo *pMonitor) { SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster; if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; - // (void)tjsonAddStringToObject(pMonitor->pJson, "first_ep", pInfo->first_ep); - (void)tjsonAddStringToObject(pMonitor->pJson, "first_ep", tsFirst); - (void)tjsonAddDoubleToObject(pMonitor->pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id); - (void)tjsonAddStringToObject(pMonitor->pJson, "cluster_version", pInfo->version); - // (void)tjsonAddDoubleToObject(pMonitor->pJson, "monitor_interval", pInfo->monitor_interval); + if (tjsonAddStringToObject(pMonitor->pJson, "first_ep", tsFirst) != 0) uError("failed to add first_ep"); + if (tjsonAddDoubleToObject(pMonitor->pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id) != 0) + uError("failed to add first_ep_dnode_id"); + if (tjsonAddStringToObject(pMonitor->pJson, "cluster_version", pInfo->version) != 0) + uError("failed to add cluster_version"); } static void monGenVgroupJson(SMonInfo *pMonitor) { @@ -337,10 +345,13 @@ static void monGenVgroupJson(SMonInfo *pMonitor) { } SMonVgroupDesc *pVgroupDesc = taosArrayGet(pInfo->vgroups, i); - (void)tjsonAddDoubleToObject(pVgroupJson, "vgroup_id", pVgroupDesc->vgroup_id); - (void)tjsonAddStringToObject(pVgroupJson, "database_name", pVgroupDesc->database_name); - (void)tjsonAddDoubleToObject(pVgroupJson, "tables_num", pVgroupDesc->tables_num); - (void)tjsonAddStringToObject(pVgroupJson, "status", pVgroupDesc->status); + if (tjsonAddDoubleToObject(pVgroupJson, "vgroup_id", pVgroupDesc->vgroup_id) != 0) + uError("failed to add vgroup_id"); + if (tjsonAddStringToObject(pVgroupJson, "database_name", pVgroupDesc->database_name) != 0) + uError("failed to add database_name"); + if (tjsonAddDoubleToObject(pVgroupJson, "tables_num", pVgroupDesc->tables_num) != 0) + uError("failed to add tables_num"); + if (tjsonAddStringToObject(pVgroupJson, "status", pVgroupDesc->status) != 0) uError("failed to add status"); SJson *pVnodesJson = tjsonAddArrayToObject(pVgroupJson, "vnodes"); if (pVnodesJson == NULL) continue; @@ -352,8 +363,9 @@ static void monGenVgroupJson(SMonInfo *pMonitor) { SJson *pVnodeJson = tjsonCreateObject(); if (pVnodeJson == NULL) continue; - (void)tjsonAddDoubleToObject(pVnodeJson, "dnode_id", pVnodeDesc->dnode_id); - (void)tjsonAddStringToObject(pVnodeJson, "vnode_role", pVnodeDesc->vnode_role); + if (tjsonAddDoubleToObject(pVnodeJson, "dnode_id", pVnodeDesc->dnode_id) != 0) uError("failed to add dnode_id"); + if (tjsonAddStringToObject(pVnodeJson, "vnode_role", pVnodeDesc->vnode_role) != 0) + uError("failed to add vnode_role"); if (tjsonAddItemToArray(pVnodesJson, pVnodeJson) != 0) tjsonDelete(pVnodeJson); } @@ -376,8 +388,9 @@ static void monGenStbJson(SMonInfo *pMonitor) { } SMonStbDesc *pStbDesc = taosArrayGet(pInfo->stbs, i); - (void)tjsonAddStringToObject(pStbJson, "stb_name", pStbDesc->stb_name); - (void)tjsonAddStringToObject(pStbJson, "database_name", pStbDesc->database_name); + if (tjsonAddStringToObject(pStbJson, "stb_name", pStbDesc->stb_name) != 0) uError("failed to add stb_name"); + if (tjsonAddStringToObject(pStbJson, "database_name", pStbDesc->database_name) != 0) + uError("failed to add database_name"); } } @@ -392,9 +405,11 @@ static void monGenGrantJson(SMonInfo *pMonitor) { return; } - (void)tjsonAddDoubleToObject(pJson, "expire_time", pInfo->expire_time); - (void)tjsonAddDoubleToObject(pJson, "timeseries_used", pInfo->timeseries_used); - (void)tjsonAddDoubleToObject(pJson, "timeseries_total", pInfo->timeseries_total); + if (tjsonAddDoubleToObject(pJson, "expire_time", pInfo->expire_time) != 0) uError("failed to add expire_time"); + if (tjsonAddDoubleToObject(pJson, "timeseries_used", pInfo->timeseries_used) != 0) + uError("failed to add timeseries_used"); + if (tjsonAddDoubleToObject(pJson, "timeseries_total", pInfo->timeseries_total) != 0) + uError("failed to add timeseries_total"); } static void monGenDnodeJson(SMonInfo *pMonitor) { @@ -451,36 +466,40 @@ static void monGenDnodeJson(SMonInfo *pMonitor) { double io_read_disk_rate = io_read_disk / interval; double io_write_disk_rate = io_write_disk / interval; - (void)tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime); - (void)tjsonAddDoubleToObject(pJson, "cpu_engine", cpu_engine); - (void)tjsonAddDoubleToObject(pJson, "cpu_system", pSys->cpu_system); - (void)tjsonAddDoubleToObject(pJson, "cpu_cores", pSys->cpu_cores); - (void)tjsonAddDoubleToObject(pJson, "mem_engine", mem_engine); - (void)tjsonAddDoubleToObject(pJson, "mem_system", pSys->mem_system); - (void)tjsonAddDoubleToObject(pJson, "mem_total", pSys->mem_total); - (void)tjsonAddDoubleToObject(pJson, "disk_engine", pSys->disk_engine); - (void)tjsonAddDoubleToObject(pJson, "disk_used", pSys->disk_used); - (void)tjsonAddDoubleToObject(pJson, "disk_total", pSys->disk_total); - (void)tjsonAddDoubleToObject(pJson, "net_in", net_in_rate); - (void)tjsonAddDoubleToObject(pJson, "net_out", net_out_rate); - (void)tjsonAddDoubleToObject(pJson, "io_read", io_read_rate); - (void)tjsonAddDoubleToObject(pJson, "io_write", io_write_rate); - (void)tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate); - (void)tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate); - (void)tjsonAddDoubleToObject(pJson, "req_select", pStat->numOfSelectReqs); - (void)tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate); - (void)tjsonAddDoubleToObject(pJson, "req_insert", pStat->numOfInsertReqs); - (void)tjsonAddDoubleToObject(pJson, "req_insert_success", pStat->numOfInsertSuccessReqs); - (void)tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate); - (void)tjsonAddDoubleToObject(pJson, "req_insert_batch", pStat->numOfBatchInsertReqs); - (void)tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pStat->numOfBatchInsertSuccessReqs); - (void)tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate); - (void)tjsonAddDoubleToObject(pJson, "errors", pStat->errors); - (void)tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes); - (void)tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum); - (void)tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode); - (void)tjsonAddDoubleToObject(pJson, "has_qnode", pInfo->has_qnode); - (void)tjsonAddDoubleToObject(pJson, "has_snode", pInfo->has_snode); + if (tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime) != 0) uError("failed to add uptime"); + if (tjsonAddDoubleToObject(pJson, "cpu_engine", cpu_engine) != 0) uError("failed to add cpu_engine"); + if (tjsonAddDoubleToObject(pJson, "cpu_system", pSys->cpu_system) != 0) uError("failed to add cpu_system"); + if (tjsonAddDoubleToObject(pJson, "cpu_cores", pSys->cpu_cores) != 0) uError("failed to add cpu_cores"); + if (tjsonAddDoubleToObject(pJson, "mem_engine", mem_engine) != 0) uError("failed to add mem_engine"); + if (tjsonAddDoubleToObject(pJson, "mem_system", pSys->mem_system) != 0) uError("failed to add mem_system"); + if (tjsonAddDoubleToObject(pJson, "mem_total", pSys->mem_total) != 0) uError("failed to add mem_total"); + if (tjsonAddDoubleToObject(pJson, "disk_engine", pSys->disk_engine) != 0) uError("failed to add disk_engine"); + if (tjsonAddDoubleToObject(pJson, "disk_used", pSys->disk_used) != 0) uError("failed to add disk_used"); + if (tjsonAddDoubleToObject(pJson, "disk_total", pSys->disk_total) != 0) uError("failed to add disk_total"); + if (tjsonAddDoubleToObject(pJson, "net_in", net_in_rate) != 0) uError("failed to add net_in"); + if (tjsonAddDoubleToObject(pJson, "net_out", net_out_rate) != 0) uError("failed to add net_out"); + if (tjsonAddDoubleToObject(pJson, "io_read", io_read_rate) != 0) uError("failed to add io_read"); + if (tjsonAddDoubleToObject(pJson, "io_write", io_write_rate) != 0) uError("failed to add io_write"); + if (tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate) != 0) uError("failed to add io_read_disk"); + if (tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate) != 0) uError("failed to add io_write_disk"); + if (tjsonAddDoubleToObject(pJson, "req_select", pStat->numOfSelectReqs) != 0) uError("failed to add req_select"); + if (tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate) != 0) uError("failed to add req_select_rate"); + if (tjsonAddDoubleToObject(pJson, "req_insert", pStat->numOfInsertReqs) != 0) uError("failed to add req_insert"); + if (tjsonAddDoubleToObject(pJson, "req_insert_success", pStat->numOfInsertSuccessReqs) != 0) + uError("failed to add req_insert_success"); + if (tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate) != 0) uError("failed to add req_insert_rate"); + if (tjsonAddDoubleToObject(pJson, "req_insert_batch", pStat->numOfBatchInsertReqs) != 0) + uError("failed to add req_insert_batch"); + if (tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pStat->numOfBatchInsertSuccessReqs) != 0) + uError("failed to add req_insert_batch_success"); + if (tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate) != 0) + uError("failed to add req_insert_batch_rate"); + if (tjsonAddDoubleToObject(pJson, "errors", pStat->errors) != 0) uError("failed to add errors"); + if (tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes) != 0) uError("failed to add vnodes_num"); + if (tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum) != 0) uError("failed to add masters"); + if (tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode) != 0) uError("failed to add has_mnode"); + if (tjsonAddDoubleToObject(pJson, "has_qnode", pInfo->has_qnode) != 0) uError("failed to add has_qnode"); + if (tjsonAddDoubleToObject(pJson, "has_snode", pInfo->has_snode) != 0) uError("failed to add has_snode"); } static void monGenDiskJson(SMonInfo *pMonitor) { @@ -515,18 +534,18 @@ static void monGenDiskJson(SMonInfo *pMonitor) { SJson *pLogdirJson = tjsonCreateObject(); if (pLogdirJson == NULL) return; if (tjsonAddItemToObject(pJson, "logdir", pLogdirJson) != 0) return; - (void)tjsonAddStringToObject(pLogdirJson, "name", pLogDesc->name); - (void)tjsonAddDoubleToObject(pLogdirJson, "avail", pLogDesc->size.avail); - (void)tjsonAddDoubleToObject(pLogdirJson, "used", pLogDesc->size.used); - (void)tjsonAddDoubleToObject(pLogdirJson, "total", pLogDesc->size.total); + if (tjsonAddStringToObject(pLogdirJson, "name", pLogDesc->name) != 0) uError("failed to add string to json"); + if (tjsonAddDoubleToObject(pLogdirJson, "avail", pLogDesc->size.avail) != 0) uError("failed to add double to json"); + if (tjsonAddDoubleToObject(pLogdirJson, "used", pLogDesc->size.used) != 0) uError("failed to add double to json"); + if (tjsonAddDoubleToObject(pLogdirJson, "total", pLogDesc->size.total) != 0) uError("failed to add double to json"); SJson *pTempdirJson = tjsonCreateObject(); if (pTempdirJson == NULL) return; if (tjsonAddItemToObject(pJson, "tempdir", pTempdirJson) != 0) return; - (void)tjsonAddStringToObject(pTempdirJson, "name", pTempDesc->name); - (void)tjsonAddDoubleToObject(pTempdirJson, "avail", pTempDesc->size.avail); - (void)tjsonAddDoubleToObject(pTempdirJson, "used", pTempDesc->size.used); - (void)tjsonAddDoubleToObject(pTempdirJson, "total", pTempDesc->size.total); + if (tjsonAddStringToObject(pTempdirJson, "name", pTempDesc->name) != 0) uError("failed to add string to json"); + if (tjsonAddDoubleToObject(pTempdirJson, "avail", pTempDesc->size.avail) != 0) uError("failed to add double to json"); + if (tjsonAddDoubleToObject(pTempdirJson, "used", pTempDesc->size.used) != 0) uError("failed to add double to json"); + if (tjsonAddDoubleToObject(pTempdirJson, "total", pTempDesc->size.total) != 0) uError("failed to add double to json"); } static const char *monLogLevelStr(ELogLevel level) { @@ -571,26 +590,26 @@ static void monGenLogJson(SMonInfo *pMonitor) { SJson *pLogError = tjsonCreateObject(); if (pLogError == NULL) return; - (void)tjsonAddStringToObject(pLogError, "level", "error"); - (void)tjsonAddDoubleToObject(pLogError, "total", numOfErrorLogs); + if (tjsonAddStringToObject(pLogError, "level", "error") != 0) uError("failed to add string to json"); + if (tjsonAddDoubleToObject(pLogError, "total", numOfErrorLogs) != 0) uError("failed to add double to json"); if (tjsonAddItemToArray(pSummaryJson, pLogError) != 0) tjsonDelete(pLogError); SJson *pLogInfo = tjsonCreateObject(); if (pLogInfo == NULL) return; - (void)tjsonAddStringToObject(pLogInfo, "level", "info"); - (void)tjsonAddDoubleToObject(pLogInfo, "total", numOfInfoLogs); + if (tjsonAddStringToObject(pLogInfo, "level", "info") != 0) uError("failed to add string to json"); + if (tjsonAddDoubleToObject(pLogInfo, "total", numOfInfoLogs) != 0) uError("failed to add double to json"); if (tjsonAddItemToArray(pSummaryJson, pLogInfo) != 0) tjsonDelete(pLogInfo); SJson *pLogDebug = tjsonCreateObject(); if (pLogDebug == NULL) return; - (void)tjsonAddStringToObject(pLogDebug, "level", "debug"); - (void)tjsonAddDoubleToObject(pLogDebug, "total", numOfDebugLogs); + if (tjsonAddStringToObject(pLogDebug, "level", "debug") != 0) uError("failed to add string to json"); + if (tjsonAddDoubleToObject(pLogDebug, "total", numOfDebugLogs) != 0) uError("failed to add double to json"); if (tjsonAddItemToArray(pSummaryJson, pLogDebug) != 0) tjsonDelete(pLogDebug); SJson *pLogTrace = tjsonCreateObject(); if (pLogTrace == NULL) return; - (void)tjsonAddStringToObject(pLogTrace, "level", "trace"); - (void)tjsonAddDoubleToObject(pLogTrace, "total", numOfTraceLogs); + if (tjsonAddStringToObject(pLogTrace, "level", "trace") != 0) uError("failed to add string to json"); + if (tjsonAddDoubleToObject(pLogTrace, "total", numOfTraceLogs) != 0) uError("failed to add double to json"); if (tjsonAddItemToArray(pSummaryJson, pLogTrace) != 0) tjsonDelete(pLogTrace); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 798793bf55..3781479f36 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -14705,7 +14705,7 @@ static int32_t rewriteDropSuperTablewithOpt(STranslateContext* pCxt, SQuery* pQu break; } if (!isdigit(pStmt->tableName[i])) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, "Table does not exist: `%s`.`%s`", + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, "STable not exist: `%s`.`%s`", pStmt->dbName, pStmt->tableName); } } @@ -14715,8 +14715,11 @@ static int32_t rewriteDropSuperTablewithOpt(STranslateContext* pCxt, SQuery* pQu toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name); code = getTargetName(pCxt, &name, pTableName); if (TSDB_CODE_SUCCESS != code) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "%s: db:`%s`, tbuid:`%s`", tstrerror(code), pStmt->dbName, - pStmt->tableName); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "%s: db:`%s`, tbuid:`%s`", + (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST) + ? "STable not exist" + : tstrerror(code), + pStmt->dbName, pStmt->tableName); } tstrncpy(pStmt->tableName, pTableName, TSDB_TABLE_NAME_LEN); // rewrite table uid to table name diff --git a/tests/system-test/1-insert/drop.py b/tests/system-test/1-insert/drop.py index cbc2753e8d..ad9f6c9be6 100644 --- a/tests/system-test/1-insert/drop.py +++ b/tests/system-test/1-insert/drop.py @@ -54,6 +54,7 @@ class TDTestCase: self.ctb_names = [ f'ctb0', 'ctb1', f'aa\u00bf\u200bctb0', f'aa\u00bf\u200bctb1'] self.ntb_names = [ f'ntb0', f'aa\u00bf\u200bntb0', f'ntb1', f'aa\u00bf\u200bntb1'] self.vgroups_opt = f'vgroups 4' + self.err_dup_cnt = 5 def insert_data(self,column_dict,tbname,row_num): insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str) for i in range(row_num): @@ -147,13 +148,31 @@ class TDTestCase: if i == 0: dropTable = f'drop table with `{stb_result[1]}`.`{stb_result[10]}`,' dropStable = f'drop stable with `{stb_result[1]}`.`{stb_result[10]}`,' + dropTableWithSpace = f'drop table with `{stb_result[1]}`.`{stb_result[10]} `,' + dropStableWithSpace = f'drop stable with `{stb_result[1]}`.` {stb_result[10]}`,' + dropStableNotExist = f'drop stable with `{stb_result[1]}`.`{stb_result[10]}_notexist`,' + for _ in range(self.err_dup_cnt): + tdLog.info(dropTableWithSpace[:-1]) + tdSql.error(dropTableWithSpace[:-1], expectErrInfo="Table does not exist", fullMatched=False) + tdLog.info(dropStableWithSpace[:-1]) + tdSql.error(dropStableWithSpace[:-1], expectErrInfo="STable not exist", fullMatched=False) + tdLog.info(dropStableNotExist[:-1]) + tdSql.error(dropStableWithSpace[:-1], expectErrInfo="STable not exist", fullMatched=False) else: dropTable += f'`{stb_result[1]}`.`{stb_result[10]}`,' dropStable += f'`{stb_result[1]}`.`{stb_result[10]}`,' - tdLog.info(dropTable[:-1]) - tdLog.info(dropStable[:-1]) - tdSql.error(dropTable[:-1]) - tdSql.error(dropStable[:-1]) + for _ in range(self.err_dup_cnt): + tdLog.info(dropTable[:-1]) + tdLog.info(dropStable[:-1]) + tdSql.error(dropTable[:-1], expectErrInfo="Cannot drop super table in batch") + tdSql.error(dropStable[:-1], expectErrInfo="syntax error", fullMatched=False) + dropTableWithSpace += f'`{stb_result[1]}`.` {stb_result[10]}`,' + dropStableWithSpace += f'`{stb_result[1]}`.`{stb_result[10]} `,' + for _ in range(self.err_dup_cnt): + tdLog.info(dropTableWithSpace[:-1]) + tdLog.info(dropStableWithSpace[:-1]) + tdSql.error(dropTableWithSpace[:-1], expectErrInfo="Table does not exist", fullMatched=False) + tdSql.error(dropStableWithSpace[:-1], expectErrInfo="syntax error", fullMatched=False) i += 1 i = 0 for stb_result in result: @@ -172,9 +191,10 @@ class TDTestCase: tdSql.checkRows(0) tdSql.query(f'select * from information_schema.ins_tables where db_name like "dbtest_%"') tdSql.checkRows(8) - tdSql.error(f'drop stable with information_schema.`ins_tables`;') - tdSql.error(f'drop stable with performance_schema.`perf_connections`;') - self.drop_table_check_end() + for _ in range(self.err_dup_cnt): + tdSql.error(f'drop stable with information_schema.`ins_tables`;', expectErrInfo="Cannot drop table of system database", fullMatched=False) + tdSql.error(f'drop stable with performance_schema.`perf_connections`;', expectErrInfo="Cannot drop table of system database", fullMatched=False) + self.drop_table_check_end() def drop_table_with_check(self): self.drop_table_check_init() tdSql.query(f'select * from information_schema.ins_tables where db_name like "dbtest_%"') @@ -196,8 +216,9 @@ class TDTestCase: tdSql.checkRows(0) tdSql.query(f'select * from information_schema.ins_stables where db_name like "dbtest_%"') tdSql.checkRows(2) - tdSql.error(f'drop table with information_schema.`ins_tables`;') - tdSql.error(f'drop table with performance_schema.`perf_connections`;') + for _ in range(self.err_dup_cnt): + tdSql.error(f'drop table with information_schema.`ins_tables`;', expectErrInfo="Cannot drop table of system database", fullMatched=False) + tdSql.error(f'drop table with performance_schema.`perf_connections`;', expectErrInfo="Cannot drop table of system database", fullMatched=False) self.drop_table_check_end() def drop_table_with_check_tsma(self): tdSql.execute(f'create database if not exists {self.dbname} {self.vgroups_opt}')