diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 13826a1a74..ef6ed4af1d 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 9bbda8309f..9a6a5329ae 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index b013d45911..17446d184d 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index a479c367e1..950444df52 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -39,7 +39,7 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** } static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { - while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) { + while (0 == atomic_load_64((int64_t*)&pStmt->queue.qRemainNum)) { taosUsleep(1); return false; } @@ -53,7 +53,7 @@ static bool stmtDequeue(STscStmt2* pStmt, SStmtQNode** param) { *param = node; - (void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)atomic_sub_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); return true; } @@ -63,7 +63,7 @@ static void stmtEnqueue(STscStmt2* pStmt, SStmtQNode* param) { pStmt->queue.tail = param; pStmt->stat.bindDataNum++; - (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)atomic_add_fetch_64((int64_t*)&pStmt->queue.qRemainNum, 1); } static int32_t stmtCreateRequest(STscStmt2* pStmt) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5ecea42d2f..e82209e03f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -830,7 +830,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { + if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } @@ -842,9 +842,21 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb}; initStorageAPI(&handle.api); + int32_t code = TSDB_CODE_SUCCESS; + bool redirected = false; switch (pMsg->msgType) { case TDMT_SCH_QUERY: + if (!syncIsReadyForRead(pVnode->sync)) { + pMsg->code = (terrno) ? terrno : TSDB_CODE_SYN_NOT_LEADER; + redirected = true; + } + code = qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + if (redirected) { + vnodeRedirectRpcMsg(pVnode, pMsg, pMsg->code); + return 0; + } + return code; case TDMT_SCH_MERGE_QUERY: return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 53e1c3d288..65f7cd772f 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -320,9 +320,8 @@ extern SQueryMgmt gQueryMgmt; case QW_PHASE_POST_QUERY: \ case QW_PHASE_PRE_CQUERY: \ case QW_PHASE_POST_CQUERY: \ - atomic_store_8(&(ctx)->phase, _value); \ - break; \ default: \ + atomic_store_8(&(ctx)->phase, _value); \ break; \ } \ } while (0) diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 89475bca13..d625bb113a 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -83,13 +83,16 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) { taosMemPoolDestroySession(gMemPoolHandle, session); + QW_LOCK(QW_WRITE, &pJobInfo->lock); int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1); - + if (remainSessions != 0) { + QW_UNLOCK(QW_WRITE, &pJobInfo->lock); + } + QW_TASK_DLOG("task session destoryed, remainSessions:%d", remainSessions); if (0 == remainSessions) { - QW_LOCK(QW_WRITE, &pJobInfo->lock); - if (/*0 == taosHashGetSize(pJobInfo->pSessions) && */0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) { +// if (/*0 == taosHashGetSize(pJobInfo->pSessions) && */0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) { atomic_store_8(&pJobInfo->destroyed, 1); QW_UNLOCK(QW_WRITE, &pJobInfo->lock); @@ -98,10 +101,10 @@ void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) { TAOS_UNUSED(taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2))); QW_TASK_DLOG_E("the whole query job removed"); - } else { - QW_TASK_DLOG("job not removed, remainSessions:%d, %d", taosHashGetSize(pJobInfo->pSessions), pJobInfo->memInfo->remainSession); - QW_UNLOCK(QW_WRITE, &pJobInfo->lock); - } +// } else { +// QW_TASK_DLOG("job not removed, remainSessions:%d, %d", taosHashGetSize(pJobInfo->pSessions), pJobInfo->memInfo->remainSession); +// QW_UNLOCK(QW_WRITE, &pJobInfo->lock); +// } } } @@ -147,12 +150,14 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) { } QW_LOCK(QW_READ, &pJob->lock); + if (atomic_load_8(&pJob->destroyed)) { QW_UNLOCK(QW_READ, &pJob->lock); continue; } (void)atomic_add_fetch_32(&pJob->memInfo->remainSession, 1); + QW_UNLOCK(QW_READ, &pJob->lock); break; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 0452a05fe4..e385bbe5ea 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -498,14 +498,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int64_t rId = msg.refId; int32_t eId = msg.execId; - SQWMsg qwMsg = {.node = node, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; + SQWMsg qwMsg = {.node = node, .msg = msg.msg, .msgLen = msg.msgLen, .connInfo = pMsg->info, .msgType = pMsg->msgType, .code = pMsg->code}; qwMsg.msgInfo.explain = msg.explain; qwMsg.msgInfo.taskType = msg.taskType; qwMsg.msgInfo.needFetch = msg.needFetch; qwMsg.msgInfo.compressMsg = msg.compress; - QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), - msg.compress, pMsg->info.handle, msg.sql); + QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s, code:0x%x", node, TMSG_INFO(pMsg->msgType), + msg.compress, pMsg->info.handle, msg.sql, qwMsg.code); code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql); msg.sql = NULL; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 50af01cd6f..641fa03f7a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -567,6 +567,14 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(ctx->rspCode); } + if (TSDB_CODE_SUCCESS != input->code) { + QW_TASK_ELOG("task already failed at phase %s, code:0x%x", qwPhaseStr(phase), input->code); + ctx->ctrlConnInfo.handle = NULL; + (void)qwDropTask(QW_FPARAMS()); + + QW_ERR_JRET(input->code); + } + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask)); break; } @@ -631,6 +639,10 @@ _return: if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); + if (QW_PHASE_PRE_CQUERY == phase && code) { + QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY); + } + QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); } @@ -767,7 +779,7 @@ _return: int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { int32_t code = 0; SSubplan *plan = NULL; - SQWPhaseInput input = {0}; + SQWPhaseInput input = {.code = qwMsg->code}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; @@ -808,10 +820,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_APP_ERROR); } - atomic_add_fetch_64(&gQueryMgmt.stat.taskRunNum, 1); + (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskRunNum, 1); uint64_t flags = 0; - dsGetSinkFlags(sinkHandle, &flags); + (void)dsGetSinkFlags(sinkHandle, &flags); ctx->level = plan->level; ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo); @@ -1342,7 +1354,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { ctx.sinkHandle = sinkHandle; uint64_t flags = 0; - dsGetSinkFlags(sinkHandle, &flags); + (void)dsGetSinkFlags(sinkHandle, &flags); ctx.sinkWithMemPool = flags & DS_FLAG_USE_MEMPOOL; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index fd255f53cf..d15ac7a791 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1361,14 +1361,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, taosMemoryFree(msg); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); } else { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { + SCH_ERR_JRET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); + } + SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); - - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { - SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); - } } return TSDB_CODE_SUCCESS;