From 497ba9992d5c2e1b542d5951ab9088edd8256cf7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Mar 2023 02:26:18 +0000 Subject: [PATCH 01/14] fix coverity scan problem --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndIndex.c | 4 ++-- source/libs/index/src/indexFstFile.c | 2 -- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index dfc3b3fde8..fead246ed6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -392,7 +392,7 @@ typedef struct { } SSmaObj; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_INDEX_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; char dstTbName[TSDB_TABLE_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index 8782fd823f..83172acf64 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -138,7 +138,7 @@ static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbOb mInfo("idx: %s start to build drop index req", pIdx->name); len = tSerializeSDropIdxReq(NULL, 0, &req); - if (ret < 0) { + if (len < 0) { goto _err; } @@ -672,7 +672,7 @@ _OVER: static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb) { int32_t code = -1; SIdxObj idxObj = {0}; - memcpy(idxObj.name, req->idxName, TSDB_TABLE_FNAME_LEN); + memcpy(idxObj.name, req->idxName, TSDB_INDEX_FNAME_LEN); memcpy(idxObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN); memcpy(idxObj.db, pDb->name, TSDB_DB_FNAME_LEN); memcpy(idxObj.colName, req->colName, TSDB_COL_NAME_LEN); diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 40c50ed9cb..9e7ed52104 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -127,8 +127,6 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of blk->blockId = blkId; blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize); ASSERTS(blk->nread <= kBlockSize, "index read incomplete data"); - if (blk->nread > kBlockSize) break; - if (blk->nread < kBlockSize && blk->nread < len) { taosMemoryFree(blk); break; From 80798dd934a3f8df7c5e62a965404a6f9e2e06c3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 28 Mar 2023 09:20:53 +0800 Subject: [PATCH 02/14] fix: job retry issue --- include/libs/qcom/query.h | 1 + source/common/src/tdatablock.c | 2 +- source/libs/qcom/src/queryUtil.c | 2 + source/libs/scheduler/inc/schInt.h | 30 +++++++++---- source/libs/scheduler/src/schFlowCtrl.c | 1 - source/libs/scheduler/src/schJob.c | 42 ++++++++++++++++-- source/libs/scheduler/src/schRemote.c | 42 +++++++++++------- source/libs/scheduler/src/schStatus.c | 3 ++ source/libs/scheduler/src/schTask.c | 59 ++++++++++--------------- source/libs/scheduler/src/scheduler.c | 2 +- 10 files changed, 119 insertions(+), 65 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index cb547ee6b3..b6ada5a0c7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -33,6 +33,7 @@ typedef enum { JOB_TASK_STATUS_INIT, JOB_TASK_STATUS_EXEC, JOB_TASK_STATUS_PART_SUCC, + JOB_TASK_STATUS_FETCH, JOB_TASK_STATUS_SUCC, JOB_TASK_STATUS_FAIL, JOB_TASK_STATUS_DROP, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3c8d394b43..a75046d06d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1180,7 +1180,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { void blockDataEmpty(SSDataBlock* pDataBlock) { SDataBlockInfo* pInfo = &pDataBlock->info; - if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) { + if (pInfo->capacity == 0) { return; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index c68a08682c..9d8c170003 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -194,6 +194,8 @@ char* jobTaskStatusStr(int32_t status) { return "EXECUTING"; case JOB_TASK_STATUS_PART_SUCC: return "PARTIAL_SUCCEED"; + case JOB_TASK_STATUS_FETCH: + return "FETCHING"; case JOB_TASK_STATUS_SUCC: return "SUCCEED"; case JOB_TASK_STATUS_FAIL: diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 85b952937f..b7f9272bdc 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -193,7 +193,7 @@ typedef struct SSchLevel { int32_t taskSucceed; int32_t taskNum; int32_t taskLaunchedNum; - int32_t taskDoneNum; + int32_t taskExecDoneNum; SArray *subTasks; // Element is SSchTask } SSchLevel; @@ -340,6 +340,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) +#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC) +#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC) + #define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL) #define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle)) @@ -361,6 +364,7 @@ extern SSchedulerMgmt schMgmt; (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) +#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1)) #define SCH_SET_JOB_TYPE(_job, type) \ do { \ @@ -377,16 +381,24 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) -#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \ - (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect)) #define SCH_REDIRECT_MSGTYPE(_msgType) \ ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \ (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) -#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \ - (SCH_REDIRECT_MSGTYPE(_msgType) && \ - (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen)))) -#define SCH_NEED_RETRY(_msgType, _code) \ - ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) +#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \ + (SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx)) +#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \ + (SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx)) +#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \ + (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect) + +#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code)) +#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && \ + (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code)))) +#define SCH_TASK_NEED_RETRY(_msgType, _code) \ + ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) + #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) @@ -562,7 +574,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet); -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode); +int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode); void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode); int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq); void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode); diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 5e4fe4b8a1..9cb95a6bbe 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -282,7 +282,6 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { } int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl); - ; SCH_ERR_RET(code); return code; // to avoid compiler error diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 980a8ac6a1..b2c90dc67f 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -108,10 +108,18 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_PART_SUCC: if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && - newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC && + newStatus != JOB_TASK_STATUS_FETCH) { SCH_ERR_JRET(TSDB_CODE_APP_ERROR); } + break; + case JOB_TASK_STATUS_FETCH: + if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { + SCH_ERR_JRET(TSDB_CODE_APP_ERROR); + } + break; case JOB_TASK_STATUS_SUCC: case JOB_TASK_STATUS_FAIL: @@ -550,9 +558,9 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { } SSchLevel *pLevel = pTask->level; - int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1); + int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1); if (doneNum == pLevel->taskNum) { - pJob->levelIdx--; + atomic_sub_fetch_32(&pJob->levelIdx, 1); pLevel = taosArrayGet(pJob->levels, pJob->levelIdx); for (int32_t i = 0; i < pLevel->taskNum; ++i) { @@ -562,6 +570,10 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { continue; } + if (SCH_TASK_ALREADY_LAUNCHED(pTask)) { + continue; + } + SCH_ERR_RET(schLaunchTask(pJob, pTask)); } } @@ -811,6 +823,30 @@ void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) { } } +int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { + if (pJob->status >= JOB_TASK_STATUS_PART_SUCC) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->fetched) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); + SCH_ERR_JRET(rspCode); + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + + SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode)); + +} + bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { bool r = false; SCH_LOCK(SCH_READ, &pJob->opStatus.lock); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 6f4130fd9f..af206aa46e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -36,7 +36,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); } - if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { + if (taskStatus != JOB_TASK_STATUS_FETCH) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR); @@ -137,25 +137,12 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { return TSDB_CODE_SUCCESS; } -// Note: no more task error processing, handled in function internal -int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { +int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; char *msg = pMsg->pData; int32_t msgSize = pMsg->len; int32_t msgType = pMsg->msgType; - bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); - if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); - } - - SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); - - int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); - if (SCH_TASK_NEED_REDIRECT(pTask, reqType, rspCode, pMsg->len)) { - SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode)); - } - pTask->redirectCtx.inRedirect = false; switch (msgType) { @@ -423,6 +410,31 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } + + +// Note: no more task error processing, handled in function internal +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + int32_t msgType = pMsg->msgType; + + bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); + } + + SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); + + int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); + if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { + SCH_RET(schHandleJobRetry()); + } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) { + SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); + } + + pTask->redirectCtx.inRedirect = false; + + SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode)); +} int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index 4c16a81a05..9d0ad30e2a 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -34,6 +34,9 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { case JOB_TASK_STATUS_PART_SUCC: SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob)); break; + case JOB_TASK_STATUS_FETCH: + SCH_ERR_JRET(schJobFetchRows(pJob)); + break; case JOB_TASK_STATUS_SUCC: break; case JOB_TASK_STATUS_FAIL: diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index bdab739327..a3194c7ff7 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -404,21 +404,7 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { - int32_t code = 0; - - SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); - - if (NULL == pData) { - pTask->retryTimes = 0; - } - - if (!NO_RET_REDIRECT_ERROR(rspCode)) { - SCH_UPDATE_REDICT_CODE(pJob, rspCode); - } - - SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); - +void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { pTask->waitRetry = true; schDropTaskOnExecNode(pJob, pTask); @@ -426,10 +412,27 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 schRemoveTaskFromExecList(pJob, pTask); schDeregisterTaskHb(pJob, pTask); atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + if (SCH_TASK_EXEC_DONE(pTask)) { + atomic_sub_fetch_32(&pTask->level->taskExecDoneNum, 1); + } taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); +} + +int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { + int32_t code = 0; + + SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); + + if (!NO_RET_REDIRECT_ERROR(rspCode)) { + SCH_UPDATE_REDICT_CODE(pJob, rspCode); + } + + SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); + + schResetTaskForRetry(pJob, pTask); if (SCH_IS_DATA_BIND_TASK(pTask)) { if (pData && pData->pEpSet) { @@ -445,12 +448,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); } - if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { - if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) { - SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); - } - } - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask)); @@ -486,20 +483,10 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } -int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { +int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - if (JOB_TASK_STATUS_PART_SUCC == pJob->status) { - SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->fetched) { - SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); - SCH_ERR_JRET(rspCode); - } - SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - - schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); - } + SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode)); if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) { if (NULL == pData->pEpSet) { @@ -510,6 +497,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 } code = schDoTaskRedirect(pJob, pTask, pData, rspCode); + taosMemoryFreeClear(pData->pData); taosMemoryFreeClear(pData->pEpSet); @@ -645,7 +633,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) { + if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) { *needRetry = false; SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode)); return TSDB_CODE_SUCCESS; @@ -1067,7 +1055,6 @@ int32_t schLaunchTaskImpl(void *param) { SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } - // NOTE: race condition: the task should be put into the hash table before send msg to server if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); @@ -1272,6 +1259,8 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } + SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH); + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) { SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask)); } else { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 7cd5e957b6..2b46a4710e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -91,7 +91,7 @@ int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) { SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq)); - SCH_ERR_JRET(schJobFetchRows(pJob)); + SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FETCH, pReq)); _return: From 91c16f03446270779e64ab39d7bf4e93ed84e6ee Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Mar 2023 15:34:55 +0800 Subject: [PATCH 03/14] tsdb/read: remove dup mem schema fetching --- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 38cb206877..0522869902 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3687,7 +3687,6 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); tsdbRowMergerAdd(&merge, pRow, pSchema); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); From 41b2c2d8fd84a331e8d12ba83a5eed94a3933773 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 31 Mar 2023 16:16:14 +0800 Subject: [PATCH 04/14] fix: error in optimizing useless columns for multi-level set operators --- source/libs/parser/src/parCalcConst.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index a7f6d7cb3c..c25d0e7036 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -337,8 +337,14 @@ static SNodeList* getChildProjection(SNode* pStmt) { static void eraseSetOpChildProjection(SSetOperator* pSetOp, int32_t index) { SNodeList* pLeftProjs = getChildProjection(pSetOp->pLeft); nodesListErase(pLeftProjs, nodesListGetCell(pLeftProjs, index)); + if (QUERY_NODE_SET_OPERATOR == nodeType(pSetOp->pLeft)) { + eraseSetOpChildProjection((SSetOperator*)pSetOp->pLeft, index); + } SNodeList* pRightProjs = getChildProjection(pSetOp->pRight); nodesListErase(pRightProjs, nodesListGetCell(pRightProjs, index)); + if (QUERY_NODE_SET_OPERATOR == nodeType(pSetOp->pRight)) { + eraseSetOpChildProjection((SSetOperator*)pSetOp->pRight, index); + } } typedef struct SNotRefByOrderByCxt { From 19e5d6372104647697264d00d6f511d06b7285fe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 31 Mar 2023 18:26:46 +0800 Subject: [PATCH 05/14] fix: add job retry --- source/libs/qworker/src/qwDbg.c | 17 +++++++-- source/libs/scheduler/inc/schInt.h | 16 ++++++-- source/libs/scheduler/src/schJob.c | 53 +++++++++++++++++++++++++-- source/libs/scheduler/src/schRemote.c | 9 ++++- source/libs/scheduler/src/schTask.c | 31 +++++++++++++--- 5 files changed, 109 insertions(+), 17 deletions(-) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index b8d5d2e6ee..59e63e9eae 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -259,15 +259,26 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { static int32_t ignoreTime = 0; if (++ignoreTime > 10 && 0 == taosRand() % 9) { + if (ctx->msgType == TDMT_SCH_FETCH) { + qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); + qwBuildAndSendErrorRsp(ctx->msgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + *rsped = true; + + taosSsleep(3); + return; + } + +#if 0 SRpcHandleInfo *pConn = ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo - : &ctx->ctrlConnInfo); + : &ctx->ctrlConnInfo); qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); - + qwBuildAndSendDropMsg(QW_FPARAMS(), pConn); *rsped = true; - + return; +#endif } } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index b7f9272bdc..d002b5dfa9 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -299,6 +299,7 @@ typedef struct SSchJob { SExecResult execRes; void *fetchRes; // TODO free it or not bool fetched; + bool noMoreRetry; int64_t resNumOfRows; // from int32_t to int64_t SSchResInfo userRes; char *sql; @@ -333,8 +334,8 @@ extern SSchedulerMgmt schMgmt; ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \ (!SCH_IS_DATA_BIND_QRY_TASK(_task))) -#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) -#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) +#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) +#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -398,7 +399,7 @@ extern SSchedulerMgmt schMgmt; (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code)))) #define SCH_TASK_NEED_RETRY(_msgType, _code) \ ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) - + #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) @@ -522,6 +523,11 @@ extern SSchedulerMgmt schMgmt; } \ } while (0) +#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \ + (_job)->levelIdx = (_job)->levelNum - 1; \ + SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \ +} while (0) + void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); @@ -603,6 +609,10 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp); +int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode); +int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode); +void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask); +int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index b2c90dc67f..a36cd6747c 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -296,7 +296,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } pJob->levelNum = levelNum; - pJob->levelIdx = levelNum - 1; + SCH_RESET_JOB_LEVEL_IDX(pJob); SSchLevel level = {0}; SNodeListNode *plans = NULL; @@ -828,8 +828,9 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { SCH_LOCK(SCH_WRITE, &pJob->resLock); if (pJob->fetched) { SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); - SCH_ERR_JRET(rspCode); + pJob->noMoreRetry = true; + SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode)); + SCH_ERR_RET(rspCode); } SCH_UNLOCK(SCH_WRITE, &pJob->resLock); @@ -839,12 +840,56 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { return TSDB_CODE_SUCCESS; } +int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { + SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode)); + + int32_t numOfLevels = taosArrayGetSize(pJob->levels); + for (int32_t i = 0; i < numOfLevels; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + pLevel->taskExecDoneNum = 0; + pLevel->taskLaunchedNum = 0; + + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); + for (int32_t j = 0; j < numOfTasks; ++j) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); + SCH_LOCK_TASK(pTask); + SCH_ERR_RET(schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode)); + qClearSubplanExecutionNode(pTask->plan); + schResetTaskForRetry(pJob, pTask); + SCH_UNLOCK_TASK(pTask); + } + } + + SCH_RESET_JOB_LEVEL_IDX(pJob); + + return TSDB_CODE_SUCCESS; +} + int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode)); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); + SCH_UNLOCK_TASK(pTask); + + SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode)); + + SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode)); + + SCH_ERR_JRET(schLaunchJob(pJob)); + + SCH_LOCK_TASK(pTask); + + SCH_RET(code); + +_return: + + SCH_LOCK_TASK(pTask); + + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index af206aa46e..80fdc7594c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -416,6 +416,7 @@ _return: int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; int32_t msgType = pMsg->msgType; + char *msg = pMsg->pData; bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); if (SCH_IS_QUERY_JOB(pJob)) { @@ -426,7 +427,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { - SCH_RET(schHandleJobRetry()); + SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) { SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } @@ -434,6 +435,12 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa pTask->redirectCtx.inRedirect = false; SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode)); + +_return: + + taosMemoryFreeClear(msg); + + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a3194c7ff7..207753ae25 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -378,7 +378,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode)); + pJob->noMoreRetry = true; + SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode)); } pCtx->periodMs *= tsRedirectFactor; @@ -408,16 +409,16 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { pTask->waitRetry = true; schDropTaskOnExecNode(pJob, pTask); + if (pTask->delayTimer) { + taosTmrStopA(&pTask->delayTimer); + } taosHashClear(pTask->execNodes); schRemoveTaskFromExecList(pJob, pTask); schDeregisterTaskHb(pJob, pTask); - atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); - if (SCH_TASK_EXEC_DONE(pTask)) { - atomic_sub_fetch_32(&pTask->level->taskExecDoneNum, 1); - } taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; + pTask->childReady = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } @@ -427,7 +428,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); if (!NO_RET_REDIRECT_ERROR(rspCode)) { - SCH_UPDATE_REDICT_CODE(pJob, rspCode); + SCH_UPDATE_REDIRECT_CODE(pJob, rspCode); } SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); @@ -496,6 +497,17 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i } } + SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode)); + + for (int32_t i = 0; i < pJob->levelNum; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + pLevel->taskExecDoneNum = 0; + pLevel->taskLaunchedNum = 0; + } + + SCH_RESET_JOB_LEVEL_IDX(pJob); + code = schDoTaskRedirect(pJob, pTask, pData, rspCode); taosMemoryFreeClear(pData->pData); @@ -609,6 +621,13 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { */ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { + if (pJob->noMoreRetry) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes, + pTask->maxRetryTimes); + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; pTask->maxRetryTimes++; From 02dc919b9d044c87084d6a81b406bbcfee3e260c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 31 Mar 2023 20:47:10 +0800 Subject: [PATCH 06/14] fix: job status issue --- source/libs/scheduler/src/schJob.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index a36cd6747c..e7bfe95795 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -83,6 +83,10 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { oriStatus = SCH_GET_JOB_STATUS(pJob); if (oriStatus == newStatus) { + if (JOB_TASK_STATUS_FETCH == newStatus) { + return code; + } + SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -116,7 +120,8 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_FETCH: if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && - newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC && + newStatus != JOB_TASK_STATUS_FETCH) { SCH_ERR_JRET(TSDB_CODE_APP_ERROR); } @@ -988,7 +993,7 @@ int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq SCH_ERR_RET(TSDB_CODE_APP_ERROR); } - if (status != JOB_TASK_STATUS_PART_SUCC) { + if (status != JOB_TASK_STATUS_PART_SUCC && status != JOB_TASK_STATUS_FETCH) { SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } From e2f58a555df706217999dce789ae36f6e3adf1c7 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 3 Apr 2023 11:24:29 +0800 Subject: [PATCH 07/14] enh: convert the month and year in interval-offset to a database precision duration --- source/libs/parser/src/parTranslater.c | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f58a66af18..53816a5436 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -757,7 +757,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); - } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) { + } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || + FUNCTION_TYPE_IROWTS == pFunc->funcType) { return true; } } @@ -3119,6 +3120,19 @@ static const char* getPrecisionStr(uint8_t precision) { return "unknown"; } +static void convertVarDuration(SValueNode* pOffset, uint8_t precision) { + const int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1}; + const int8_t units[3] = {TIME_UNIT_MILLISECOND, TIME_UNIT_MICROSECOND, TIME_UNIT_NANOSECOND}; + + if (pOffset->unit == 'n') { + pOffset->datum.i = pOffset->datum.i * 31 * (NANOSECOND_PER_DAY / factors[precision]); + } else { + pOffset->datum.i = pOffset->datum.i * 365 * (NANOSECOND_PER_DAY / factors[precision]); + } + + pOffset->unit = units[precision]; +} + static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision; @@ -3143,6 +3157,10 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit))) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG); } + + if (pOffset->unit == 'n' || pOffset->unit == 'y') { + convertVarDuration(pOffset, precision); + } } if (NULL != pInterval->pSliding) { From c5632ca3558a57d7d1c35ca63d121e2ea074aa05 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 3 Apr 2023 13:47:28 +0800 Subject: [PATCH 08/14] enh: convert the month and year in interval-offset to a database precision duration --- tests/script/tsim/query/interval-offset.sim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index f4d95df083..0d796af0a0 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -212,10 +212,10 @@ print ===> rows2: $data20 $data21 $data22 $data23 $data24 if $rows != 3 then return -1 endi -if $data01 != 2 then +if $data01 != 4 then return -1 endi -if $data04 != 2 then +if $data04 != 4 then return -1 endi From aa6a4efd5b8b4bdcca6889c973c51864aed2af47 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 3 Apr 2023 16:18:16 +0800 Subject: [PATCH 09/14] fix(query): fix interp tsdbReader external range not setting properly --- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 96bce02b67..836fd09bda 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4001,10 +4001,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { STimeWindow window = pCond->twindows; - if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { - pCond->twindows.skey += 1; - pCond->twindows.ekey -= 1; - } int32_t capacity = pVnode->config.tsdbCfg.maxRows; if (pResBlock != NULL) { @@ -4027,11 +4023,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL // update the SQueryTableDataCond to create inner reader int32_t order = pCond->order; if (order == TSDB_ORDER_ASC) { - pCond->twindows.ekey = window.skey; + pCond->twindows.ekey = window.skey - 1; pCond->twindows.skey = INT64_MIN; pCond->order = TSDB_ORDER_DESC; } else { - pCond->twindows.skey = window.ekey; + pCond->twindows.skey = window.ekey + 1; pCond->twindows.ekey = INT64_MAX; pCond->order = TSDB_ORDER_ASC; } @@ -4043,11 +4039,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } if (order == TSDB_ORDER_ASC) { - pCond->twindows.skey = window.ekey; + pCond->twindows.skey = window.ekey + 1; pCond->twindows.ekey = INT64_MAX; } else { pCond->twindows.skey = INT64_MIN; - pCond->twindows.ekey = window.ekey; + pCond->twindows.ekey = window.ekey - 1; } pCond->order = order; From 16394862a6a02d9070d17cbf68463f2e5debb5d2 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 3 Apr 2023 18:11:46 +0800 Subject: [PATCH 10/14] enh: last_row is keep order function --- source/libs/function/src/builtins.c | 10 +++++----- source/libs/parser/src/parTranslater.c | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 1a039ebda6..a88b1388ba 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -480,14 +480,16 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le return code; } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; + pFunc->node.resType = + (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; return TSDB_CODE_SUCCESS; } static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters - pFunc->node.resType = (SDataType){.bytes =tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; + pFunc->node.resType = + (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; return TSDB_CODE_SUCCESS; } @@ -509,13 +511,11 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; if (!IS_NUMERIC_TYPE(para1Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - for (int32_t i = 1; i < numOfParams; ++i) { SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i); pValue->notReserved = true; @@ -2491,7 +2491,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 53816a5436..4541002960 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -755,7 +755,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { } else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) { SFunctionNode* pFunc = (SFunctionNode*)pExpr; if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType || - FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) { + FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType || + FUNCTION_TYPE_LAST_ROW == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) { From 6a3bcf4a71b4aa3c37dc767e5b3e4274dcff72eb Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 3 Apr 2023 19:08:57 +0800 Subject: [PATCH 11/14] ehn(plan/optimizer): remove tbname fetching with cached groupby --- source/libs/planner/src/planOptimizer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 16a5ef7bae..8fee17d968 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2279,7 +2279,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic if (NULL != cxt.pLastCols) { cxt.doAgg = false; lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols); - nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); + NODES_DESTORY_LIST(pScan->pScanPseudoCols); lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols); nodesClearList(cxt.pLastCols); } From fc5890c8bf07671c21ce2ad51e36f8f740b61661 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 3 Apr 2023 12:36:45 +0000 Subject: [PATCH 12/14] aovid conn leak --- source/libs/transport/src/transCli.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 50ed9fa61b..c23d6d0a1f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -462,6 +462,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (transQueueEmpty(&pConn->cliMsgs)) { if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) { tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn); + if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); transUnrefCliHandle(pConn); return; } @@ -521,6 +522,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { destroyCmsg(pMsg); tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); } while (!transQueueEmpty(&pConn->cliMsgs)); + if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn); transUnrefCliHandle(pConn); } void cliHandleExcept(SCliConn* conn) { From 45726cf551887269e33c666ba1ab99bdbbe540b7 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 3 Apr 2023 23:39:35 +0800 Subject: [PATCH 13/14] fix: taosbenchmark multithreads with limit for main (#20744) * fix: taosbenchmark multithreads with limit for main * fix: update taos-tools --- cmake/taostools_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index aef89a2d42..2de3881dd2 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG d194dc9 + GIT_TAG 273a3fe SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE From f79eeb3032e422cef9676ee65f89c8aabab9c494 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 4 Apr 2023 08:54:28 +0800 Subject: [PATCH 14/14] fix: add stmt error handling --- source/client/inc/clientLog.h | 1 + source/client/inc/clientStmt.h | 4 +++ source/client/src/clientEnv.c | 1 + source/client/src/clientStmt.c | 22 ++++++++++++-- tests/script/api/batchprepare.c | 51 ++++++++++++++++++++++++++++++++- 5 files changed, 75 insertions(+), 4 deletions(-) diff --git a/source/client/inc/clientLog.h b/source/client/inc/clientLog.h index c29f495201..908e470830 100644 --- a/source/client/inc/clientLog.h +++ b/source/client/inc/clientLog.h @@ -26,6 +26,7 @@ extern "C" { #define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", DEBUG_FATAL, cDebugFlag, __VA_ARGS__); }} while(0) #define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", DEBUG_ERROR, cDebugFlag, __VA_ARGS__); }} while(0) #define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0) +#define tscWarnL(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLongString("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0) #define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", DEBUG_INFO, cDebugFlag, __VA_ARGS__); }} while(0) #define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0) #define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0) diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 2b42de93e3..0c9696f1c8 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -102,6 +102,7 @@ typedef struct STscStmt { SStmtBindInfo bInfo; int64_t reqid; + int32_t errCode; } STscStmt; extern char *gStmtStatusStr[]; @@ -121,6 +122,7 @@ extern char *gStmtStatusStr[]; int32_t _code = c; \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ + pStmt->errCode = _code; \ return _code; \ } \ } while (0) @@ -129,6 +131,7 @@ extern char *gStmtStatusStr[]; int32_t _code = c; \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ + pStmt->errCode = _code; \ } \ return _code; \ } while (0) @@ -137,6 +140,7 @@ extern char *gStmtStatusStr[]; code = c; \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ + pStmt->errCode = code; \ goto _return; \ } \ } while (0) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index de08ba66cc..dba1dbcf9a 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -103,6 +103,7 @@ static void deregisterRequest(SRequestObj *pRequest) { if (duration >= SLOW_QUERY_INTERVAL) { atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); + tscWarnL("slow query: %s, duration:%" PRId64, pRequest->sqlstr, duration); } releaseTscObj(pTscObj->id); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 3ed157efef..71a41c68e7 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -32,8 +32,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { STMT_LOG_SEQ(newStatus); } + if (pStmt->errCode && newStatus != STMT_PREPARE) { + STMT_DLOG("stmt already failed with err: %s", tstrerror(pStmt->errCode)); + return pStmt->errCode; + } + switch (newStatus) { case STMT_PREPARE: + pStmt->errCode = 0; break; case STMT_SETTBNAME: if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)) { @@ -197,7 +203,10 @@ int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHa STscStmt* pStmt = (STscStmt*)stmt; *pVgHash = pStmt->sql.pVgHash; + pStmt->sql.pVgHash = NULL; + *pBlockHash = pStmt->exec.pBlockHash; + pStmt->exec.pBlockHash = NULL; return TSDB_CODE_SUCCESS; } @@ -325,6 +334,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { } int32_t stmtCleanSQLInfo(STscStmt* pStmt) { + STMT_DLOG_E("start to free SQL info"); + taosMemoryFree(pStmt->sql.queryRes.fields); taosMemoryFree(pStmt->sql.queryRes.userFields); taosMemoryFree(pStmt->sql.sqlStr); @@ -351,6 +362,8 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { memset(&pStmt->sql, 0, sizeof(pStmt->sql)); + STMT_DLOG_E("end to free SQL info"); + return TSDB_CODE_SUCCESS; } @@ -441,11 +454,10 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { - STMT_ERR_RET(stmtCleanBindInfo(pStmt)); - tscDebug("tb %s not exist", pStmt->bInfo.tbFName); + stmtCleanBindInfo(pStmt); - return TSDB_CODE_SUCCESS; + STMT_ERR_RET(code); } STMT_ERR_RET(code); @@ -922,9 +934,13 @@ _return: int stmtClose(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; + STMT_DLOG_E("start to free stmt"); + stmtCleanSQLInfo(pStmt); taosMemoryFree(stmt); + STMT_DLOG_E("stmt freed"); + return TSDB_CODE_SUCCESS; } diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 0903095dc9..99507ef5c3 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -122,9 +122,11 @@ int insertAUTOTest2(TAOS_STMT *stmt, TAOS *taos); int insertAUTOTest3(TAOS_STMT *stmt, TAOS *taos); int queryColumnTest(TAOS_STMT *stmt, TAOS *taos); int queryMiscTest(TAOS_STMT *stmt, TAOS *taos); +int insertNonExistsTb(TAOS_STMT *stmt, TAOS *taos); enum { TTYPE_INSERT = 1, + TTYPE_INSERT_NG, TTYPE_QUERY, }; @@ -187,6 +189,8 @@ CaseCfg gCase[] = { {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:NG-TBNEXISTS",tListLen(fullColList), fullColList, TTYPE_INSERT_NG,0, false, false, insertNonExistsTb, 10, 10, 1, 3, 0, 0, 1, -1}, + // {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, // {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, @@ -250,7 +254,7 @@ CaseCtrl gCaseCtrl = { .funcIdxList = NULL, .checkParamNum = false, .runTimes = 0, - .caseIdx = 24, + .caseIdx = 26, .caseNum = 1, .caseRunIdx = -1, .caseRunNum = -1, @@ -2191,6 +2195,47 @@ int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) { } +int insertNonExistsTb(TAOS_STMT *stmt, TAOS *taos) { + BindData data = {0}; + prepareInsertData(&data); + + int code = taos_stmt_prepare(stmt, data.sql, 0); + if (code != 0){ + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + + bpCheckIsInsert(stmt, 1); + + char *buf = "tbnexist"; + code = bpSetTableNameTags(&data, 0, buf, stmt); + if (code == 0){ + printf("!!!taos_stmt_set_tbname expected error not occurred\n"); + exit(1); + } + + if (0 == taos_stmt_bind_param_batch(stmt, data.pBind)) { + printf("!!!taos_stmt_bind_param_batch expected error not occurred\n"); + exit(1); + } + + if (0 == taos_stmt_add_batch(stmt)) { + printf("!!!taos_stmt_add_batch expected error not occurred\n"); + exit(1); + } + + if (0 == taos_stmt_execute(stmt)) { + printf("!!!taos_stmt_execute expected error not occurred\n"); + exit(1); + } + + destroyData(&data); + + return 0; +} + + + int errorSQLTest1(TAOS_STMT *stmt, TAOS *taos) { BindData data = {0}; @@ -2213,6 +2258,10 @@ int errorSQLTest1(TAOS_STMT *stmt, TAOS *taos) { } void prepareCheckResultImpl(TAOS * taos, char *tname, bool printr, int expected, bool silent) { + if (TTYPE_INSERT_NG == gCurCase->testType) { + return; + } + char sql[255] = "SELECT * FROM "; int32_t rows = 0;