From 38fa01a6babc9b1770b11ce001010dfaec035c80 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 Jun 2022 10:23:52 +0800 Subject: [PATCH 1/5] remove offline sch --- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qwUtil.c | 17 ++++++++++++++++- source/libs/qworker/src/qworker.c | 5 +++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 082db6428f..9ff49bef4f 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -355,7 +355,7 @@ int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); -void qwClearExpiredSch(SArray* pExpiredSch); +void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); void qwDbgDumpMgmtInfo(SQWorker *mgmt); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 8bfb80f061..30337ade66 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -536,8 +536,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { } -void qwClearExpiredSch(SArray* pExpiredSch) { +void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) { + int32_t num = taosArrayGetSize(pExpiredSch); + for (int32_t i = 0; i < num; ++i) { + uint64_t *sId = taosArrayGet(pExpiredSch, i); + SQWSchStatus *pSch = NULL; + if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { + continue; + } + if (taosHashGetSize(pSch->tasksHash) <= 0) { + qwDestroySchStatus(pSch); + taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); + qError("sch %" PRIx64 "destroyed", *sId); + } + + qwReleaseScheduler(QW_WRITE, mgmt); + } } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 44a8fdf7f4..8e0d14996d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -743,9 +743,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { } QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); - QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); + sch->hbBrokenTs = 0; + QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { @@ -865,7 +866,7 @@ _return: } if (taosArrayGetSize(pExpiredSch) > 0) { - qwClearExpiredSch(pExpiredSch); + qwClearExpiredSch(mgmt, pExpiredSch); } taosMemoryFreeClear(rspList); From 73d4e08ee6dd120ea082788839ed944b1325e47c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 Jun 2022 13:39:34 +0800 Subject: [PATCH 2/5] sync update cache --- source/libs/catalog/inc/catalogInt.h | 7 +- source/libs/catalog/src/catalog.c | 17 +--- source/libs/catalog/src/ctgCache.c | 114 ++++++++++++++------------- 3 files changed, 64 insertions(+), 74 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9219a382e4..0aff0d1aa6 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation { int32_t opId; void *data; bool syncOp; - uint64_t seqId; + tsem_t rspSem; } SCtgCacheOperation; typedef struct SCtgQNode { - SCtgCacheOperation op; + SCtgCacheOperation *op; struct SCtgQNode *next; } SCtgQNode; typedef struct SCtgQueue { SRWLatch qlock; - uint64_t seqId; - uint64_t seqDone; SCtgQNode *head; SCtgQNode *tail; tsem_t reqSem; - tsem_t rspSem; uint64_t qRemainNum; } SCtgQueue; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 040156eca2..461cfa4d7e 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); } - if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) { - qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); - CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); - } - gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode)); if (NULL == gCtgMgmt.queue.head) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); @@ -1191,17 +1186,13 @@ void catalogDestroy(void) { atomic_store_8((int8_t*)&gCtgMgmt.exit, true); - if (tsem_post(&gCtgMgmt.queue.reqSem)) { - qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); - } - - if (tsem_post(&gCtgMgmt.queue.rspSem)) { - qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); - } - while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { taosUsleep(1); } + + if (tsem_post(&gCtgMgmt.queue.reqSem)) { + qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + } CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 8332c7b068..637cb97d3a 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -501,25 +501,6 @@ _return: return TSDB_CODE_SUCCESS; } - -void ctgWaitOpDone(SCtgCacheOperation *action) { - while (true) { - tsem_wait(&gCtgMgmt.queue.rspSem); - - if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { - tsem_post(&gCtgMgmt.queue.rspSem); - break; - } - - if (gCtgMgmt.queue.seqDone >= action->seqId) { - break; - } - - tsem_post(&gCtgMgmt.queue.rspSem); - sched_yield(); - } -} - void ctgDequeue(SCtgCacheOperation **op) { SCtgQNode *orig = gCtgMgmt.queue.head; @@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) { taosMemoryFreeClear(orig); - *op = &node->op; + *op = node->op; } @@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { CTG_RET(TSDB_CODE_CTG_MEM_ERROR); } - operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); + if (operation->syncOp) { + tsem_init(&operation->rspSem, 0, 0); + } - node->op = *operation; + node->op = operation; CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); gCtgMgmt.queue.tail->next = node; @@ -558,7 +541,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); if (operation->syncOp) { - ctgWaitOpDone(operation); + tsem_wait(&operation->rspSem); } return TSDB_CODE_SUCCESS; @@ -567,7 +550,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_DROP_DB_CACHE; + SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); @@ -583,21 +568,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); msg->dbId = dbId; - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; _return: - taosMemoryFreeClear(action.data); + taosMemoryFreeClear(op->data); CTG_RET(code); } int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_DROP_DB_VGROUP; + op->syncOp = syncOp; + SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); @@ -612,15 +600,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) msg->pCtg = pCtg; strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; _return: - taosMemoryFreeClear(action.data); + taosMemoryFreeClear(op->data); CTG_RET(code); } @@ -628,7 +616,10 @@ _return: int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_DROP_STB_META; + op->syncOp = syncOp; + SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); @@ -641,15 +632,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, msg->dbId = dbId; msg->suid = suid; - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; _return: - taosMemoryFreeClear(action.data); + taosMemoryFreeClear(op->data); CTG_RET(code); } @@ -657,7 +648,10 @@ _return: int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_DROP_TB_META; + op->syncOp = syncOp; + SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); @@ -669,21 +663,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, strncpy(msg->tbName, tbName, sizeof(msg->tbName)); msg->dbId = dbId; - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; _return: - taosMemoryFreeClear(action.data); + taosMemoryFreeClear(op->data); CTG_RET(code); } int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncOp = syncOp}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_UPDATE_VGROUP; + op->syncOp = syncOp; + SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); @@ -701,22 +698,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId msg->dbId = dbId; msg->dbInfo = dbInfo; - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; _return: ctgFreeVgInfo(dbInfo); - taosMemoryFreeClear(action.data); + taosMemoryFreeClear(op->data); CTG_RET(code); } int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncOp = syncOp}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_UPDATE_TB_META; + op->syncOp = syncOp; + SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); @@ -731,9 +731,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy msg->pCtg = pCtg; msg->output = output; - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; @@ -746,7 +746,9 @@ _return: int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { int32_t code = 0; - SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_UPDATE_VG_EPSET; + SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); @@ -758,9 +760,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp msg->vgId = vgId; msg->epSet = *pEpSet; - operation.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &operation)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; @@ -775,7 +777,10 @@ _return: int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) { int32_t code = 0; - SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncOp = syncOp}; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + op->opId = CTG_OP_UPDATE_USER; + op->syncOp = syncOp; + SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); @@ -785,9 +790,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp msg->pCtg = pCtg; msg->userAuth = *pAuth; - action.data = msg; + op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; @@ -1622,7 +1627,6 @@ void* ctgUpdateThreadFunc(void* param) { } if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { - tsem_post(&gCtgMgmt.queue.rspSem); break; } @@ -1634,10 +1638,8 @@ void* ctgUpdateThreadFunc(void* param) { (*gCtgCacheOperation[operation->opId].func)(operation); - gCtgMgmt.queue.seqDone = operation->seqId; - if (operation->syncOp) { - tsem_post(&gCtgMgmt.queue.rspSem); + tsem_post(&operation->rspSem); } CTG_RT_STAT_INC(qDoneNum, 1); From dbe1d1462d845157dfc15ecd61053a44ea29b089 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 Jun 2022 13:54:13 +0800 Subject: [PATCH 3/5] clean cache queue --- source/libs/catalog/src/ctgCache.c | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 637cb97d3a..462678f79a 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -542,6 +542,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { if (operation->syncOp) { tsem_wait(&operation->rspSem); + taosMemoryFree(operation); } return TSDB_CODE_SUCCESS; @@ -1612,6 +1613,30 @@ void ctgUpdateThreadUnexpectedStopped(void) { if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); } +void ctgCleanupCacheQueue(void) { + SCtgQNode *node = gCtgMgmt.queue.head->next; + SCtgQNode *nodeNext = NULL; + + while (node) { + if (node->op) { + taosMemoryFree(node->op->data); + if (node->op->syncOp) { + tsem_post(&node->op->rspSem); + } else { + taosMemoryFree(node->op); + } + } + + nodeNext = node->next; + taosMemoryFree(node); + + node = nodeNext; + } + + taosMemoryFreeClear(gCtgMgmt.queue.head); + gCtgMgmt.queue.tail = NULL; +} + void* ctgUpdateThreadFunc(void* param) { setThreadName("catalog"); #ifdef WINDOWS @@ -1627,6 +1652,7 @@ void* ctgUpdateThreadFunc(void* param) { } if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { + ctgCleanupCacheQueue(); break; } From 39848d24f93b22084af6b8477bc7aee76e8c5cf6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 7 Jun 2022 09:34:47 +0800 Subject: [PATCH 4/5] fix catalog quit issue --- source/libs/catalog/src/catalog.c | 8 +++---- source/libs/catalog/src/ctgCache.c | 36 +++++++++++++++++++----------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 461cfa4d7e..270d0b5f13 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1186,13 +1186,13 @@ void catalogDestroy(void) { atomic_store_8((int8_t*)&gCtgMgmt.exit, true); - while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { - taosUsleep(1); - } - if (tsem_post(&gCtgMgmt.queue.reqSem)) { qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); } + + while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { + taosUsleep(1); + } CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 462678f79a..59aab9fd22 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1614,23 +1614,32 @@ void ctgUpdateThreadUnexpectedStopped(void) { } void ctgCleanupCacheQueue(void) { - SCtgQNode *node = gCtgMgmt.queue.head->next; + SCtgQNode *node = NULL; SCtgQNode *nodeNext = NULL; - - while (node) { - if (node->op) { - taosMemoryFree(node->op->data); - if (node->op->syncOp) { - tsem_post(&node->op->rspSem); - } else { - taosMemoryFree(node->op); + + while (true) { + node = gCtgMgmt.queue.head->next; + while (node) { + if (node->op) { + taosMemoryFree(node->op->data); + if (node->op->syncOp) { + tsem_post(&node->op->rspSem); + } else { + taosMemoryFree(node->op); + } } + + nodeNext = node->next; + taosMemoryFree(node); + + node = nodeNext; } - nodeNext = node->next; - taosMemoryFree(node); - - node = nodeNext; + if (CTG_IS_LOCKED(&gCtgMgmt.lock)) { + taosUsleep(1); + } else { + break; + } } taosMemoryFreeClear(gCtgMgmt.queue.head); @@ -1652,6 +1661,7 @@ void* ctgUpdateThreadFunc(void* param) { } if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { + CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); ctgCleanupCacheQueue(); break; } From 9450c959d716809538061b03eec0f786c891adeb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 7 Jun 2022 17:25:01 +0800 Subject: [PATCH 5/5] get table index --- include/common/tmsg.h | 26 +++++ include/common/tmsgdef.h | 1 + include/libs/catalog/catalog.h | 3 +- source/common/src/tmsg.c | 96 ++++++++++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndSma.c | 104 ++++++++++++++++++++ source/libs/catalog/inc/catalogInt.h | 1 + source/libs/catalog/src/catalog.c | 11 +++ source/libs/catalog/src/ctgRemote.c | 53 ++++++++++ source/libs/qcom/src/querymsg.c | 69 +++++++++---- source/libs/scheduler/inc/schedulerInt.h | 1 + source/libs/scheduler/src/schJob.c | 35 ++++--- 13 files changed, 370 insertions(+), 32 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 47c26ed431..0f527c3b8c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2459,6 +2459,32 @@ typedef struct { int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp); int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp); +typedef struct { + char tbFName[TSDB_TABLE_FNAME_LEN]; +} STableIndexReq; + +int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq); +int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq); + +typedef struct { + int8_t intervalUnit; + int8_t slidingUnit; + int64_t interval; + int64_t offset; + int64_t sliding; + int64_t dstTbUid; + int32_t dstVgId; // for stream + char* expr; +} STableIndexInfo; + +typedef struct { + SArray* pIndex; +} STableIndexRsp; + +int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); +int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp); + + typedef struct { int8_t mqMsgType; int32_t code; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 73f8515f22..31bcd8c48b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -130,6 +130,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index ee237741c3..21fee3cc3e 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); +int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes); + int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); @@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); - int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7041c9478e..e41272b69e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2392,6 +2392,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp return 0; } +int32_t tSerializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->tbFName) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->tbFName) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) { + if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1; + if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->offset) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->sliding) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->dstTbUid) < 0) return -1; + if (tEncodeI32(pEncoder, pInfo->dstVgId) < 0) return -1; + if (tEncodeCStr(pEncoder, pInfo->expr) < 0) return -1; + return 0; +} + +int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp *pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + int32_t num = taosArrayGetSize(pRsp->pIndex); + if (tEncodeI32(&encoder, num) < 0) return -1; + if (num > 0) { + for (int32_t i = 0; i < num; ++i) { + STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i); + if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1; + } + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSTableIndexInfo(SDecoder *pDecoder, STableIndexInfo *pInfo) { + if (tDecodeI8(pDecoder, &pInfo->intervalUnit) < 0) return -1; + if (tDecodeI8(pDecoder, &pInfo->slidingUnit) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->interval) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->offset) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->sliding) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->dstTbUid) < 0) return -1; + if (tDecodeI32(pDecoder, &pInfo->dstVgId) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pInfo->expr) < 0) return -1; + + return 0; +} + +int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + int32_t num = 0; + if (tDecodeI32(&decoder, &num) < 0) return -1; + if (num > 0) { + pRsp->pIndex = taosArrayInit(num, sizeof(STableIndexInfo)); + if (NULL == pRsp->pIndex) return -1; + STableIndexInfo info; + for (int32_t i = 0; i < num; ++i) { + if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1; + if (NULL == taosArrayPush(pRsp->pIndex, &info)) { + taosMemoryFree(info.expr); + return -1; + } + } + } + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 27f5e6f7af..0ca7ce6b51 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8ea172ef94..382f8dd55f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -309,6 +309,7 @@ typedef struct { int8_t slidingUnit; int8_t timezone; int32_t dstVgId; // for stream + int64_t dstTbUid; int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index bcc413c1de..1d47f8fc7a 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); +static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); @@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma); @@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp return code; } +static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) { + int32_t code = 0; + SSmaObj *pSma = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + STableIndexInfo info; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) { + continue; + } + + info.intervalUnit = pSma->intervalUnit; + info.slidingUnit = pSma->slidingUnit; + info.interval = pSma->interval; + info.offset = pSma->offset; + info.sliding = pSma->sliding; + info.dstTbUid = pSma->dstTbUid; + info.dstVgId = pSma->dstVgId; + info.expr = taosMemoryMalloc(pSma->exprLen + 1); + if (info.expr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + sdbRelease(pSdb, pSma); + return code; + } + + memcpy(info.expr, pSma->expr, pSma->exprLen); + info.expr[pSma->exprLen] = 0; + + if (NULL == taosArrayPush(rsp->pIndex, &info)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + taosMemoryFree(info.expr); + sdbRelease(pSdb, pSma); + return code; + } + + *exist = true; + + sdbRelease(pSdb, pSma); + } + + return code; +} + static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) { SUserIndexReq indexReq = {0}; SMnode *pMnode = pReq->info.node; @@ -916,6 +967,59 @@ _OVER: return code; } +static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) { + STableIndexReq indexReq = {0}; + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + STableIndexRsp rsp = {0}; + bool exist = false; + + if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo)); + if (NULL == rsp.pIndex) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + + code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist); + if (code) { + goto _OVER; + } + + if (!exist) { + code = -1; + terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST; + } else { + int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp); + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + + tSerializeSTableIndexRsp(pRsp, contLen, &rsp); + + pReq->info.rsp = pRsp; + pReq->info.rspLen = contLen; + + code = 0; + } + +_OVER: + if (code != 0) { + mError("failed to get table index %s since %s", indexReq.tbFName, terrstr()); + } + + return code; +} + + static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index eb073cf6f1..260ed1977e 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -490,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask); +int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 270d0b5f13..0f6a79c14c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1136,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL)); } +int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL)); +} + + int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 188f5b44c0..94c59ddc53 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("Got index from mnode, indexName:%s", target); break; } + case TDMT_MND_GET_TABLE_INDEX: { + if (TSDB_CODE_SUCCESS != rspCode) { + qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target); + CTG_ERR_RET(rspCode); + } + + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); + if (code) { + qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target); + CTG_ERR_RET(code); + } + + qDebug("Got table index from mnode, tbFName:%s", target); + break; + } case TDMT_MND_RETRIEVE_FUNC: { if (TSDB_CODE_SUCCESS != rspCode) { qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target); @@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo * return TSDB_CODE_SUCCESS; } +int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) { + char *msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_MND_GET_TABLE_INDEX; + void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; + + ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName); + + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp); + if (code) { + ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName); + CTG_ERR_RET(code); + } + + if (pTask) { + void* pOut = taosMemoryCalloc(1, POINTER_BYTES); + if (NULL == pOut) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); + + CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen)); + } + + SRpcMsg rpcMsg = { + .msgType = reqType, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { char *msg = NULL; int32_t msgLen = 0; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 949a58c2fa..6de1620de1 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32 return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + STableIndexReq indexReq = {0}; + strcpy(indexReq.tbFName, input); + + int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq); + void *pBuf = (*mallcFp)(bufLen); + tSerializeSTableIndexReq(pBuf, bufLen, &indexReq); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; @@ -459,26 +477,43 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } +int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + STableIndexRsp out = {0}; + if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) { + qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + *(void **)output = out.pIndex; + + return TSDB_CODE_SUCCESS; +} + void initQueryModuleMsgHandle() { - queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; - + queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg; - queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp; } #pragma GCC diagnostic pop diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d961780ae5..9e302e569a 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -207,6 +207,7 @@ typedef struct SSchJob { SArray *dataSrcTasks; // SArray int32_t levelIdx; SEpSet dataSrcEps; + SHashObj *taskList; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 7a0272a4d0..ddbd171a79 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -63,6 +63,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); } + + pJob->taskList = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->taskList) { + SCH_JOB_ELOG("taosHashInit %d taskList failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); @@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_SET_JOB_TYPE(pJob, plan->subplanType); SSchTask task = {0}; - SSchTask *pTask = &task; - SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); - void *p = taosArrayPush(pLevel->subTasks, &task); - if (NULL == p) { + SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task); + if (NULL == pTask) { SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p)); + SCH_ERR_JRET(schRecordQueryDataSrc(pJob, pTask)); - if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) { SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { + SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + ++pJob->taskNum; } @@ -1276,14 +1286,10 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas } int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { - schGetTaskFromList(pJob->execTasks, taskId, pTask); + schGetTaskFromList(pJob->taskList, taskId, pTask); if (NULL == *pTask) { - schGetTaskFromList(pJob->succTasks, taskId, pTask); - - if (NULL == *pTask) { - SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } + SCH_JOB_ELOG("task not found in job task list, taskId:%" PRIx64, taskId); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } return TSDB_CODE_SUCCESS; @@ -1445,7 +1451,8 @@ void schFreeJobImpl(void *job) { taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->succTasks); - + taosHashCleanup(pJob->taskList); + taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); taosArrayDestroy(pJob->dataSrcTasks);