From 8675857207a99a0979c065fa666f229a416909dd Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 5 Sep 2022 13:56:42 +0800 Subject: [PATCH 1/8] enh: add client query policy --- include/libs/qcom/query.h | 15 ++-- include/libs/qworker/qworker.h | 3 +- include/libs/scheduler/scheduler.h | 1 + include/util/tdef.h | 1 + source/client/src/clientEnv.c | 1 + source/client/src/clientImpl.c | 24 ++++++ source/client/src/clientMain.c | 3 + source/dnode/mnode/impl/src/mndQuery.c | 2 +- source/dnode/qnode/src/qnode.c | 2 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- source/libs/qworker/inc/qwInt.h | 4 +- source/libs/qworker/src/qworker.c | 94 ++++++++++++++--------- source/libs/qworker/test/qworkerTests.cpp | 16 ++-- source/libs/scheduler/inc/schInt.h | 3 + source/libs/scheduler/src/schJob.c | 1 + source/libs/scheduler/src/schTask.c | 67 +++++++++++----- 16 files changed, 159 insertions(+), 80 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1fa7dca7dc..bf89e98ce3 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -52,6 +52,7 @@ typedef enum { #define QUERY_POLICY_VNODE 1 #define QUERY_POLICY_HYBRID 2 #define QUERY_POLICY_QNODE 3 +#define QUERY_POLICY_CLIENT 4 typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema @@ -267,43 +268,43 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define qFatal(...) \ do { \ if (qDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qError(...) \ do { \ if (qDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qWarn(...) \ do { \ if (qDebugFlag & DEBUG_WARN) { \ - taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qInfo(...) \ do { \ if (qDebugFlag & DEBUG_INFO) { \ - taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebug(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qTrace(...) \ do { \ if (qDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebugL(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ } \ } while (0) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 87aefe5187..b83ffe4db2 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -29,6 +29,7 @@ enum { NODE_TYPE_QNODE, NODE_TYPE_SNODE, NODE_TYPE_MNODE, + NODE_TYPE_CLIENT, }; typedef struct SQWorkerCfg { @@ -55,7 +56,7 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index e6973cd390..70bd7b0c78 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -64,6 +64,7 @@ typedef bool (*schedulerChkKillFp)(void* param); typedef struct SSchedulerReq { bool syncReq; + bool localReq; SRequestConnInfo *pConn; SArray *pNodeList; SQueryPlan *pDag; diff --git a/include/util/tdef.h b/include/util/tdef.h index 2bc821b873..b17b8cbc66 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -467,6 +467,7 @@ enum { #define SNODE_HANDLE -2 #define VNODE_HANDLE -3 #define BNODE_HANDLE -4 +#define CLIENT_HANDLE -5 #define TSDB_CONFIG_OPTION_LEN 32 #define TSDB_CONFIG_VALUE_LEN 64 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bf92a9ba6a..e1cf078d57 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -27,6 +27,7 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "qworker.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f91ceb3184..54320445d4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -540,6 +540,20 @@ _return: return TSDB_CODE_SUCCESS; } + +int32_t buildClientPolicyNodeList(SRequestObj* pRequest, SArray** pNodeList) { + *pNodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); + SQueryNodeLoad load = {0}; + load.addr.nodeId = CLIENT_HANDLE; + + taosArrayPush(*pNodeList, &load); + + tscDebug("0x%" PRIx64 " client policy", pRequest->requestId); + + return TSDB_CODE_SUCCESS; +} + + int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) { SArray* pDbVgList = NULL; SArray* pQnodeList = NULL; @@ -585,6 +599,10 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } + case QUERY_POLICY_CLIENT: { + code = buildClientPolicyNodeList(pRequest, pNodeList); + break; + } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; @@ -645,6 +663,10 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } + case QUERY_POLICY_CLIENT: { + code = buildClientPolicyNodeList(pRequest, pNodeList); + break; + } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; @@ -667,6 +689,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = true, + .localReq = (tsQueryPolicy == CLIENT_HANDLE), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, @@ -1042,6 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = false, + .localReq = (tsQueryPolicy == CLIENT_HANDLE), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 3086078080..9c435887e6 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -26,6 +26,7 @@ #include "tref.h" #include "trpc.h" #include "version.h" +#include "qworker.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -75,6 +76,8 @@ void taos_cleanup(void) { cleanupTaskQueue(); + qWorkerDestroy(&tscQueryMgmt); + taosConvDestroy(); tscInfo("all local resources released"); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 654f46ec85..0e897de4e7 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -170,7 +170,7 @@ _exit: } int32_t mndInitQuery(SMnode *pMnode) { - if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { + if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { mError("failed to init qworker in mnode since %s", terrstr()); return -1; } diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index b65189153e..efdc8b4693 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -26,7 +26,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } - if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 8d799e919d..1461d37181 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -16,7 +16,7 @@ #include "vnd.h" int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); + return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 729ac474e4..8efc247524 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -29,7 +29,7 @@ extern "C" { #include "executor.h" #include "trpc.h" -#define QW_DEFAULT_SCHEDULER_NUMBER 10000 +#define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 @@ -93,7 +93,7 @@ typedef struct SQWMsg { void *node; int32_t code; int32_t msgType; - char *msg; + void *msg; int32_t msgLen; SQWMsgInfo msgInfo; SRpcHandleInfo connInfo; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f006096ce2..77c8aa5cd9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -512,11 +512,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - bool queryRsped = false; - SSubplan *plan = NULL; - SQWPhaseInput input = {0}; - qTaskInfo_t pTaskInfo = NULL; - DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -578,19 +573,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - - // queryRsped = true; - ctx->level = plan->level; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); - if (pTaskInfo && sinkHandle) { - qwSaveTbVersionInfo(pTaskInfo, ctx); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); - } + qwSaveTbVersionInfo(pTaskInfo, ctx); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); _return: @@ -600,11 +588,6 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - // if (!queryRsped) { - // qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - //} - QW_RET(TSDB_CODE_SUCCESS); } @@ -1000,8 +983,8 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { - if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { + if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1024,22 +1007,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (cfg) { - mgmt->cfg = *cfg; - if (0 == mgmt->cfg.maxSchedulerNum) { - mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; - } - if (0 == mgmt->cfg.maxTaskNum) { - mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; - } - if (0 == mgmt->cfg.maxSchTaskNum) { - mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; - } - } else { - mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; - mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; - mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; - } + mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; + mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; + mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); @@ -1064,7 +1034,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; - mgmt->msgCb = *pMsgCb; + mgmt->msgCb = pMsgCb ? *pMsgCb : NULL; mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); if (mgmt->refId < 0) { @@ -1140,3 +1110,51 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } + +int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { + int32_t code = 0; + SQWTaskCtx *ctx = NULL; + SSubplan *plan = (SSubplan *)qwMsg->msg; + SQWPhaseInput input = {0}; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + + QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); + + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); + + ctx->taskType = qwMsg->msgInfo.taskType; + ctx->explain = qwMsg->msgInfo.explain; + ctx->needFetch = qwMsg->msgInfo.needFetch; + ctx->msgType = qwMsg->msgType; + + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + if (code) { + QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + if (NULL == sinkHandle || NULL == pTaskInfo) { + QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + ctx->level = plan->level; + atomic_store_ptr(&ctx->taskHandle, pTaskInfo); + atomic_store_ptr(&ctx->sinkHandle, sinkHandle); + + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + +_return: + + if (ctx) { + QW_UPDATE_RSP_CODE(ctx, code); + qwReleaseTaskCtx(mgmt, ctx); + } + + QW_RET(code); +} + + diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 1f76ea1e7e..60d6594c1b 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -877,7 +877,7 @@ TEST(seqTest, normalCase) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); @@ -913,7 +913,7 @@ TEST(seqTest, cancelFirst) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); @@ -950,7 +950,7 @@ TEST(seqTest, randCase) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); int32_t t = 0; @@ -1021,7 +1021,7 @@ TEST(seqTest, multithreadRand) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); TdThreadAttr thattr; @@ -1084,7 +1084,7 @@ TEST(rcTest, shortExecshortDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1168,7 +1168,7 @@ TEST(rcTest, longExecshortDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 1000000; @@ -1254,7 +1254,7 @@ TEST(rcTest, shortExeclongDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1338,7 +1338,7 @@ TEST(rcTest, dropTest) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); tsem_init(&qwtTestQuerySem, 0, 0); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 957fd46ba5..6292aac6d2 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -151,6 +151,7 @@ typedef struct SSchedulerMgmt { SSchStat stat; SRWLatch hbLock; SHashObj *hbConnections; + void *queryMgmt; } SSchedulerMgmt; typedef struct SSchCallbackParamHeader { @@ -237,6 +238,7 @@ typedef struct SSchJobAttr { bool queryJob; bool needFetch; bool needFlowCtrl; + bool localExec; } SSchJobAttr; typedef struct { @@ -302,6 +304,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 98501427ab..a314ffaa30 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -719,6 +719,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { } pJob->attr.explainMode = pReq->pDag->explainInfo.mode; + pJob->attr.localExec = pReq->localReq; pJob->conn = *pReq->pConn; if (pReq->sql) { pJob->sql = strdup(pReq->sql); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9cab39c301..94b2c4d372 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -228,7 +228,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_RET(errCode); } -// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; @@ -819,6 +818,48 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { return TSDB_CODE_SUCCESS; } +int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { + SSubplan *plan = pTask->plan; + int32_t code = 0; + + if (NULL == pTask->msg) { // TODO add more detailed reason for failure + code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); + if (TSDB_CODE_SUCCESS != code) { + SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, + pTask->msgLen); + SCH_ERR_RET(code); + } else { + SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); + } + } + + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); + + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); + } + + SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); +} + +int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { + //SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + if (NULL == schMgmt.queryMgmt) { + SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); + } + + SQWMsg qwMsg = {0}; + qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; + qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); + qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask); + qwMsg.msg = pTask->plan; + qwMsg.msgType = pTask->plan->msgType; + + SCH_ERR_RET(qWorkerProcessLocalQuery((SQWorker*)schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg)); + + SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); +} + int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; SSchJob *pJob = schAcquireJob(pCtx->jobRid); @@ -852,27 +893,12 @@ int32_t schLaunchTaskImpl(void *param) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); } - SSubplan *plan = pTask->plan; - - if (NULL == pTask->msg) { // TODO add more detailed reason for failure - code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); - if (TSDB_CODE_SUCCESS != code) { - SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, - pTask->msgLen); - SCH_ERR_JRET(code); - } else { - SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); - } + if (pJob->attr.localExec && SCH_IS_QUERY_JOB(pJob) && SCH_IS_DATA_MERGE_TASK(pTask)) { + SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask)); + } else { + SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask)); } - SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); - - if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask)); - } - - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - _return: taosMemoryFree(param); @@ -892,7 +918,6 @@ _return: } int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { - SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx)); if (NULL == param) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); From 1427a8f3b0d95ef7dff1500b32fb570e7feb270b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 9 Sep 2022 19:03:42 +0800 Subject: [PATCH 2/8] enh: support client query policy --- include/libs/executor/executor.h | 9 +- include/libs/nodes/plannodes.h | 1 + include/libs/qworker/qworker.h | 23 ++++- source/client/src/clientImpl.c | 6 +- source/client/src/clientMain.c | 2 - source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executor.c | 7 +- source/libs/executor/src/executorimpl.c | 69 ++++++++------ source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/qworker/inc/qwInt.h | 18 +--- source/libs/qworker/inc/qwMsg.h | 2 +- source/libs/qworker/src/qwMsg.c | 4 +- source/libs/qworker/src/qworker.c | 119 +++++++++++++++++++----- source/libs/scheduler/CMakeLists.txt | 2 +- source/libs/scheduler/inc/schInt.h | 4 +- source/libs/scheduler/src/schRemote.c | 118 +++++++++++++---------- source/libs/scheduler/src/schTask.c | 48 ++++++++-- source/libs/scheduler/src/scheduler.c | 4 + 20 files changed, 302 insertions(+), 140 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 25a6221fcb..37f89caa41 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,6 +29,13 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; +typedef int32_t (*localFetchFp)(void *handle, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); + +typedef struct { + void *handle; + localFetchFp fp; +} SLocalFetch; + typedef struct { void* tqReader; void* meta; @@ -127,7 +134,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6fd6a316eb..5abad5cccc 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -393,6 +393,7 @@ typedef struct SDownstreamSourceNode { uint64_t schedId; int32_t execId; int32_t fetchMsgType; + bool localExec; } SDownstreamSourceNode; typedef struct SExchangePhysiNode { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index b83ffe4db2..6062ffa3ef 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -56,6 +56,23 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; +typedef struct SQWMsgInfo { + int8_t taskType; + int8_t explain; + int8_t needFetch; +} SQWMsgInfo; + +typedef struct SQWMsg { + void *node; + int32_t code; + int32_t msgType; + void *msg; + int32_t msgLen; + SQWMsgInfo msgInfo; + SRpcHandleInfo connInfo; +} SQWMsg; + + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); @@ -78,10 +95,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); -void qWorkerDestroy(void **qWorkerMgmt); +void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg); + +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 54320445d4..6a8e58efb4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -365,7 +365,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { } bool qnodeRequired(SRequestObj* pRequest) { - if (QUERY_POLICY_VNODE == tsQueryPolicy) { + if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) { return false; } @@ -689,7 +689,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = true, - .localReq = (tsQueryPolicy == CLIENT_HANDLE), + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, @@ -1065,7 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = false, - .localReq = (tsQueryPolicy == CLIENT_HANDLE), + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9c435887e6..73ff36a299 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -76,8 +76,6 @@ void taos_cleanup(void) { cleanupTaskQueue(); - qWorkerDestroy(&tscQueryMgmt); - taosConvDestroy(); tscInfo("all local resources released"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 47110237dd..add19ae337 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -285,7 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; - if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f2063e3067..d184926817 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -641,7 +641,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 89e5b2da0a..1dfa9da7d5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -183,6 +183,7 @@ typedef struct SExecTaskInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SSubplan* pSubplan; struct SOperatorInfo* pRoot; + SLocalFetch localFetch; } SExecTaskInfo; enum { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 278f02b228..93303cb62d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -450,10 +450,15 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + if (pLocal) { + pTaskInfo->localFetch.handle = pLocal->handle; + pTaskInfo->localFetch.fp = pLocal->fp; + } + taosArrayClearEx(pResList, freeBlock); int64_t curOwner = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b53d35a1a1..a1f3e5a4a5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1999,38 +1999,45 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, - sourceIndex, totalSources); - - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); - pMsg->execId = htonl(pSource->execId); - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - taosMemoryFreeClear(pMsg); - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return pTaskInfo->code; - } - SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); pWrapper->exchangeId = pExchangeInfo->self; pWrapper->sourceIndex = sourceIndex; - pMsgSendInfo->param = pWrapper; - pMsgSendInfo->paramFreeFp = taosMemoryFree; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = pSource->fetchMsgType; - pMsgSendInfo->fp = loadRemoteDataCallback; + if (pSource->localExec) { + SDataBuf pBuf = {0}; + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData); + loadRemoteDataCallback(pWrapper, &pBuf, code); + } else { + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, + sourceIndex, totalSources); - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pMsg->execId = htonl(pSource->execId); + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + taosMemoryFreeClear(pMsg); + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } + + pMsgSendInfo->param = pWrapper; + pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgType = pSource->fetchMsgType; + pMsgSendInfo->fp = loadRemoteDataCallback; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + } + return TSDB_CODE_SUCCESS; } @@ -3963,7 +3970,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { @@ -4474,8 +4481,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); - nodesDestroyNode((SNode*)pTaskInfo->pSubplan); - + if (!pTaskInfo->localFetch.fp) { + nodesDestroyNode((SNode*)pTaskInfo->pSubplan); + } + taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 83bccbffb4..904b8f01eb 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -609,6 +609,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre COPY_SCALAR_FIELD(schedId); COPY_SCALAR_FIELD(execId); COPY_SCALAR_FIELD(fetchMsgType); + COPY_SCALAR_FIELD(localExec); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 8efc247524..150dde91e0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -83,22 +83,6 @@ typedef struct SQWDebug { extern SQWDebug gQWDebug; -typedef struct SQWMsgInfo { - int8_t taskType; - int8_t explain; - int8_t needFetch; -} SQWMsgInfo; - -typedef struct SQWMsg { - void *node; - int32_t code; - int32_t msgType; - void *msg; - int32_t msgLen; - SQWMsgInfo msgInfo; - SRpcHandleInfo connInfo; -} SQWMsg; - typedef struct SQWHbParam { bool inUse; int32_t qwrId; @@ -133,6 +117,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int8_t needFetch; + int8_t localExec; int32_t msgType; int32_t fetchType; int32_t execId; @@ -150,6 +135,7 @@ typedef struct SQWTaskCtx { int8_t events[QW_EVENT_MAX]; + SArray *explainRes; void *taskHandle; void *sinkHandle; STbVerInfo tbInfo; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 3ff5b5950f..b46c5d6baf 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -42,7 +42,7 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList); int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); -int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); +int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index ecc25e9c2d..2acf3fad15 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -9,10 +9,10 @@ #include "tmsg.h" #include "tname.h" -int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { +int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize); + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize)); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 77c8aa5cd9..00588977bc 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -57,11 +57,15 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); - SRpcHandleInfo connInfo = ctx->ctrlConnInfo; - connInfo.ahandle = NULL; - int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); - taosArrayDestroyEx(execInfoList, freeItem); - QW_ERR_RET(code); + if (ctx->localExec) { + + } else { + SRpcHandleInfo connInfo = ctx->ctrlConnInfo; + connInfo.ahandle = NULL; + int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); + taosArrayDestroyEx(execInfoList, freeItem); + QW_ERR_RET(code); + } } if (!ctx->needFetch) { @@ -80,6 +84,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; + SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { @@ -88,7 +93,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { qwDbgSimulateSleep(); - code = qExecTaskOpt(taskHandle, pResList, &useconds); + code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -229,7 +234,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); if (NULL == rsp) { - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); *pOutput = output; } else { pOutput->queryEnd = output.queryEnd; @@ -250,7 +255,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *dataLen += len; - QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp)); output.pData = rsp->data + *dataLen - len; code = dsGetDataBlock(ctx->sinkHandle, &output); @@ -474,16 +479,18 @@ _return: } if (QW_PHASE_POST_QUERY == phase && ctx) { - ctx->queryRsped = true; - - bool rsped = false; - SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; - qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); - qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { - qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + if (!ctx->localExec) { + bool rsped = false; + SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); + if (!rsped) { + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + } } + + ctx->queryRsped = true; } if (ctx) { @@ -551,6 +558,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->msgType = qwMsg->msgType; + ctx->localExec = false; // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -1034,7 +1042,11 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; - mgmt->msgCb = pMsgCb ? *pMsgCb : NULL; + if (pMsgCb) { + mgmt->msgCb = *pMsgCb; + } else { + memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb)); + } mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); if (mgmt->refId < 0) { @@ -1111,26 +1123,33 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) { + SQWorker *mgmt = (SQWorker*)pMgmt; int32_t code = 0; SQWTaskCtx *ctx = NULL; SSubplan *plan = (SSubplan *)qwMsg->msg; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; + SReadHandle rHandle = {0}; QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); - - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + ctx->taskType = qwMsg->msgInfo.taskType; ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->msgType = qwMsg->msgType; + ctx->localExec = true; + ctx->explainRes = explainRes; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); + rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; + + code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -1149,6 +1168,12 @@ int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { _return: + taosMemoryFree(rHandle.pMsgCb); + + input.code = code; + input.msgType = qwMsg->msgType; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); qwReleaseTaskCtx(mgmt, ctx); @@ -1157,4 +1182,52 @@ _return: QW_RET(code); } +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) { + SQWorker *mgmt = (SQWorker*)pMgmt; + int32_t code = 0; + int32_t dataLen = 0; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; + bool queryStop = false; + + SQWPhaseInput input = {0}; + + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); + + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + + ctx->msgType = TDMT_SCH_MERGE_FETCH; + ctx->explainRes = explainRes; + + SOutputData sOutput = {0}; + + while (true) { + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + + if (NULL == rsp) { + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); + + continue; + } else { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + } + + break; + } + } + +_return: + + *pRsp = rsp; + + input.code = code; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL); + + QW_RET(code); +} + diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 88180391dd..3288120b67 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,7 +9,7 @@ target_include_directories( target_link_libraries( scheduler - PUBLIC os util nodes planner qcom common catalog transport command + PUBLIC os util nodes planner qcom common catalog transport command qworker executor ) if(${BUILD_TEST}) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 6292aac6d2..01d962a71f 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -264,7 +264,7 @@ typedef struct SSchJob { SHashObj *taskList; SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask* SHashObj *flowCtrl; // key is ep, element is SSchFlowControl - + SExplainCtx *explainCtx; int8_t status; SQueryNodeAddr resNode; @@ -305,6 +305,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) +#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && SCH_IS_DATA_MERGE_TASK(_task)) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -503,6 +504,7 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); +int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 5a64aaaebb..d79514a122 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -72,6 +72,71 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { return TSDB_CODE_SUCCESS; } +int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + int32_t code = 0; + + SCH_ERR_JRET(rspCode); + + if (NULL == msg) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + if (rsp->completed) { + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); + if (pRsp) { + SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } + + taosMemoryFreeClear(msg); + + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schLaunchFetchTask(pJob)); + + taosMemoryFreeClear(msg); + + return TSDB_CODE_SUCCESS; + } + + if (pJob->fetchRes) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + atomic_store_ptr(&pJob->fetchRes, rsp); + atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); + + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); + } + + SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + + msg = NULL; + schProcessOnDataFetched(pJob); + +_return: + + taosMemoryFreeClear(msg); + + SCH_RET(code); +} + +int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); + + if (pRsp) { + SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } + + return TSDB_CODE_SUCCESS; +} + // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; @@ -305,61 +370,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp)); - - if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); - } + SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp)); break; } case TDMT_SCH_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP: { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - - SCH_ERR_JRET(rspCode); - if (NULL == msg) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (SCH_IS_EXPLAIN_JOB(pJob)) { - if (rsp->completed) { - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); - if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); - } - - taosMemoryFreeClear(msg); - - return TSDB_CODE_SUCCESS; - } - - SCH_ERR_JRET(schLaunchFetchTask(pJob)); - - taosMemoryFreeClear(msg); - - return TSDB_CODE_SUCCESS; - } - - if (pJob->fetchRes) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); - taosMemoryFreeClear(rsp); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); - - if (rsp->completed) { - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); - } - - SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); - + code = schProcessFetchRsp(pJob, pTask, msg, rspCode); msg = NULL; - - schProcessOnDataFetched(pJob); + SCH_ERR_JRET(code); break; } case TDMT_SCH_DROP_TASK_RSP: { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 94b2c4d372..3cecbc9ae3 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -20,6 +20,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "qworker.h" void schFreeTask(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); @@ -89,6 +90,10 @@ _return: } int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) { + return TSDB_CODE_SUCCESS; + } + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, @@ -292,6 +297,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .execId = pTask->execId, .addr = pTask->succeedAddr, .fetchMsgType = SCH_FETCH_TYPE(pTask), + .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask), }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &parent->planLock); @@ -847,15 +853,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { if (NULL == schMgmt.queryMgmt) { SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); } - + + SArray *explainRes = NULL; SQWMsg qwMsg = {0}; qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask); qwMsg.msg = pTask->plan; qwMsg.msgType = pTask->plan->msgType; - - SCH_ERR_RET(qWorkerProcessLocalQuery((SQWorker*)schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg)); + qwMsg.connInfo.handle = pJob->conn.pTrans; + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + } + + SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); } @@ -878,7 +890,8 @@ int32_t schLaunchTaskImpl(void *param) { pTask->retryTimes++; pTask->waitRetry = false; - SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes); + SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", + pTask->execId, pTask->retryTimes); SCH_LOG_TASK_START_TS(pTask); @@ -893,7 +906,7 @@ int32_t schLaunchTaskImpl(void *param) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); } - if (pJob->attr.localExec && SCH_IS_QUERY_JOB(pJob) && SCH_IS_DATA_MERGE_TASK(pTask)) { + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) { SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask)); } else { SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask)); @@ -986,6 +999,25 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } +int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { + SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); +} + +int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { + void *pRsp = NULL; + SArray *explainRes = NULL; + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + } + + SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); + + SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS)); + + return TSDB_CODE_SUCCESS; +} + // Note: no more error processing, handled in function internal int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; @@ -996,7 +1028,11 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) { + SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask)); + } else { + SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask)); + } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f1ec7f5e04..0ccaef3857 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -17,6 +17,7 @@ #include "schInt.h" #include "tmsg.h" #include "tref.h" +#include "qworker.h" SSchedulerMgmt schMgmt = { .jobRef = -1, @@ -192,4 +193,7 @@ void schedulerDestroy(void) { schMgmt.hbConnections = NULL; } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + + qWorkerDestroy(&schMgmt.queryMgmt); + schMgmt.queryMgmt = NULL; } From 9e44816bcca2a71b164a56b30de59cfbd3daf0fa Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 15 Sep 2022 14:46:15 +0800 Subject: [PATCH 3/8] enh: support client local merge policy --- include/common/tmsg.h | 9 ++++ include/libs/executor/executor.h | 5 ++- include/libs/qworker/qworker.h | 4 +- source/common/src/tmsg.c | 17 ++++++- source/libs/command/src/explain.c | 11 ++--- source/libs/executor/src/executor.c | 3 +- source/libs/executor/src/executorimpl.c | 2 +- source/libs/qworker/src/qworker.c | 14 +++++- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schRemote.c | 6 +-- source/libs/scheduler/src/schTask.c | 59 ++++++++++++++++++++++++- 11 files changed, 110 insertions(+), 21 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8693061d12..4f041518d5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1412,6 +1412,14 @@ typedef struct { SExplainExecInfo* subplanInfo; } SExplainRsp; +typedef struct { + SExplainRsp rsp; + uint64_t qId; + uint64_t tId; + int64_t rId; + int32_t eId; +} SExplainLocalRsp; + typedef struct STableScanAnalyzeInfo { uint64_t totalRows; uint64_t totalCheckedRows; @@ -1426,6 +1434,7 @@ typedef struct STableScanAnalyzeInfo { int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); +void tFreeSExplainRsp(SExplainRsp *pRsp); typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 37f89caa41..86ae5ef155 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,11 +29,12 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef int32_t (*localFetchFp)(void *handle, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); +typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { - void *handle; + void *handle; localFetchFp fp; + SArray *explainRes; } SLocalFetch; typedef struct { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 6062ffa3ef..99f5189228 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -99,9 +99,9 @@ void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); -int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg); +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes); -int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes); #ifdef __cplusplus } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b056677a03..3b29c17883 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4334,7 +4334,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1; if (pRsp->numOfPlans > 0) { - pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo)); + pRsp->subplanInfo = taosMemoryCalloc(pRsp->numOfPlans, sizeof(SExplainExecInfo)); if (pRsp->subplanInfo == NULL) return -1; } for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { @@ -4342,7 +4342,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1; if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1; - if (tDecodeBinary(&decoder, (uint8_t **)&pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0) + if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0) return -1; } @@ -4352,6 +4352,19 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { return 0; } +void tFreeSExplainRsp(SExplainRsp *pRsp) { + if (NULL == pRsp) { + return; + } + + for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { + SExplainExecInfo *pExec = pRsp->subplanInfo + i; + taosMemoryFree(pExec->verboseInfo); + } + + taosMemoryFreeClear(pRsp->subplanInfo); +} + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 967c682b0b..618bbb7593 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -56,7 +56,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { int32_t num = taosArrayGetSize(group->nodeExecInfo); for (int32_t i = 0; i < num; ++i) { SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i); - taosMemoryFreeClear(rsp->subplanInfo); + tFreeSExplainRsp(rsp); } } @@ -1723,7 +1723,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId)); if (NULL == group) { qError("group %d not in groupHash", groupId); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1732,7 +1732,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp)); if (NULL == group->nodeExecInfo) { qError("taosArrayInit %d explainExecInfo failed", group->nodeNum); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1742,7 +1742,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t } else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) { qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo), group->nodeNum); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -1751,13 +1751,14 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t if (group->physiPlanExecNum != pRspMsg->numOfPlans) { qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum, groupId); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } taosArrayPush(group->nodeExecInfo, pRspMsg); + groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum); taosWUnLockLatch(&group->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 93303cb62d..d0f0a98e94 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -455,8 +455,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL int64_t threadId = taosGetSelfPthreadId(); if (pLocal) { - pTaskInfo->localFetch.handle = pLocal->handle; - pTaskInfo->localFetch.fp = pLocal->fp; + memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); } taosArrayClearEx(pResList, freeBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a1f3e5a4a5..dd00684ed1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2005,7 +2005,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf if (pSource->localExec) { SDataBuf pBuf = {0}; - int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData); + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); loadRemoteDataCallback(pWrapper, &pBuf, code); } else { qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 00588977bc..a5eab73086 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -58,7 +58,17 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); if (ctx->localExec) { - + SExplainLocalRsp localRsp = {0}; + localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); + SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); + localRsp.rsp.subplanInfo = pExec; + localRsp.qId = qId; + localRsp.tId = tId; + localRsp.rId = rId; + localRsp.eId = eId; + taosArrayPush(ctx->explainRes, &localRsp); + taosArrayDestroy(execInfoList); } else { SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; @@ -84,7 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch}; + SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 01d962a71f..e00cda399b 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -505,6 +505,7 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); +int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index d79514a122..b2b824c33d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -128,10 +128,10 @@ _return: int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); + SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp)); } return TSDB_CODE_SUCCESS; @@ -366,7 +366,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SExplainRsp rsp = {0}; if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) { - taosMemoryFree(rsp.subplanInfo); + tFreeSExplainRsp(&rsp); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 3cecbc9ae3..05c24288c4 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -824,6 +824,53 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { return TSDB_CODE_SUCCESS; } +int32_t schHandleExplainRes(SArray *pExplainRes) { + int32_t code = 0; + int32_t resNum = taosArrayGetSize(pExplainRes); + if (resNum <= 0) { + goto _return; + } + + SSchTask *pTask = NULL; + SSchJob *pJob = NULL; + + for (int32_t i = 0; i < resNum; ++i) { + SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg"); + + SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, localRsp->qId, localRsp->rId, localRsp->tId)); + + code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp); + + if (pTask) { + SCH_UNLOCK_TASK(pTask); + } + + if (pJob) { + schReleaseJob(pJob->refId); + } + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg"); + + SCH_ERR_JRET(code); + + localRsp->rsp.numOfPlans = 0; + localRsp->rsp.subplanInfo = NULL; + } + +_return: + + for (int32_t i = 0; i < resNum; ++i) { + SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); + tFreeSExplainRsp(&localRsp->rsp); + } + + taosArrayDestroy(pExplainRes); + + SCH_RET(code); +} + int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { SSubplan *plan = pTask->plan; int32_t code = 0; @@ -864,11 +911,15 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { qwMsg.connInfo.handle = pJob->conn.pTrans; if (SCH_IS_EXPLAIN_JOB(pJob)) { - explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); } SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_RET(schHandleExplainRes(explainRes)); + } + SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); } @@ -1008,10 +1059,14 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { SArray *explainRes = NULL; if (SCH_IS_EXPLAIN_JOB(pJob)) { - explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); } SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_RET(schHandleExplainRes(explainRes)); + } SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS)); From 9d148b42f98c4afa7dd5bbfdf5a3eaec7605c599 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 15 Sep 2022 15:15:35 +0800 Subject: [PATCH 4/8] enh: add client policy test cases --- source/libs/scheduler/src/schTask.c | 31 ++++++---- tests/system-test/fulltest.sh | 91 +++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 10 deletions(-) diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 70da8d4a16..e725fb7e45 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -842,26 +842,37 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { for (int32_t i = 0; i < resNum; ++i) { SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg"); + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); - SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, localRsp->qId, localRsp->rId, localRsp->tId)); - - code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp); - - if (pTask) { - SCH_UNLOCK_TASK(pTask); + pJob = schAcquireJob(localRsp->rId); + if (NULL == pJob) { + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST); } - - if (pJob) { + + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); schReleaseJob(pJob->refId); + SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } + + code = schGetTaskInJob(pJob, localRsp->tId, &pTask); - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg"); + if (TSDB_CODE_SUCCESS == code) { + code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp); + } + + schReleaseJob(pJob->refId); + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code); SCH_ERR_JRET(code); localRsp->rsp.numOfPlans = 0; localRsp->rsp.subplanInfo = NULL; + pTask = NULL; + pJob = NULL; } _return: diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 0b45b24258..4e8e9b4906 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -515,3 +515,94 @@ python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 #python3 ./test.py -f 2-query/sml.py -Q 3 python3 ./test.py -f 2-query/interp.py -Q 3 + +#------------querPolicy 4----------- + +python3 ./test.py -f 2-query/between.py -Q 4 +python3 ./test.py -f 2-query/distinct.py -Q 4 +python3 ./test.py -f 2-query/varchar.py -Q 4 +python3 ./test.py -f 2-query/ltrim.py -Q 4 +python3 ./test.py -f 2-query/rtrim.py -Q 4 +python3 ./test.py -f 2-query/length.py -Q 4 +python3 ./test.py -f 2-query/char_length.py -Q 4 +python3 ./test.py -f 2-query/upper.py -Q 4 +python3 ./test.py -f 2-query/lower.py -Q 4 +python3 ./test.py -f 2-query/join.py -Q 4 +python3 ./test.py -f 2-query/join2.py -Q 4 +python3 ./test.py -f 2-query/cast.py -Q 4 +python3 ./test.py -f 2-query/substr.py -Q 4 +python3 ./test.py -f 2-query/union.py -Q 4 +python3 ./test.py -f 2-query/union1.py -Q 4 +python3 ./test.py -f 2-query/concat.py -Q 4 +python3 ./test.py -f 2-query/concat2.py -Q 4 +python3 ./test.py -f 2-query/concat_ws.py -Q 4 +python3 ./test.py -f 2-query/concat_ws2.py -Q 4 +#python3 ./test.py -f 2-query/check_tsdb.py -Q 4 +python3 ./test.py -f 2-query/spread.py -Q 4 +python3 ./test.py -f 2-query/hyperloglog.py -Q 4 +python3 ./test.py -f 2-query/explain.py -Q 4 +python3 ./test.py -f 2-query/leastsquares.py -Q 4 +python3 ./test.py -f 2-query/timezone.py -Q 4 +python3 ./test.py -f 2-query/Now.py -Q 4 +python3 ./test.py -f 2-query/Today.py -Q 4 +python3 ./test.py -f 2-query/max.py -Q 4 +python3 ./test.py -f 2-query/min.py -Q 4 +python3 ./test.py -f 2-query/count.py -Q 4 +#python3 ./test.py -f 2-query/last.py -Q 4 +python3 ./test.py -f 2-query/first.py -Q 4 +python3 ./test.py -f 2-query/To_iso8601.py -Q 4 +python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 4 +python3 ./test.py -f 2-query/timetruncate.py -Q 4 +python3 ./test.py -f 2-query/diff.py -Q 4 +python3 ./test.py -f 2-query/Timediff.py -Q 4 +python3 ./test.py -f 2-query/json_tag.py -Q 4 +python3 ./test.py -f 2-query/top.py -Q 4 +python3 ./test.py -f 2-query/bottom.py -Q 4 +python3 ./test.py -f 2-query/percentile.py -Q 4 +python3 ./test.py -f 2-query/apercentile.py -Q 4 +python3 ./test.py -f 2-query/abs.py -Q 4 +python3 ./test.py -f 2-query/ceil.py -Q 4 +python3 ./test.py -f 2-query/floor.py -Q 4 +python3 ./test.py -f 2-query/round.py -Q 4 +python3 ./test.py -f 2-query/log.py -Q 4 +python3 ./test.py -f 2-query/pow.py -Q 4 +python3 ./test.py -f 2-query/sqrt.py -Q 4 +python3 ./test.py -f 2-query/sin.py -Q 4 +python3 ./test.py -f 2-query/cos.py -Q 4 +python3 ./test.py -f 2-query/tan.py -Q 4 +python3 ./test.py -f 2-query/arcsin.py -Q 4 +python3 ./test.py -f 2-query/arccos.py -Q 4 +python3 ./test.py -f 2-query/arctan.py -Q 4 +python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 4 +# python3 ./test.py -f 2-query/nestedQuery.py -Q 4 +# python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 +# python3 ./test.py -f 2-query/avg.py -Q 4 +# python3 ./test.py -f 2-query/elapsed.py -Q 4 +python3 ./test.py -f 2-query/csum.py -Q 4 +#python3 ./test.py -f 2-query/mavg.py -Q 4 +python3 ./test.py -f 2-query/sample.py -Q 4 +python3 ./test.py -f 2-query/function_diff.py -Q 4 +python3 ./test.py -f 2-query/unique.py -Q 4 +python3 ./test.py -f 2-query/stateduration.py -Q 4 +python3 ./test.py -f 2-query/function_stateduration.py -Q 4 +python3 ./test.py -f 2-query/statecount.py -Q 4 +python3 ./test.py -f 2-query/tail.py -Q 4 +python3 ./test.py -f 2-query/ttl_comment.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_count.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_max.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_min.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_sum.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_spread.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_apercentile.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_avg.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_stddev.py -Q 4 +python3 ./test.py -f 2-query/twa.py -Q 4 +python3 ./test.py -f 2-query/irate.py -Q 4 +python3 ./test.py -f 2-query/function_null.py -Q 4 +python3 ./test.py -f 2-query/count_partition.py -Q 4 +python3 ./test.py -f 2-query/max_partition.py -Q 4 +python3 ./test.py -f 2-query/last_row.py -Q 4 +python3 ./test.py -f 2-query/tsbsQuery.py -Q 4 +#python3 ./test.py -f 2-query/sml.py -Q 4 +python3 ./test.py -f 2-query/interp.py -Q 4 + From a85a73e2de83aa42ed1341346caedc9cae531b22 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 15 Sep 2022 16:30:21 +0800 Subject: [PATCH 5/8] fix: fix memory leak issues --- include/libs/executor/executor.h | 1 + source/libs/command/src/explain.c | 4 +++- source/libs/executor/src/executorimpl.c | 15 ++++++++------- source/libs/qworker/src/qworker.c | 2 +- source/libs/scheduler/src/schRemote.c | 2 ++ source/libs/scheduler/src/schTask.c | 3 ++- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 86ae5ef155..8c1d957381 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -33,6 +33,7 @@ typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, i typedef struct { void *handle; + bool localExec; localFetchFp fp; SArray *explainRes; } SLocalFetch; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 618bbb7593..577e9cae1d 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -28,7 +28,7 @@ void qExplainFreeResNode(SExplainResNode *resNode) { return; } - taosMemoryFreeClear(resNode->pExecInfo); + taosArrayDestroy(resNode->pExecInfo); SNode *node = NULL; FOREACH(node, resNode->pChildren) { qExplainFreeResNode((SExplainResNode *)node); } @@ -58,6 +58,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i); tFreeSExplainRsp(rsp); } + taosArrayDestroy(group->nodeExecInfo); } pIter = taosHashIterate(pCtx->groupHash, pIter); @@ -66,6 +67,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { taosHashCleanup(pCtx->groupHash); taosArrayDestroy(pCtx->rows); + taosMemoryFreeClear(pCtx->tbuf); taosMemoryFree(pCtx); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 90560a028b..0834894350 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1778,12 +1778,6 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return pTaskInfo->code; - } - SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); @@ -1797,7 +1791,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf SDataBuf pBuf = {0}; int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); loadRemoteDataCallback(pWrapper, &pBuf, code); + taosMemoryFree(pWrapper); } else { + SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources); @@ -4051,7 +4052,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); - if (!pTaskInfo->localFetch.fp) { + if (!pTaskInfo->localFetch.localExec) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f174a163c9..4b3df21319 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -94,7 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes}; + SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 4b15629407..d540b17c90 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -371,6 +371,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa } SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp)); + + taosMemoryFreeClear(msg); break; } case TDMT_SCH_FETCH_RSP: diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e725fb7e45..d463874a54 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -898,7 +898,8 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { pTask->msgLen); SCH_ERR_RET(code); } else { - SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); + //binary msg + //SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } } From d2d761282ff66f1349df2fe8a8e8f69655e00218 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 16 Sep 2022 16:33:18 +0800 Subject: [PATCH 6/8] enh: support multiple exchange operators in explain --- source/libs/command/inc/commandInt.h | 2 ++ source/libs/command/src/explain.c | 46 ++++++++++++++++++---------- source/libs/scheduler/src/schTask.c | 10 ++++-- 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 706985f894..ad677a81ff 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -99,8 +99,10 @@ extern "C" { typedef struct SExplainGroup { int32_t nodeNum; + int32_t nodeIdx; int32_t physiPlanExecNum; int32_t physiPlanExecIdx; + bool singleChannel; SRWLatch lock; SSubplan *plan; SArray *nodeExecInfo; //Array diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 577e9cae1d..b50aab9b1e 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -21,7 +21,7 @@ #include "tdatablock.h" int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes); -int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level); +int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel); void qExplainFreeResNode(SExplainResNode *resNode) { if (NULL == resNode) { @@ -250,7 +250,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo return TSDB_CODE_SUCCESS; } -int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group) { +int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) { *pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo)); if (NULL == (*pExecInfo)) { qError("taosArrayInit %d explainExecInfo failed", group->nodeNum); @@ -258,17 +258,28 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group } SExplainRsp *rsp = NULL; - for (int32_t i = 0; i < group->nodeNum; ++i) { - rsp = taosArrayGet(group->nodeExecInfo, i); - /* - if (group->physiPlanExecIdx >= rsp->numOfPlans) { - qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); - return TSDB_CODE_QRY_APP_ERROR; - } + if (group->singleChannel) { + if (0 == group->physiPlanExecIdx) { + group->nodeIdx = 0; + } + + rsp = taosArrayGet(group->nodeExecInfo, group->nodeIdx++); + if (group->physiPlanExecIdx >= rsp->numOfPlans) { + qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); + return TSDB_CODE_QRY_APP_ERROR; + } + + taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); + } else { + for (int32_t i = 0; i < group->nodeNum; ++i) { + rsp = taosArrayGet(group->nodeExecInfo, i); + if (group->physiPlanExecIdx >= rsp->numOfPlans) { + qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); + return TSDB_CODE_QRY_APP_ERROR; + } - taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); - */ - taosArrayPush(*pExecInfo, rsp->subplanInfo); + taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); + } } ++group->physiPlanExecIdx; @@ -293,7 +304,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai resNode->pNode = pNode; if (group->nodeExecInfo) { - QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group)); + QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(pNode, &resNode->pExecInfo, group)); } QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren)); @@ -803,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } - QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1)); + QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1, pExchNode->singleChannel)); break; } case QUERY_NODE_PHYSICAL_PLAN_SORT: { @@ -1535,7 +1546,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32 return TSDB_CODE_SUCCESS; } -int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { +int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) { SExplainResNode *node = NULL; int32_t code = 0; SExplainCtx *ctx = (SExplainCtx *)pCtx; @@ -1546,6 +1557,9 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } + group->singleChannel = singleChannel; + group->physiPlanExecIdx = 0; + QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node)); QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level)); @@ -1709,7 +1723,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) { } int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { - QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); + QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0, false)); QRY_ERR_RET(qExplainAppendPlanRows(pCtx)); QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp)); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index d463874a54..b6f96a188e 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -21,6 +21,7 @@ #include "tref.h" #include "trpc.h" #include "qworker.h" +#include "tglobal.h" void schFreeTask(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); @@ -897,9 +898,12 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); SCH_ERR_RET(code); - } else { - //binary msg - //SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); + } else if (tsQueryPlannerTrace) { + char *msg = NULL; + int32_t msgLen = 0; + qSubPlanToString(plan, &msg, &msgLen); + SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); + taosMemoryFree(msg); } } From 2750ace3c311fdf88e48d8dc46f0316dcdb96a62 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 23 Sep 2022 16:37:09 +0800 Subject: [PATCH 7/8] fix: fix insert into select issue --- source/client/src/clientImpl.c | 37 ++++++++---------------------- source/libs/scheduler/inc/schInt.h | 6 +++-- 2 files changed, 13 insertions(+), 30 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 865ab8f611..be907ea1e2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -483,7 +483,8 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) { SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); - + char *policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client"; + int32_t dbNum = taosArrayGetSize(pDbVgList); for (int32_t i = 0; i < dbNum; ++i) { SArray* pVg = taosArrayGetP(pDbVgList, i); @@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr int32_t vnodeNum = taosArrayGetSize(nodeList); if (vnodeNum > 0) { - tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum); + tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum); goto _return; } int32_t mnodeNum = taosArrayGetSize(pMnodeList); if (mnodeNum <= 0) { - tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId); + tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy); goto _return; } void* pData = taosArrayGet(pMnodeList, 0); taosArrayAddBatch(nodeList, pData, mnodeNum); - tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum); + tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum); _return: @@ -555,27 +556,14 @@ _return: return TSDB_CODE_SUCCESS; } - -int32_t buildClientPolicyNodeList(SRequestObj* pRequest, SArray** pNodeList) { - *pNodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - SQueryNodeLoad load = {0}; - load.addr.nodeId = CLIENT_HANDLE; - - taosArrayPush(*pNodeList, &load); - - tscDebug("0x%" PRIx64 " client policy", pRequest->requestId); - - return TSDB_CODE_SUCCESS; -} - - int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) { SArray* pDbVgList = NULL; SArray* pQnodeList = NULL; int32_t code = 0; switch (tsQueryPolicy) { - case QUERY_POLICY_VNODE: { + case QUERY_POLICY_VNODE: + case QUERY_POLICY_CLIENT: { if (pResultMeta) { pDbVgList = taosArrayInit(4, POINTER_BYTES); @@ -614,10 +602,6 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } - case QUERY_POLICY_CLIENT: { - code = buildClientPolicyNodeList(pRequest, pNodeList); - break; - } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; @@ -640,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* int32_t code = 0; switch (tsQueryPolicy) { - case QUERY_POLICY_VNODE: { + case QUERY_POLICY_VNODE: + case QUERY_POLICY_CLIENT: { int32_t dbNum = taosArrayGetSize(pRequest->dbList); if (dbNum > 0) { SCatalog* pCtg = NULL; @@ -678,10 +663,6 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } - case QUERY_POLICY_CLIENT: { - code = buildClientPolicyNodeList(pRequest, pNodeList); - break; - } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index e017606035..7ced4f626c 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -236,6 +236,7 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; bool queryJob; + bool insertJob; bool needFetch; bool needFlowCtrl; bool localExec; @@ -307,7 +308,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) -#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && SCH_IS_DATA_MERGE_TASK(_task)) +#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task))) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -330,8 +331,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) -#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) +#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) +#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob) #define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch) #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) From 5deb0e25d5f193763f8a974f7fb1007bff3d4510 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Fri, 23 Sep 2022 16:38:20 +0800 Subject: [PATCH 8/8] build: close default gtest compile and docker --- Jenkinsfile | 2 +- Jenkinsfile2 | 4 +- cmake/cmake.options | 34 +++--------- packaging/docker/dockerManifest.sh | 89 ++++++++++++++++++------------ packaging/docker/dockerbuild.sh | 22 -------- packaging/release.bat | 3 - source/libs/sync/CMakeLists.txt | 4 +- 7 files changed, 67 insertions(+), 91 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 4b84e1f88e..0842795f0a 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -79,7 +79,7 @@ def pre_test(){ rm -rf debug mkdir debug cd debug - cmake .. > /dev/null + cmake .. -DBUILD_TEST=true > /dev/null make -j4> /dev/null ''' diff --git a/Jenkinsfile2 b/Jenkinsfile2 index b95b3ff86b..ac152e2aa4 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -173,7 +173,7 @@ def pre_test_build_mac() { ''' sh ''' cd ${WK}/debug - cmake .. + cmake .. -DBUILD_TEST=true make -j8 ''' sh ''' @@ -302,7 +302,7 @@ def pre_test_build_win() { set CL=/MP8 echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake" time /t - cmake .. -G "NMake Makefiles JOM" || exit 7 + cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true || exit 7 echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6" time /t jom -j 6 || exit 8 diff --git a/cmake/cmake.options b/cmake/cmake.options index 3baccde4d7..60ff00affc 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -2,6 +2,12 @@ # Deps options # ========================================================= +option( + BUILD_TEST + "If build unit tests using googletest" + OFF +) + IF(${TD_WINDOWS}) MESSAGE("build pthread Win32") @@ -45,12 +51,6 @@ IF(${TD_WINDOWS}) "If build wingetopt on Windows" ON ) - - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) option( TDENGINE_3 @@ -65,28 +65,8 @@ IF(${TD_WINDOWS}) ) ELSEIF (TD_DARWIN_64) - add_definitions(-DCOMPILER_SUPPORTS_CXX13) - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) -ELSE () - include(CheckCXXCompilerFlag) - CHECK_CXX_COMPILER_FLAG("-std=c++13" COMPILER_SUPPORTS_CXX13) - IF(${COMPILER_SUPPORTS_CXX13}) + IF(${BUILD_TEST}) add_definitions(-DCOMPILER_SUPPORTS_CXX13) - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) - ELSE () - option( - BUILD_TEST - "If build unit tests using googletest" - OFF - ) ENDIF () ENDIF () diff --git a/packaging/docker/dockerManifest.sh b/packaging/docker/dockerManifest.sh index 71788423f6..8f71e30fbd 100755 --- a/packaging/docker/dockerManifest.sh +++ b/packaging/docker/dockerManifest.sh @@ -1,6 +1,7 @@ #!/bin/bash set -e #set -x +set -v # dockerbuild.sh # -n [version number] @@ -11,8 +12,9 @@ set -e version="" passWord="" verType="" +dockerLatest="n" -while getopts "hn:p:V:" arg +while getopts "hn:p:V:a:" arg do case $arg in n) @@ -29,9 +31,15 @@ do ;; h) echo "Usage: `basename $0` -n [version number] " - echo " -p [password for docker hub] " + echo " -p [password for docker hub] " + echo " -V [stable |beta] " + echo " -a [y | n ] " exit 0 ;; + a) + #echo "dockerLatest=$OPTARG" + dockerLatest=$(echo $OPTARG) + ;; ?) #unknow option echo "unkonw argument" exit 1 @@ -41,42 +49,55 @@ done echo "version=${version}" -#docker manifest rm tdengine/tdengine -#docker manifest rm tdengine/tdengine:${version} -if [ "$verType" == "beta" ]; then - docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version} - docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest - docker manifest rm tdengine/tdengine-beta:${version} - docker manifest rm tdengine/tdengine-beta:latest - docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version} - docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest - docker manifest inspect tdengine/tdengine:latest - docker manifest inspect tdengine/tdengine:${version} - docker login -u tdengine -p ${passWord} #replace the docker registry username and password - docker manifest push tdengine/tdengine-beta:${version} - docker manifest push tdengine/tdengine-beta:latest -elif [ "$verType" == "stable" ]; then - docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version} - docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest - docker manifest rm tdengine/tdengine:latest - docker manifest rm tdengine/tdengine:${version} - docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version} - docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest - docker manifest inspect tdengine/tdengine:latest - docker manifest inspect tdengine/tdengine:${version} - docker login -u tdengine -p ${passWord} #replace the docker registry username and password - docker manifest push tdengine/tdengine:${version} - docker manifest push tdengine/tdengine:latest -else +if [ "$verType" == "stable" ]; then + verType=stable + dockerinput=TDengine-server-${version}-Linux-$cpuType.tar.gz + dockerinput_x64=TDengine-server-${version}-Linux-amd64.tar.gz + dockerim=tdengine/tdengine + dockeramd64=tdengine/tdengine-amd64 + dockeraarch64=tdengine/tdengine-aarch64 + dockeraarch32=tdengine/tdengine-aarch32 +elif [ "$verType" == "beta" ];then + verType=beta + tagVal=ver-${version}-beta + dockerinput=TDengine-server-${version}-${verType}-Linux-$cpuType.tar.gz + dockerinput_x64=TDengine-server-${version}-${verType}-Linux-amd64.tar.gz + dockerim=tdengine/tdengine-beta + dockeramd64=tdengine/tdengine-amd64-beta + dockeraarch64=tdengine/tdengine-aarch64-beta + dockeraarch32=tdengine/tdengine-aarch32-beta + else echo "unknow verType, nor stabel or beta" exit 1 fi -# docker manifest create -a tdengine/${dockername}:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version} -# docker manifest create -a tdengine/${dockername}:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest +username="tdengine" -# docker login -u tdengine -p ${passWord} #replace the docker registry username and password +# generate docker verison +echo "generate ${dockerim}:${version}" +docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version} +docker manifest inspect ${dockerim}:${version} +docker manifest rm ${dockerim}:${version} +docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version} +docker manifest inspect ${dockerim}:${version} +docker login -u ${username} -p ${passWord} +docker manifest push ${dockerim}:${version} + + +# generate docker latest +echo "generate ${dockerim}:latest " + +if [ ${dockerLatest} == 'y' ] ;then + echo "docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest" + docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest + docker manifest inspect ${dockerim}:latest + docker manifest rm ${dockerim}:latest + docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest + docker manifest inspect ${dockerim}:latest + docker login -u tdengine -p ${passWord} #replace the docker registry username and password + docker manifest push ${dockerim}:latest + docker pull tdengine/tdengine:latest + +fi -# docker manifest push tdengine/tdengine:latest -# # how set latest version ??? diff --git a/packaging/docker/dockerbuild.sh b/packaging/docker/dockerbuild.sh index 541ae6ec13..8b0b0c190c 100755 --- a/packaging/docker/dockerbuild.sh +++ b/packaging/docker/dockerbuild.sh @@ -149,26 +149,4 @@ rm -rf temp1.data if [ ${dockerLatest} == 'y' ] ;then docker tag tdengine/tdengine-${dockername}:${version} tdengine/tdengine-${dockername}:latest docker push tdengine/tdengine-${dockername}:latest - echo ">>>>>>>>>>>>> check whether tdengine/tdengine-${dockername}:latest has been published correctly" - docker run -d --name doctestla -p 7030-7049:6030-6049 -p 7030-7049:6030-6049/udp tdengine/tdengine-${dockername}:latest - sleep 2 - curl -u root:taosdata -d 'show variables;' 127.0.0.1:7041/rest/sql > temp2.data - version_latest=` cat temp2.data |jq .data| jq '.[]' |grep "version" -A 2 -B 1 | jq ".[1]" ` - echo "${version_latest}" - if [ "${version_latest}" == "\"${version}\"" ] ; then - echo "docker version is right " - else - echo "docker version is wrong " - exit 1 - fi fi -rm -rf temp2.data - -if [ -n "$(docker ps -aq)" ] ;then - echo "delte docker process" - docker stop $(docker ps -aq) - docker rm $(docker ps -aq) -fi - -cd ${scriptDir} -rm -f ${pkgFile} diff --git a/packaging/release.bat b/packaging/release.bat index 591227382f..b87ae68e2b 100644 --- a/packaging/release.bat +++ b/packaging/release.bat @@ -44,8 +44,6 @@ cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=tru cmake --build . rd /s /Q C:\TDengine cmake --install . -for /r c:\TDengine %%i in (*.dll) do signtool sign /f D:\\123.pfx /p taosdata %%i -for /r c:\TDengine %%i in (*.exe) do signtool sign /f D:\\123.pfx /p taosdata %%i if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1) cd %package_dir% iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release @@ -53,7 +51,6 @@ if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% faile iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1) -for /r ..\release %%i in (*.exe) do signtool sign /f d:\\123.pfx /p taosdata %%i goto EXIT0 :USAGE diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index 551849c6f2..6025070cb7 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -15,6 +15,6 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -if(${BUILD_TEST}) +if(BUILD_TEST AND BUILD_SYNC_TEST) add_subdirectory(test) -endif(${BUILD_TEST}) +endif()