Merge branch '3.0' into enh/TD-31895-3.0
This commit is contained in:
commit
7203823447
|
@ -347,11 +347,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
|
||||
pTaskInfo, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
|
||||
pInfo->indexOfBufferedRes += 1;
|
||||
|
@ -414,11 +410,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
|
||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
||||
pInfo->pRes->info.rows, pTaskInfo, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -938,7 +938,9 @@ _return:
|
|||
}
|
||||
|
||||
if (code) {
|
||||
qError("%s failed since %s", __func__, tstrerror(code));
|
||||
pOperator->pTaskInfo->code = code;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
} else {
|
||||
seqStableJoinComposeRes(pStbJoin, *pRes);
|
||||
}
|
||||
|
|
|
@ -1094,6 +1094,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
|
|||
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
|
||||
SExchangeInfo* pExchangeInfo = pOperator->info;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
|
||||
(pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1101,23 +1102,26 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pExchangeInfo->dynamicOp) {
|
||||
code = addDynamicExchangeSource(pOperator);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
if (!pExchangeInfo->seqLoadData) {
|
||||
int32_t code = prepareConcurrentlyLoad(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
code = prepareConcurrentlyLoad(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pExchangeInfo->openedTs = taosGetTimestampUs();
|
||||
}
|
||||
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pOperator->pTaskInfo->code = code;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -450,7 +450,7 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
|
|||
|
||||
QRY_PARAM_CHECK(ppRes);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
|
@ -502,6 +502,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
} else {
|
||||
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
|
||||
}
|
||||
|
@ -1533,8 +1534,9 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock**
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
|
|
|
@ -993,6 +993,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
SHJoinOperatorInfo* pJoin = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SSDataBlock* pRes = pJoin->finBlk;
|
||||
int64_t st = 0;
|
||||
|
||||
|
@ -1003,7 +1004,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
pRes->info.rows = 0;
|
||||
goto _return;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (!pJoin->keyHashBuilt) {
|
||||
|
@ -1011,13 +1012,10 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
bool queryDone = false;
|
||||
code = hJoinBuildHash(pOperator, &queryDone);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (queryDone) {
|
||||
goto _return;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1025,17 +1023,11 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
if (pJoin->ctx.rowRemains) {
|
||||
code = (*pJoin->joinFp)(pOperator);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
|
||||
code = doFilter(pRes, pJoin->pFinFilter, NULL);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pRes->info.rows > 0) {
|
||||
|
@ -1055,10 +1047,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
pJoin->execInfo.probeBlkRows += pBlock->info.rows;
|
||||
|
||||
code = hJoinPrepareStart(pOperator, pBlock);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
|
||||
continue;
|
||||
|
@ -1066,10 +1055,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
|
||||
code = doFilter(pRes, pJoin->pFinFilter, NULL);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pRes->info.rows > 0) {
|
||||
|
@ -1077,11 +1063,15 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
|
|||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
_end:
|
||||
if (pOperator->cost.openCost == 0) {
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
if (pRes->info.rows > 0) {
|
||||
*pResBlock = pRes;
|
||||
}
|
||||
|
|
|
@ -1731,6 +1731,7 @@ int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBloc
|
|||
if (pJoin->pFinFilter != NULL) {
|
||||
code = doFilter(pBlock, pJoin->pFinFilter, NULL);
|
||||
if (code) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
pJoin->errCode = code;
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
|
||||
}
|
||||
|
|
|
@ -492,6 +492,8 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
pOperator->pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
|
@ -501,6 +503,8 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return 0;
|
||||
|
@ -509,18 +513,12 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
code = pOperator->fpSet._openFn(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
|
||||
code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if ((*pResBlock) != NULL) {
|
||||
|
@ -530,6 +528,12 @@ int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -881,14 +881,17 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i
|
|||
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
|
||||
QRY_PARAM_CHECK(pRes);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = pOperator->fpSet.getNextFn(pOperator, pRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
pOperator->pTaskInfo->code = code;
|
||||
} else {
|
||||
code = pOperator->fpSet.getNextFn(pOperator, pRes);
|
||||
if (code) {
|
||||
pOperator->pTaskInfo->code = code;
|
||||
}
|
||||
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -270,6 +270,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
SSDataBlock* pRes = pInfo->pRes;
|
||||
SSDataBlock* pFinalRes = pProjectInfo->pFinalRes;
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int64_t st = 0;
|
||||
int32_t order = pInfo->inputTsOrder;
|
||||
int32_t scanFlag = 0;
|
||||
|
@ -290,9 +291,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
if (downstream == NULL) {
|
||||
code = doGenerateSourceData(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pProjectInfo->outputIgnoreGroup) {
|
||||
pRes->info.id.groupId = 0;
|
||||
|
@ -348,20 +347,14 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
}
|
||||
|
||||
code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
|
||||
if (code) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
|
||||
pProjectInfo->pPseudoColInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
status = doIngroupLimitOffset(pLimitInfo, pBlock->info.id.groupId, pInfo->pRes, pOperator);
|
||||
if (status == PROJECT_RETRIEVE_CONTINUE) {
|
||||
|
@ -377,11 +370,8 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
pFinalRes->info.version = pRes->info.version;
|
||||
|
||||
// continue merge data, ignore the group id
|
||||
int32_t ret = blockDataMerge(pFinalRes, pRes);
|
||||
if (ret < 0) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
code = blockDataMerge(pFinalRes, pRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold && (pOperator->status != OP_EXEC_DONE)) {
|
||||
continue;
|
||||
|
@ -390,10 +380,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
// do apply filter
|
||||
code = doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
|
||||
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
|
||||
|
@ -404,10 +391,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
// do apply filter
|
||||
if (pRes->info.rows > 0) {
|
||||
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pRes->info.rows == 0) {
|
||||
continue;
|
||||
|
@ -436,6 +420,13 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
}
|
||||
|
||||
*pResBlock = (p->info.rows > 0)? p:NULL;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -578,14 +569,15 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
int64_t st = 0;
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
|
||||
blockDataCleanup(pRes);
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pOperator->cost.openCost == 0) {
|
||||
|
@ -637,10 +629,7 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
}
|
||||
|
||||
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
size_t rows = pInfo->pRes->info.rows;
|
||||
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -658,6 +647,13 @@ int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
}
|
||||
|
||||
*pResBlock = (rows > 0) ? pInfo->pRes : NULL;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -951,7 +951,8 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
|
||||
return pTaskInfo->code;
|
||||
code = pTaskInfo->code;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -996,6 +997,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1416,6 +1418,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -2944,8 +2947,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
if (pResult && pResult->info.rows > 0) {
|
||||
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
||||
code = processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
qDebug("tmqsnap doQueueScan get data utid:%" PRId64 "", pResult->info.id.uid);
|
||||
if (pResult->info.rows > 0 || code != TSDB_CODE_SUCCESS) {
|
||||
if (pResult->info.rows > 0) {
|
||||
(*ppRes) = pResult;
|
||||
return code;
|
||||
}
|
||||
|
@ -3009,8 +3013,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
|
@ -3340,9 +3345,7 @@ FETCH_NEXT_BLOCK:
|
|||
if (pBlock->info.parTbName[0]) {
|
||||
code =
|
||||
pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
// TODO move into scan
|
||||
|
@ -3482,7 +3485,7 @@ FETCH_NEXT_BLOCK:
|
|||
return code;
|
||||
}
|
||||
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
|
||||
lino);
|
||||
__LINE__);
|
||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
} break;
|
||||
|
@ -3496,7 +3499,7 @@ FETCH_NEXT_BLOCK:
|
|||
return code;
|
||||
}
|
||||
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
|
||||
lino);
|
||||
__LINE__);
|
||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
} break;
|
||||
|
@ -3658,8 +3661,9 @@ FETCH_NEXT_BLOCK:
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
|
@ -3730,6 +3734,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
if (pBlock && pBlock->info.rows > 0) {
|
||||
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
|
||||
code = processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
|
||||
(*ppRes) = pBlock;
|
||||
return code;
|
||||
|
@ -3741,7 +3746,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (code != 0) {
|
||||
tDeleteSchemaWrapper(mtInfo.schema);
|
||||
goto _end;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
STqOffsetVal offset = {0};
|
||||
if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal
|
||||
|
@ -3831,6 +3836,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
(*ppRes) = NULL;
|
||||
|
@ -4677,6 +4683,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -4684,6 +4691,7 @@ _end:
|
|||
|
||||
static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
|
@ -4699,10 +4707,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
int32_t size = 0;
|
||||
code = tableListGetSize(pInfo->pTableListInfo, &size);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (size == 0) {
|
||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
|
@ -4716,11 +4721,11 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
||||
if (code == TSDB_CODE_OUT_OF_MEMORY) {
|
||||
break;
|
||||
} else {
|
||||
if (code != TSDB_CODE_OUT_OF_MEMORY) {
|
||||
// ignore other error
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
++count;
|
||||
if (++pInfo->curPos >= size) {
|
||||
|
@ -4744,6 +4749,13 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
|
||||
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -5429,6 +5441,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
} else {
|
||||
(*ppRes) = pBlock;
|
||||
}
|
||||
|
@ -5945,6 +5958,7 @@ _end:
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
} else {
|
||||
(*ppRes) = pBlock;
|
||||
}
|
||||
|
@ -6460,7 +6474,12 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
}
|
||||
|
||||
code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
|
||||
if ((pRes->info.rows > 0) && (code == 0)) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed since %s", __func__, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
if (pRes->info.rows > 0) {
|
||||
*ppRes = pRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -349,82 +349,84 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
|||
int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||
SSortOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SSortSource* pSource =NULL;
|
||||
|
||||
if (OPTR_IS_OPENED(pOperator)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
pInfo->startTs = taosGetTimestampUs();
|
||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||
pInfo->pSortHandle = NULL;
|
||||
int32_t code =
|
||||
code =
|
||||
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
|
||||
pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||
|
||||
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
if (pSource == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
pSource = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
|
||||
|
||||
pSource->param = pOperator->pDownstream[0];
|
||||
pSource->onlyRef = true;
|
||||
|
||||
code = tsortAddSource(pInfo->pSortHandle, pSource);
|
||||
if (code) {
|
||||
taosMemoryFree(pSource);
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pSource = NULL;
|
||||
|
||||
code = tsortOpen(pInfo->pSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
} else {
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
|
||||
_end:
|
||||
if (pSource) {
|
||||
taosMemoryFree(pSource);
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSortOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
code = pOperator->fpSet._openFn(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// multi-group case not handle here
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (1) {
|
||||
if (tsortIsClosed(pInfo->pSortHandle)) {
|
||||
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
pInfo->matchInfo.pList, pInfo, &pBlock);
|
||||
if (pBlock == NULL || code != 0) {
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (pBlock == NULL) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (blockDataGetNumOfRows(pBlock) == 0) {
|
||||
continue;
|
||||
|
@ -443,6 +445,12 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
}
|
||||
|
||||
*pResBlock = blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -692,16 +700,16 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
QRY_PARAM_CHECK(pResBlock);
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SGroupSortOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = pOperator->fpSet._openFn(pOperator);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->hasGroupId = true;
|
||||
|
||||
|
@ -714,30 +722,25 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
|
||||
pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
|
||||
code = beginSortGroup(pOperator);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (pInfo->pCurrSortHandle != NULL) {
|
||||
if (tsortIsClosed(pInfo->pCurrSortHandle)) {
|
||||
code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
|
||||
if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
|
||||
pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return code;
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
pInfo->matchInfo.pList, pInfo, &pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (pBlock != NULL) {
|
||||
pBlock->info.id.groupId = pInfo->currGroupId;
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
|
@ -748,9 +751,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
(void) finishSortGroup(pOperator);
|
||||
pInfo->currGroupId = pInfo->prefetchedSortInput->info.id.groupId;
|
||||
code = beginSortGroup(pOperator);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
|
||||
(void) finishSortGroup(pOperator);
|
||||
setOperatorCompleted(pOperator);
|
||||
|
@ -759,6 +760,12 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -744,8 +744,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
|
|
@ -179,7 +179,7 @@ _end:
|
|||
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||
}
|
||||
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||
qDebug("===stream===set event next win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
|
||||
qDebug("===stream===set event cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
|
||||
pCurWin->winInfo.sessionWin.win.ekey);
|
||||
|
||||
_error:
|
||||
|
@ -233,7 +233,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
|
|||
pWinInfo->pWinFlag->endFlag = ends[i];
|
||||
} else if (pWin->ekey == pTsData[i]) {
|
||||
pWinInfo->pWinFlag->endFlag |= ends[i];
|
||||
} else {
|
||||
} else if (ends[i] && !pWinInfo->pWinFlag->endFlag) {
|
||||
*pRebuild = true;
|
||||
pWinInfo->pWinFlag->endFlag |= ends[i];
|
||||
(*pWinRow) = i + 1 - start;
|
||||
|
@ -734,8 +734,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
|
|
@ -1155,8 +1155,9 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setOperatorCompleted(pOperator);
|
||||
resetStreamFillInfo(pInfo);
|
||||
|
|
|
@ -1802,8 +1802,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -3571,8 +3572,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -4076,8 +4078,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
clearFunctionContext(&pOperator->exprSupp);
|
||||
|
@ -4816,8 +4819,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -5216,6 +5220,7 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
code = TSDB_CODE_SUCCESS;
|
||||
break;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
|
||||
}
|
||||
|
@ -5258,8 +5263,9 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
|
@ -5795,8 +5801,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
(*ppRes) = NULL;
|
||||
return code;
|
||||
|
|
|
@ -2049,7 +2049,7 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
|||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
return pTaskInfo->code;
|
||||
break;
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
@ -2092,12 +2092,18 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
|||
continue;
|
||||
}
|
||||
(*ppRes) = pBlock;
|
||||
return pTaskInfo->code;
|
||||
} else {
|
||||
(*ppRes) = NULL;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
_end:
|
||||
if (pTaskInfo->code) {
|
||||
qError("%s failed since %s", __func__, tstrerror(pTaskInfo->code));
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
|
||||
|
|
|
@ -100,7 +100,9 @@ extern char* tsMonFwUri;
|
|||
#define VNODE_ROLE "taosd_vnodes_info:role"
|
||||
|
||||
void monInitMonitorFW(){
|
||||
(void)taos_collector_registry_default_init();
|
||||
if (taos_collector_registry_default_init() != 0) {
|
||||
uError("failed to init default collector registry");
|
||||
}
|
||||
|
||||
tsMonitor.metrics = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
taos_gauge_t *gauge = NULL;
|
||||
|
@ -115,7 +117,9 @@ void monInitMonitorFW(){
|
|||
for(int32_t i = 0; i < 25; i++){
|
||||
gauge= taos_gauge_new(dnodes_gauges[i], "", dnodes_label_count, dnodes_sample_labels);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to delete metric %s", dnodes_gauges[i]);
|
||||
}
|
||||
}
|
||||
if (taosHashPut(tsMonitor.metrics, dnodes_gauges[i], strlen(dnodes_gauges[i]), &gauge, sizeof(taos_gauge_t *)) !=
|
||||
0) {
|
||||
|
@ -129,7 +133,9 @@ void monInitMonitorFW(){
|
|||
for(int32_t i = 0; i < 3; i++){
|
||||
gauge= taos_gauge_new(dnodes_data_gauges[i], "", dnodes_data_label_count, dnodes_data_sample_labels);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to delete metric %s", dnodes_data_gauges[i]);
|
||||
}
|
||||
}
|
||||
if (taosHashPut(tsMonitor.metrics, dnodes_data_gauges[i], strlen(dnodes_data_gauges[i]), &gauge,
|
||||
sizeof(taos_gauge_t *)) != 0) {
|
||||
|
@ -143,7 +149,9 @@ void monInitMonitorFW(){
|
|||
for(int32_t i = 0; i < 3; i++){
|
||||
gauge= taos_gauge_new(dnodes_log_gauges[i], "", dnodes_log_label_count, dnodes_log_sample_labels);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to delete metric %s", dnodes_log_gauges[i]);
|
||||
}
|
||||
}
|
||||
if (taosHashPut(tsMonitor.metrics, dnodes_log_gauges[i], strlen(dnodes_log_gauges[i]), &gauge,
|
||||
sizeof(taos_gauge_t *)) != 0) {
|
||||
|
@ -154,7 +162,9 @@ void monInitMonitorFW(){
|
|||
|
||||
void monCleanupMonitorFW(){
|
||||
taosHashCleanup(tsMonitor.metrics);
|
||||
(void)taos_collector_registry_destroy(TAOS_COLLECTOR_REGISTRY_DEFAULT);
|
||||
if (taos_collector_registry_destroy(TAOS_COLLECTOR_REGISTRY_DEFAULT) != 0) {
|
||||
uError("failed to destroy default collector registry");
|
||||
}
|
||||
TAOS_COLLECTOR_REGISTRY_DEFAULT = NULL;
|
||||
}
|
||||
|
||||
|
@ -174,7 +184,9 @@ void monGenClusterInfoTable(SMonInfo *pMonitor){
|
|||
uError("failed to delete metric %s", metric_names[i]);
|
||||
}
|
||||
|
||||
(void)taosHashRemove(tsMonitor.metrics, metric_names[i], strlen(metric_names[i]));
|
||||
if (taosHashRemove(tsMonitor.metrics, metric_names[i], strlen(metric_names[i])) != 0) {
|
||||
uError("failed to remove metric %s", metric_names[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if(pBasicInfo->cluster_id == 0) {
|
||||
|
@ -191,7 +203,9 @@ void monGenClusterInfoTable(SMonInfo *pMonitor){
|
|||
for(int32_t i = 0; i < 18; i++){
|
||||
gauge= taos_gauge_new(metric_names[i], "", label_count, sample_labels1);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to delete metric %s", metric_names[i]);
|
||||
}
|
||||
}
|
||||
if (taosHashPut(tsMonitor.metrics, metric_names[i], strlen(metric_names[i]), &gauge, sizeof(taos_gauge_t *)) != 0) {
|
||||
uError("failed to add cluster gauge at%d:%s", i, metric_names[i]);
|
||||
|
@ -317,11 +331,15 @@ void monGenVgroupInfoTable(SMonInfo *pMonitor){
|
|||
const char *vgroup_sample_labels[] = {"cluster_id", "vgroup_id", "database_name"};
|
||||
taos_gauge_t *tableNumGauge = taos_gauge_new(TABLES_NUM, "", vgroup_label_count, vgroup_sample_labels);
|
||||
if(taos_collector_registry_register_metric(tableNumGauge) == 1){
|
||||
(void)taos_counter_destroy(tableNumGauge);
|
||||
if (taos_counter_destroy(tableNumGauge) != 0) {
|
||||
uError("failed to delete metric " TABLES_NUM);
|
||||
}
|
||||
}
|
||||
taos_gauge_t *statusGauge = taos_gauge_new(STATUS, "", vgroup_label_count, vgroup_sample_labels);
|
||||
if(taos_collector_registry_register_metric(statusGauge) == 1){
|
||||
(void)taos_counter_destroy(statusGauge);
|
||||
if (taos_counter_destroy(statusGauge) != 0) {
|
||||
uError("failed to delete metric " STATUS);
|
||||
}
|
||||
}
|
||||
|
||||
char cluster_id[TSDB_CLUSTER_ID_LEN] = {0};
|
||||
|
@ -530,7 +548,9 @@ void monGenDnodeStatusInfoTable(SMonInfo *pMonitor){
|
|||
|
||||
gauge= taos_gauge_new(DNODE_STATUS, "", dnodes_label_count, dnodes_sample_labels);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to delete metric " DNODE_STATUS);
|
||||
}
|
||||
}
|
||||
|
||||
char cluster_id[TSDB_CLUSTER_ID_LEN];
|
||||
|
@ -633,7 +653,9 @@ void monGenMnodeRoleTable(SMonInfo *pMonitor){
|
|||
uError("failed to delete metric %s", mnodes_role_gauges[i]);
|
||||
}
|
||||
|
||||
(void)taosHashRemove(tsMonitor.metrics, mnodes_role_gauges[i], strlen(mnodes_role_gauges[i]));
|
||||
if (taosHashRemove(tsMonitor.metrics, mnodes_role_gauges[i], strlen(mnodes_role_gauges[i])) != 0) {
|
||||
uError("failed to remove metric %s", mnodes_role_gauges[i]);
|
||||
}
|
||||
}
|
||||
|
||||
SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster;
|
||||
|
@ -647,7 +669,9 @@ void monGenMnodeRoleTable(SMonInfo *pMonitor){
|
|||
for(int32_t i = 0; i < 1; i++){
|
||||
gauge= taos_gauge_new(mnodes_role_gauges[i], "", mnodes_role_label_count, mnodes_role_sample_labels);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to destroy gauge");
|
||||
}
|
||||
}
|
||||
if (taosHashPut(tsMonitor.metrics, mnodes_role_gauges[i], strlen(mnodes_role_gauges[i]), &gauge,
|
||||
sizeof(taos_gauge_t *)) != 0) {
|
||||
|
@ -702,7 +726,9 @@ void monGenVnodeRoleTable(SMonInfo *pMonitor){
|
|||
uError("failed to delete metric %s", vnodes_role_gauges[i]);
|
||||
}
|
||||
|
||||
(void)taosHashRemove(tsMonitor.metrics, vnodes_role_gauges[i], strlen(vnodes_role_gauges[i]));
|
||||
if (taosHashRemove(tsMonitor.metrics, vnodes_role_gauges[i], strlen(vnodes_role_gauges[i])) != 0) {
|
||||
uError("failed to remove metric %s", vnodes_role_gauges[i]);
|
||||
}
|
||||
}
|
||||
|
||||
SMonVgroupInfo *pInfo = &pMonitor->mmInfo.vgroup;
|
||||
|
@ -716,7 +742,9 @@ void monGenVnodeRoleTable(SMonInfo *pMonitor){
|
|||
for(int32_t i = 0; i < 1; i++){
|
||||
gauge= taos_gauge_new(vnodes_role_gauges[i], "", vnodes_role_label_count, vnodes_role_sample_labels);
|
||||
if(taos_collector_registry_register_metric(gauge) == 1){
|
||||
(void)taos_counter_destroy(gauge);
|
||||
if (taos_counter_destroy(gauge) != 0) {
|
||||
uError("failed to destroy gauge");
|
||||
}
|
||||
}
|
||||
if (taosHashPut(tsMonitor.metrics, vnodes_role_gauges[i], strlen(vnodes_role_gauges[i]), &gauge,
|
||||
sizeof(taos_gauge_t *)) != 0) {
|
||||
|
@ -774,7 +802,9 @@ void monSendPromReport() {
|
|||
tmp) != 0) {
|
||||
uError("failed to send monitor msg");
|
||||
} else {
|
||||
(void)taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT);
|
||||
if (taos_collector_registry_clear_batch(TAOS_COLLECTOR_REGISTRY_DEFAULT) != 0) {
|
||||
uError("failed to clear batch");
|
||||
}
|
||||
}
|
||||
taosMemoryFreeClear(pCont);
|
||||
}
|
||||
|
|
|
@ -145,7 +145,9 @@ void monInitVnode() {
|
|||
counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", label_count, sample_labels);
|
||||
uDebug("new metric:%p", counter);
|
||||
if (taos_collector_registry_register_metric(counter) == 1) {
|
||||
(void)taos_counter_destroy(counter);
|
||||
if (taos_counter_destroy(counter) != 0) {
|
||||
uError("failed to destroy metric:%p", counter);
|
||||
}
|
||||
uError("failed to register metric:%p", counter);
|
||||
} else {
|
||||
tsInsertCounter = counter;
|
||||
|
@ -226,14 +228,17 @@ static void monGenBasicJson(SMonInfo *pMonitor) {
|
|||
|
||||
SJson *pJson = pMonitor->pJson;
|
||||
char buf[40] = {0};
|
||||
(void)taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI);
|
||||
if (taosFormatUtcTime(buf, sizeof(buf), pMonitor->curTime, TSDB_TIME_PRECISION_MILLI) != 0) {
|
||||
uError("failed to format time");
|
||||
return;
|
||||
}
|
||||
|
||||
(void)tjsonAddStringToObject(pJson, "ts", buf);
|
||||
(void)tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id);
|
||||
(void)tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep);
|
||||
if (tjsonAddStringToObject(pJson, "ts", buf) != 0) uError("failed to add ts");
|
||||
if (tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id) != 0) uError("failed to add dnode_id");
|
||||
if (tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep) != 0) uError("failed to add dnode_ep");
|
||||
snprintf(buf, sizeof(buf), "%" PRId64, pInfo->cluster_id);
|
||||
(void)tjsonAddStringToObject(pJson, "cluster_id", buf);
|
||||
(void)tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol);
|
||||
if (tjsonAddStringToObject(pJson, "cluster_id", buf) != 0) uError("failed to add cluster_id");
|
||||
if (tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol) != 0) uError("failed to add protocol");
|
||||
}
|
||||
|
||||
static void monGenBasicJsonBasic(SMonInfo *pMonitor) {
|
||||
|
@ -244,12 +249,12 @@ static void monGenBasicJsonBasic(SMonInfo *pMonitor) {
|
|||
char buf[40] = {0};
|
||||
|
||||
sprintf(buf, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
|
||||
(void)tjsonAddStringToObject(pJson, "ts", buf);
|
||||
(void)tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id);
|
||||
(void)tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep);
|
||||
if (tjsonAddStringToObject(pJson, "ts", buf) != 0) uError("failed to add ts");
|
||||
if (tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id) != 0) uError("failed to add dnode_id");
|
||||
if (tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep) != 0) uError("failed to add dnode_ep");
|
||||
snprintf(buf, sizeof(buf), "%" PRId64, pInfo->cluster_id);
|
||||
(void)tjsonAddStringToObject(pJson, "cluster_id", buf);
|
||||
(void)tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol);
|
||||
if (tjsonAddStringToObject(pJson, "cluster_id", buf) != 0) uError("failed to add cluster_id");
|
||||
if (tjsonAddDoubleToObject(pJson, "protocol", pInfo->protocol) != 0) uError("failed to add protocol");
|
||||
}
|
||||
|
||||
static void monGenClusterJson(SMonInfo *pMonitor) {
|
||||
|
@ -263,21 +268,24 @@ static void monGenClusterJson(SMonInfo *pMonitor) {
|
|||
return;
|
||||
}
|
||||
|
||||
(void)tjsonAddStringToObject(pJson, "first_ep", pInfo->first_ep);
|
||||
(void)tjsonAddDoubleToObject(pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id);
|
||||
(void)tjsonAddStringToObject(pJson, "version", pInfo->version);
|
||||
(void)tjsonAddDoubleToObject(pJson, "master_uptime", pInfo->master_uptime);
|
||||
(void)tjsonAddDoubleToObject(pJson, "monitor_interval", pInfo->monitor_interval);
|
||||
(void)tjsonAddDoubleToObject(pJson, "dbs_total", pInfo->dbs_total);
|
||||
(void)tjsonAddDoubleToObject(pJson, "tbs_total", pInfo->tbs_total);
|
||||
(void)tjsonAddDoubleToObject(pJson, "stbs_total", pInfo->stbs_total);
|
||||
(void)tjsonAddDoubleToObject(pJson, "vgroups_total", pInfo->vgroups_total);
|
||||
(void)tjsonAddDoubleToObject(pJson, "vgroups_alive", pInfo->vgroups_alive);
|
||||
(void)tjsonAddDoubleToObject(pJson, "vnodes_total", pInfo->vnodes_total);
|
||||
(void)tjsonAddDoubleToObject(pJson, "vnodes_alive", pInfo->vnodes_alive);
|
||||
(void)tjsonAddDoubleToObject(pJson, "connections_total", pInfo->connections_total);
|
||||
(void)tjsonAddDoubleToObject(pJson, "topics_total", pInfo->topics_toal);
|
||||
(void)tjsonAddDoubleToObject(pJson, "streams_total", pInfo->streams_total);
|
||||
if (tjsonAddStringToObject(pJson, "first_ep", pInfo->first_ep) != 0) uError("failed to add first_ep");
|
||||
if (tjsonAddDoubleToObject(pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id) != 0)
|
||||
uError("failed to add first_ep_dnode_id");
|
||||
if (tjsonAddStringToObject(pJson, "version", pInfo->version) != 0) uError("failed to add version");
|
||||
if (tjsonAddDoubleToObject(pJson, "master_uptime", pInfo->master_uptime) != 0) uError("failed to add master_uptime");
|
||||
if (tjsonAddDoubleToObject(pJson, "monitor_interval", pInfo->monitor_interval) != 0)
|
||||
uError("failed to add monitor_interval");
|
||||
if (tjsonAddDoubleToObject(pJson, "dbs_total", pInfo->dbs_total) != 0) uError("failed to add dbs_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "tbs_total", pInfo->tbs_total) != 0) uError("failed to add tbs_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "stbs_total", pInfo->stbs_total) != 0) uError("failed to add stbs_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "vgroups_total", pInfo->vgroups_total) != 0) uError("failed to add vgroups_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "vgroups_alive", pInfo->vgroups_alive) != 0) uError("failed to add vgroups_alive");
|
||||
if (tjsonAddDoubleToObject(pJson, "vnodes_total", pInfo->vnodes_total) != 0) uError("failed to add vnodes_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "vnodes_alive", pInfo->vnodes_alive) != 0) uError("failed to add vnodes_alive");
|
||||
if (tjsonAddDoubleToObject(pJson, "connections_total", pInfo->connections_total) != 0)
|
||||
uError("failed to add connections_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "topics_total", pInfo->topics_toal) != 0) uError("failed to add topics_total");
|
||||
if (tjsonAddDoubleToObject(pJson, "streams_total", pInfo->streams_total) != 0) uError("failed to add streams_total");
|
||||
|
||||
SJson *pDnodesJson = tjsonAddArrayToObject(pJson, "dnodes");
|
||||
if (pDnodesJson == NULL) return;
|
||||
|
@ -287,9 +295,9 @@ static void monGenClusterJson(SMonInfo *pMonitor) {
|
|||
if (pDnodeJson == NULL) continue;
|
||||
|
||||
SMonDnodeDesc *pDnodeDesc = taosArrayGet(pInfo->dnodes, i);
|
||||
(void)tjsonAddDoubleToObject(pDnodeJson, "dnode_id", pDnodeDesc->dnode_id);
|
||||
(void)tjsonAddStringToObject(pDnodeJson, "dnode_ep", pDnodeDesc->dnode_ep);
|
||||
(void)tjsonAddStringToObject(pDnodeJson, "status", pDnodeDesc->status);
|
||||
if (tjsonAddDoubleToObject(pDnodeJson, "dnode_id", pDnodeDesc->dnode_id) != 0) uError("failed to add dnode_id");
|
||||
if (tjsonAddStringToObject(pDnodeJson, "dnode_ep", pDnodeDesc->dnode_ep) != 0) uError("failed to add dnode_ep");
|
||||
if (tjsonAddStringToObject(pDnodeJson, "status", pDnodeDesc->status) != 0) uError("failed to add status");
|
||||
|
||||
if (tjsonAddItemToArray(pDnodesJson, pDnodeJson) != 0) tjsonDelete(pDnodeJson);
|
||||
}
|
||||
|
@ -302,9 +310,9 @@ static void monGenClusterJson(SMonInfo *pMonitor) {
|
|||
if (pMnodeJson == NULL) continue;
|
||||
|
||||
SMonMnodeDesc *pMnodeDesc = taosArrayGet(pInfo->mnodes, i);
|
||||
(void)tjsonAddDoubleToObject(pMnodeJson, "mnode_id", pMnodeDesc->mnode_id);
|
||||
(void)tjsonAddStringToObject(pMnodeJson, "mnode_ep", pMnodeDesc->mnode_ep);
|
||||
(void)tjsonAddStringToObject(pMnodeJson, "role", pMnodeDesc->role);
|
||||
if (tjsonAddDoubleToObject(pMnodeJson, "mnode_id", pMnodeDesc->mnode_id) != 0) uError("failed to add mnode_id");
|
||||
if (tjsonAddStringToObject(pMnodeJson, "mnode_ep", pMnodeDesc->mnode_ep) != 0) uError("failed to add mnode_ep");
|
||||
if (tjsonAddStringToObject(pMnodeJson, "role", pMnodeDesc->role) != 0) uError("failed to add role");
|
||||
|
||||
if (tjsonAddItemToArray(pMnodesJson, pMnodeJson) != 0) tjsonDelete(pMnodeJson);
|
||||
}
|
||||
|
@ -314,11 +322,11 @@ static void monGenClusterJsonBasic(SMonInfo *pMonitor) {
|
|||
SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster;
|
||||
if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return;
|
||||
|
||||
// (void)tjsonAddStringToObject(pMonitor->pJson, "first_ep", pInfo->first_ep);
|
||||
(void)tjsonAddStringToObject(pMonitor->pJson, "first_ep", tsFirst);
|
||||
(void)tjsonAddDoubleToObject(pMonitor->pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id);
|
||||
(void)tjsonAddStringToObject(pMonitor->pJson, "cluster_version", pInfo->version);
|
||||
// (void)tjsonAddDoubleToObject(pMonitor->pJson, "monitor_interval", pInfo->monitor_interval);
|
||||
if (tjsonAddStringToObject(pMonitor->pJson, "first_ep", tsFirst) != 0) uError("failed to add first_ep");
|
||||
if (tjsonAddDoubleToObject(pMonitor->pJson, "first_ep_dnode_id", pInfo->first_ep_dnode_id) != 0)
|
||||
uError("failed to add first_ep_dnode_id");
|
||||
if (tjsonAddStringToObject(pMonitor->pJson, "cluster_version", pInfo->version) != 0)
|
||||
uError("failed to add cluster_version");
|
||||
}
|
||||
|
||||
static void monGenVgroupJson(SMonInfo *pMonitor) {
|
||||
|
@ -337,10 +345,13 @@ static void monGenVgroupJson(SMonInfo *pMonitor) {
|
|||
}
|
||||
|
||||
SMonVgroupDesc *pVgroupDesc = taosArrayGet(pInfo->vgroups, i);
|
||||
(void)tjsonAddDoubleToObject(pVgroupJson, "vgroup_id", pVgroupDesc->vgroup_id);
|
||||
(void)tjsonAddStringToObject(pVgroupJson, "database_name", pVgroupDesc->database_name);
|
||||
(void)tjsonAddDoubleToObject(pVgroupJson, "tables_num", pVgroupDesc->tables_num);
|
||||
(void)tjsonAddStringToObject(pVgroupJson, "status", pVgroupDesc->status);
|
||||
if (tjsonAddDoubleToObject(pVgroupJson, "vgroup_id", pVgroupDesc->vgroup_id) != 0)
|
||||
uError("failed to add vgroup_id");
|
||||
if (tjsonAddStringToObject(pVgroupJson, "database_name", pVgroupDesc->database_name) != 0)
|
||||
uError("failed to add database_name");
|
||||
if (tjsonAddDoubleToObject(pVgroupJson, "tables_num", pVgroupDesc->tables_num) != 0)
|
||||
uError("failed to add tables_num");
|
||||
if (tjsonAddStringToObject(pVgroupJson, "status", pVgroupDesc->status) != 0) uError("failed to add status");
|
||||
|
||||
SJson *pVnodesJson = tjsonAddArrayToObject(pVgroupJson, "vnodes");
|
||||
if (pVnodesJson == NULL) continue;
|
||||
|
@ -352,8 +363,9 @@ static void monGenVgroupJson(SMonInfo *pMonitor) {
|
|||
SJson *pVnodeJson = tjsonCreateObject();
|
||||
if (pVnodeJson == NULL) continue;
|
||||
|
||||
(void)tjsonAddDoubleToObject(pVnodeJson, "dnode_id", pVnodeDesc->dnode_id);
|
||||
(void)tjsonAddStringToObject(pVnodeJson, "vnode_role", pVnodeDesc->vnode_role);
|
||||
if (tjsonAddDoubleToObject(pVnodeJson, "dnode_id", pVnodeDesc->dnode_id) != 0) uError("failed to add dnode_id");
|
||||
if (tjsonAddStringToObject(pVnodeJson, "vnode_role", pVnodeDesc->vnode_role) != 0)
|
||||
uError("failed to add vnode_role");
|
||||
|
||||
if (tjsonAddItemToArray(pVnodesJson, pVnodeJson) != 0) tjsonDelete(pVnodeJson);
|
||||
}
|
||||
|
@ -376,8 +388,9 @@ static void monGenStbJson(SMonInfo *pMonitor) {
}

SMonStbDesc *pStbDesc = taosArrayGet(pInfo->stbs, i);
(void)tjsonAddStringToObject(pStbJson, "stb_name", pStbDesc->stb_name);
(void)tjsonAddStringToObject(pStbJson, "database_name", pStbDesc->database_name);
if (tjsonAddStringToObject(pStbJson, "stb_name", pStbDesc->stb_name) != 0) uError("failed to add stb_name");
if (tjsonAddStringToObject(pStbJson, "database_name", pStbDesc->database_name) != 0)
uError("failed to add database_name");
}
}
@ -392,9 +405,11 @@ static void monGenGrantJson(SMonInfo *pMonitor) {
return;
}

(void)tjsonAddDoubleToObject(pJson, "expire_time", pInfo->expire_time);
(void)tjsonAddDoubleToObject(pJson, "timeseries_used", pInfo->timeseries_used);
(void)tjsonAddDoubleToObject(pJson, "timeseries_total", pInfo->timeseries_total);
if (tjsonAddDoubleToObject(pJson, "expire_time", pInfo->expire_time) != 0) uError("failed to add expire_time");
if (tjsonAddDoubleToObject(pJson, "timeseries_used", pInfo->timeseries_used) != 0)
uError("failed to add timeseries_used");
if (tjsonAddDoubleToObject(pJson, "timeseries_total", pInfo->timeseries_total) != 0)
uError("failed to add timeseries_total");
}

static void monGenDnodeJson(SMonInfo *pMonitor) {
@ -451,36 +466,40 @@ static void monGenDnodeJson(SMonInfo *pMonitor) {
double io_read_disk_rate = io_read_disk / interval;
double io_write_disk_rate = io_write_disk / interval;

(void)tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime);
(void)tjsonAddDoubleToObject(pJson, "cpu_engine", cpu_engine);
(void)tjsonAddDoubleToObject(pJson, "cpu_system", pSys->cpu_system);
(void)tjsonAddDoubleToObject(pJson, "cpu_cores", pSys->cpu_cores);
(void)tjsonAddDoubleToObject(pJson, "mem_engine", mem_engine);
(void)tjsonAddDoubleToObject(pJson, "mem_system", pSys->mem_system);
(void)tjsonAddDoubleToObject(pJson, "mem_total", pSys->mem_total);
(void)tjsonAddDoubleToObject(pJson, "disk_engine", pSys->disk_engine);
(void)tjsonAddDoubleToObject(pJson, "disk_used", pSys->disk_used);
(void)tjsonAddDoubleToObject(pJson, "disk_total", pSys->disk_total);
(void)tjsonAddDoubleToObject(pJson, "net_in", net_in_rate);
(void)tjsonAddDoubleToObject(pJson, "net_out", net_out_rate);
(void)tjsonAddDoubleToObject(pJson, "io_read", io_read_rate);
(void)tjsonAddDoubleToObject(pJson, "io_write", io_write_rate);
(void)tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate);
(void)tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate);
(void)tjsonAddDoubleToObject(pJson, "req_select", pStat->numOfSelectReqs);
(void)tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate);
(void)tjsonAddDoubleToObject(pJson, "req_insert", pStat->numOfInsertReqs);
(void)tjsonAddDoubleToObject(pJson, "req_insert_success", pStat->numOfInsertSuccessReqs);
(void)tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate);
(void)tjsonAddDoubleToObject(pJson, "req_insert_batch", pStat->numOfBatchInsertReqs);
(void)tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pStat->numOfBatchInsertSuccessReqs);
(void)tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate);
(void)tjsonAddDoubleToObject(pJson, "errors", pStat->errors);
(void)tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes);
(void)tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum);
(void)tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode);
(void)tjsonAddDoubleToObject(pJson, "has_qnode", pInfo->has_qnode);
(void)tjsonAddDoubleToObject(pJson, "has_snode", pInfo->has_snode);
if (tjsonAddDoubleToObject(pJson, "uptime", pInfo->uptime) != 0) uError("failed to add uptime");
if (tjsonAddDoubleToObject(pJson, "cpu_engine", cpu_engine) != 0) uError("failed to add cpu_engine");
if (tjsonAddDoubleToObject(pJson, "cpu_system", pSys->cpu_system) != 0) uError("failed to add cpu_system");
if (tjsonAddDoubleToObject(pJson, "cpu_cores", pSys->cpu_cores) != 0) uError("failed to add cpu_cores");
if (tjsonAddDoubleToObject(pJson, "mem_engine", mem_engine) != 0) uError("failed to add mem_engine");
if (tjsonAddDoubleToObject(pJson, "mem_system", pSys->mem_system) != 0) uError("failed to add mem_system");
if (tjsonAddDoubleToObject(pJson, "mem_total", pSys->mem_total) != 0) uError("failed to add mem_total");
if (tjsonAddDoubleToObject(pJson, "disk_engine", pSys->disk_engine) != 0) uError("failed to add disk_engine");
if (tjsonAddDoubleToObject(pJson, "disk_used", pSys->disk_used) != 0) uError("failed to add disk_used");
if (tjsonAddDoubleToObject(pJson, "disk_total", pSys->disk_total) != 0) uError("failed to add disk_total");
if (tjsonAddDoubleToObject(pJson, "net_in", net_in_rate) != 0) uError("failed to add net_in");
if (tjsonAddDoubleToObject(pJson, "net_out", net_out_rate) != 0) uError("failed to add net_out");
if (tjsonAddDoubleToObject(pJson, "io_read", io_read_rate) != 0) uError("failed to add io_read");
if (tjsonAddDoubleToObject(pJson, "io_write", io_write_rate) != 0) uError("failed to add io_write");
if (tjsonAddDoubleToObject(pJson, "io_read_disk", io_read_disk_rate) != 0) uError("failed to add io_read_disk");
if (tjsonAddDoubleToObject(pJson, "io_write_disk", io_write_disk_rate) != 0) uError("failed to add io_write_disk");
if (tjsonAddDoubleToObject(pJson, "req_select", pStat->numOfSelectReqs) != 0) uError("failed to add req_select");
if (tjsonAddDoubleToObject(pJson, "req_select_rate", req_select_rate) != 0) uError("failed to add req_select_rate");
if (tjsonAddDoubleToObject(pJson, "req_insert", pStat->numOfInsertReqs) != 0) uError("failed to add req_insert");
if (tjsonAddDoubleToObject(pJson, "req_insert_success", pStat->numOfInsertSuccessReqs) != 0)
uError("failed to add req_insert_success");
if (tjsonAddDoubleToObject(pJson, "req_insert_rate", req_insert_rate) != 0) uError("failed to add req_insert_rate");
if (tjsonAddDoubleToObject(pJson, "req_insert_batch", pStat->numOfBatchInsertReqs) != 0)
uError("failed to add req_insert_batch");
if (tjsonAddDoubleToObject(pJson, "req_insert_batch_success", pStat->numOfBatchInsertSuccessReqs) != 0)
uError("failed to add req_insert_batch_success");
if (tjsonAddDoubleToObject(pJson, "req_insert_batch_rate", req_insert_batch_rate) != 0)
uError("failed to add req_insert_batch_rate");
if (tjsonAddDoubleToObject(pJson, "errors", pStat->errors) != 0) uError("failed to add errors");
if (tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes) != 0) uError("failed to add vnodes_num");
if (tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum) != 0) uError("failed to add masters");
if (tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode) != 0) uError("failed to add has_mnode");
if (tjsonAddDoubleToObject(pJson, "has_qnode", pInfo->has_qnode) != 0) uError("failed to add has_qnode");
if (tjsonAddDoubleToObject(pJson, "has_snode", pInfo->has_snode) != 0) uError("failed to add has_snode");
}

static void monGenDiskJson(SMonInfo *pMonitor) {
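The *_rate values above are raw counters divided by interval; a hedged sketch of a guarded version of that computation (hypothetical, not part of this commit) that tolerates a zero-length monitoring interval:

// Hypothetical guard (not in this commit): fall back to 0 when the monitoring
// interval is zero instead of dividing by it.
double io_read_disk_rate  = (interval > 0) ? (io_read_disk / interval)  : 0.0;
double io_write_disk_rate = (interval > 0) ? (io_write_disk / interval) : 0.0;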
@ -515,18 +534,18 @@ static void monGenDiskJson(SMonInfo *pMonitor) {
SJson *pLogdirJson = tjsonCreateObject();
if (pLogdirJson == NULL) return;
if (tjsonAddItemToObject(pJson, "logdir", pLogdirJson) != 0) return;
(void)tjsonAddStringToObject(pLogdirJson, "name", pLogDesc->name);
(void)tjsonAddDoubleToObject(pLogdirJson, "avail", pLogDesc->size.avail);
(void)tjsonAddDoubleToObject(pLogdirJson, "used", pLogDesc->size.used);
(void)tjsonAddDoubleToObject(pLogdirJson, "total", pLogDesc->size.total);
if (tjsonAddStringToObject(pLogdirJson, "name", pLogDesc->name) != 0) uError("failed to add string to json");
if (tjsonAddDoubleToObject(pLogdirJson, "avail", pLogDesc->size.avail) != 0) uError("failed to add double to json");
if (tjsonAddDoubleToObject(pLogdirJson, "used", pLogDesc->size.used) != 0) uError("failed to add double to json");
if (tjsonAddDoubleToObject(pLogdirJson, "total", pLogDesc->size.total) != 0) uError("failed to add double to json");

SJson *pTempdirJson = tjsonCreateObject();
if (pTempdirJson == NULL) return;
if (tjsonAddItemToObject(pJson, "tempdir", pTempdirJson) != 0) return;
(void)tjsonAddStringToObject(pTempdirJson, "name", pTempDesc->name);
(void)tjsonAddDoubleToObject(pTempdirJson, "avail", pTempDesc->size.avail);
(void)tjsonAddDoubleToObject(pTempdirJson, "used", pTempDesc->size.used);
(void)tjsonAddDoubleToObject(pTempdirJson, "total", pTempDesc->size.total);
if (tjsonAddStringToObject(pTempdirJson, "name", pTempDesc->name) != 0) uError("failed to add string to json");
if (tjsonAddDoubleToObject(pTempdirJson, "avail", pTempDesc->size.avail) != 0) uError("failed to add double to json");
if (tjsonAddDoubleToObject(pTempdirJson, "used", pTempDesc->size.used) != 0) uError("failed to add double to json");
if (tjsonAddDoubleToObject(pTempdirJson, "total", pTempDesc->size.total) != 0) uError("failed to add double to json");
}

static const char *monLogLevelStr(ELogLevel level) {
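One detail the context lines above leave as-is: when tjsonAddItemToObject fails, the function returns without releasing the freshly created object. A hedged sketch of a variant that also frees it on that path (hypothetical hardening, not part of this commit):

// Hypothetical variant (not in this commit): release the object if the parent
// refuses it, mirroring the tjsonAddItemToArray/tjsonDelete pattern used elsewhere.
SJson *pLogdirJson = tjsonCreateObject();
if (pLogdirJson == NULL) return;
if (tjsonAddItemToObject(pJson, "logdir", pLogdirJson) != 0) {
  tjsonDelete(pLogdirJson);
  return;
}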
@ -571,26 +590,26 @@ static void monGenLogJson(SMonInfo *pMonitor) {
SJson *pLogError = tjsonCreateObject();
if (pLogError == NULL) return;
(void)tjsonAddStringToObject(pLogError, "level", "error");
(void)tjsonAddDoubleToObject(pLogError, "total", numOfErrorLogs);
if (tjsonAddStringToObject(pLogError, "level", "error") != 0) uError("failed to add string to json");
if (tjsonAddDoubleToObject(pLogError, "total", numOfErrorLogs) != 0) uError("failed to add double to json");
if (tjsonAddItemToArray(pSummaryJson, pLogError) != 0) tjsonDelete(pLogError);

SJson *pLogInfo = tjsonCreateObject();
if (pLogInfo == NULL) return;
(void)tjsonAddStringToObject(pLogInfo, "level", "info");
(void)tjsonAddDoubleToObject(pLogInfo, "total", numOfInfoLogs);
if (tjsonAddStringToObject(pLogInfo, "level", "info") != 0) uError("failed to add string to json");
if (tjsonAddDoubleToObject(pLogInfo, "total", numOfInfoLogs) != 0) uError("failed to add double to json");
if (tjsonAddItemToArray(pSummaryJson, pLogInfo) != 0) tjsonDelete(pLogInfo);

SJson *pLogDebug = tjsonCreateObject();
if (pLogDebug == NULL) return;
(void)tjsonAddStringToObject(pLogDebug, "level", "debug");
(void)tjsonAddDoubleToObject(pLogDebug, "total", numOfDebugLogs);
if (tjsonAddStringToObject(pLogDebug, "level", "debug") != 0) uError("failed to add string to json");
if (tjsonAddDoubleToObject(pLogDebug, "total", numOfDebugLogs) != 0) uError("failed to add double to json");
if (tjsonAddItemToArray(pSummaryJson, pLogDebug) != 0) tjsonDelete(pLogDebug);

SJson *pLogTrace = tjsonCreateObject();
if (pLogTrace == NULL) return;
(void)tjsonAddStringToObject(pLogTrace, "level", "trace");
(void)tjsonAddDoubleToObject(pLogTrace, "total", numOfTraceLogs);
if (tjsonAddStringToObject(pLogTrace, "level", "trace") != 0) uError("failed to add string to json");
if (tjsonAddDoubleToObject(pLogTrace, "total", numOfTraceLogs) != 0) uError("failed to add double to json");
if (tjsonAddItemToArray(pSummaryJson, pLogTrace) != 0) tjsonDelete(pLogTrace);
}
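The four per-level blocks above are structurally identical; a hedged sketch of a helper that could fold them together, using only the tjson calls and uError seen in this diff (the helper name is hypothetical and not part of this commit):

// Hypothetical helper (not in this commit): append one {level, total} entry to
// the log summary array with the same success/failure handling as above.
static void monAddLogSummary(SJson *pSummaryJson, const char *level, double total) {
  SJson *pEntry = tjsonCreateObject();
  if (pEntry == NULL) return;
  if (tjsonAddStringToObject(pEntry, "level", level) != 0) uError("failed to add string to json");
  if (tjsonAddDoubleToObject(pEntry, "total", total) != 0) uError("failed to add double to json");
  if (tjsonAddItemToArray(pSummaryJson, pEntry) != 0) tjsonDelete(pEntry);
}

// Usage sketch:
//   monAddLogSummary(pSummaryJson, "error", numOfErrorLogs);
//   monAddLogSummary(pSummaryJson, "info", numOfInfoLogs);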
@ -14705,7 +14705,7 @@ static int32_t rewriteDropSuperTablewithOpt(STranslateContext* pCxt, SQuery* pQu
break;
}
if (!isdigit(pStmt->tableName[i])) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, "Table does not exist: `%s`.`%s`",
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, "STable not exist: `%s`.`%s`",
pStmt->dbName, pStmt->tableName);
}
}
@ -14715,8 +14715,11 @@ static int32_t rewriteDropSuperTablewithOpt(STranslateContext* pCxt, SQuery* pQu
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name);
code = getTargetName(pCxt, &name, pTableName);
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "%s: db:`%s`, tbuid:`%s`", tstrerror(code), pStmt->dbName,
pStmt->tableName);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "%s: db:`%s`, tbuid:`%s`",
(code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)
? "STable not exist"
: tstrerror(code),
pStmt->dbName, pStmt->tableName);
}
tstrncpy(pStmt->tableName, pTableName, TSDB_TABLE_NAME_LEN); // rewrite table uid to table name
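The rewrite above normalizes the user-facing text: when getTargetName fails with either table-not-found code, the message prefix becomes "STable not exist" rather than the raw tstrerror string. A minimal sketch of that mapping (the helper name is hypothetical, not part of this commit):

// Hypothetical helper (not in this commit): choose the message prefix used by
// the rewritten error path above.
static const char *dropStbErrPrefix(int32_t code) {
  return (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)
             ? "STable not exist"
             : tstrerror(code);
}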
@ -54,6 +54,7 @@ class TDTestCase:
self.ctb_names = [ f'ctb0', 'ctb1', f'aa\u00bf\u200bctb0', f'aa\u00bf\u200bctb1']
self.ntb_names = [ f'ntb0', f'aa\u00bf\u200bntb0', f'ntb1', f'aa\u00bf\u200bntb1']
self.vgroups_opt = f'vgroups 4'
self.err_dup_cnt = 5
def insert_data(self,column_dict,tbname,row_num):
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
for i in range(row_num):
@ -147,13 +148,31 @@ class TDTestCase:
if i == 0:
dropTable = f'drop table with `{stb_result[1]}`.`{stb_result[10]}`,'
dropStable = f'drop stable with `{stb_result[1]}`.`{stb_result[10]}`,'
dropTableWithSpace = f'drop table with `{stb_result[1]}`.`{stb_result[10]} `,'
dropStableWithSpace = f'drop stable with `{stb_result[1]}`.` {stb_result[10]}`,'
dropStableNotExist = f'drop stable with `{stb_result[1]}`.`{stb_result[10]}_notexist`,'
for _ in range(self.err_dup_cnt):
tdLog.info(dropTableWithSpace[:-1])
tdSql.error(dropTableWithSpace[:-1], expectErrInfo="Table does not exist", fullMatched=False)
tdLog.info(dropStableWithSpace[:-1])
tdSql.error(dropStableWithSpace[:-1], expectErrInfo="STable not exist", fullMatched=False)
tdLog.info(dropStableNotExist[:-1])
tdSql.error(dropStableNotExist[:-1], expectErrInfo="STable not exist", fullMatched=False)
else:
dropTable += f'`{stb_result[1]}`.`{stb_result[10]}`,'
dropStable += f'`{stb_result[1]}`.`{stb_result[10]}`,'
tdLog.info(dropTable[:-1])
tdLog.info(dropStable[:-1])
tdSql.error(dropTable[:-1])
tdSql.error(dropStable[:-1])
for _ in range(self.err_dup_cnt):
tdLog.info(dropTable[:-1])
tdLog.info(dropStable[:-1])
tdSql.error(dropTable[:-1], expectErrInfo="Cannot drop super table in batch")
tdSql.error(dropStable[:-1], expectErrInfo="syntax error", fullMatched=False)
dropTableWithSpace += f'`{stb_result[1]}`.` {stb_result[10]}`,'
dropStableWithSpace += f'`{stb_result[1]}`.`{stb_result[10]} `,'
for _ in range(self.err_dup_cnt):
tdLog.info(dropTableWithSpace[:-1])
tdLog.info(dropStableWithSpace[:-1])
tdSql.error(dropTableWithSpace[:-1], expectErrInfo="Table does not exist", fullMatched=False)
tdSql.error(dropStableWithSpace[:-1], expectErrInfo="syntax error", fullMatched=False)
i += 1
i = 0
for stb_result in result:
@ -172,9 +191,10 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query(f'select * from information_schema.ins_tables where db_name like "dbtest_%"')
tdSql.checkRows(8)
tdSql.error(f'drop stable with information_schema.`ins_tables`;')
tdSql.error(f'drop stable with performance_schema.`perf_connections`;')
self.drop_table_check_end()
for _ in range(self.err_dup_cnt):
tdSql.error(f'drop stable with information_schema.`ins_tables`;', expectErrInfo="Cannot drop table of system database", fullMatched=False)
tdSql.error(f'drop stable with performance_schema.`perf_connections`;', expectErrInfo="Cannot drop table of system database", fullMatched=False)
self.drop_table_check_end()
def drop_table_with_check(self):
self.drop_table_check_init()
tdSql.query(f'select * from information_schema.ins_tables where db_name like "dbtest_%"')
@ -196,8 +216,9 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query(f'select * from information_schema.ins_stables where db_name like "dbtest_%"')
tdSql.checkRows(2)
tdSql.error(f'drop table with information_schema.`ins_tables`;')
tdSql.error(f'drop table with performance_schema.`perf_connections`;')
for _ in range(self.err_dup_cnt):
tdSql.error(f'drop table with information_schema.`ins_tables`;', expectErrInfo="Cannot drop table of system database", fullMatched=False)
tdSql.error(f'drop table with performance_schema.`perf_connections`;', expectErrInfo="Cannot drop table of system database", fullMatched=False)
self.drop_table_check_end()
def drop_table_with_check_tsma(self):
tdSql.execute(f'create database if not exists {self.dbname} {self.vgroups_opt}')