From 8f16b9b8c11eca9e463cb4fbcaf5c48a4a8fcfb7 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 15:59:41 +0800 Subject: [PATCH 1/9] catalog force update --- include/libs/catalog/catalog.h | 3 +- include/libs/qcom/query.h | 2 +- source/client/src/clientEnv.c | 8 +-- source/client/src/clientImpl.c | 14 +++++ source/client/src/clientMsgHandler.c | 2 +- source/libs/catalog/inc/catalogInt.h | 21 +++++-- source/libs/catalog/src/ctgAsync.c | 6 ++ source/libs/catalog/src/ctgCache.c | 80 ++++++++++++++++++++++++--- source/libs/catalog/src/ctgDbg.c | 3 +- source/libs/scheduler/src/schJob.c | 25 +++++++-- source/libs/scheduler/src/schRemote.c | 1 + 11 files changed, 136 insertions(+), 29 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f0e642bc9a..ee237741c3 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -68,6 +68,7 @@ typedef struct SCatalogReq { SArray *pIndex; // element is index name SArray *pUser; // element is SUserAuthInfo bool qNodeRequired; // valid qnode + bool forceUpdate; } SCatalogReq; typedef struct SMetaData { @@ -280,7 +281,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); -int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId); +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 45a7e9a29f..a17be44846 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -222,7 +222,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t || (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB) #define NEED_SCHEDULER_RETRY_ERROR(_code) \ - ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) + ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define REQUEST_MAX_TRY_TIMES 1 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 4b39a51584..9afb989d27 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -223,6 +223,10 @@ static void doDestroyRequest(void *p) { taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); + } + taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->pDb); @@ -230,10 +234,6 @@ static void doDestroyRequest(void *p) { doFreeReqResultInfo(&pRequest->body.resInfo); qDestroyQueryPlan(pRequest->body.pDag); - if (pRequest->body.queryJob != 0) { - schedulerFreeJob(pRequest->body.queryJob); - } - taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 774ef5f248..9762ac6ba5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -741,6 +741,20 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { do { destroyRequest(pRequest); pRequest = launchQuery(pTscObj, sql, sqlLen); + if (*sql == 'y') { + SCatalog *pCatalog = NULL; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false); + break; + } else if (*sql == 'z') { + SCatalog *pCatalog = NULL; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false); + break; + } + if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 1039d36362..2d5199f181 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -180,7 +180,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { taosMemoryFreeClear(output.dbVgroup); tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); - } else if (output.dbVgroup) { + } else if (output.dbVgroup && output.dbVgroup->vgHash) { struct SCatalog* pCatalog = NULL; int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index cebe696390..9219a382e4 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -52,6 +52,7 @@ enum { CTG_OP_UPDATE_VGROUP = 0, CTG_OP_UPDATE_TB_META, CTG_OP_DROP_DB_CACHE, + CTG_OP_DROP_DB_VGROUP, CTG_OP_DROP_STB_META, CTG_OP_DROP_TB_META, CTG_OP_UPDATE_USER, @@ -266,26 +267,32 @@ typedef struct SCtgUpdateTblMsg { STableMetaOutput* output; } SCtgUpdateTblMsg; -typedef struct SCtgRemoveDBMsg { +typedef struct SCtgDropDBMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; uint64_t dbId; -} SCtgRemoveDBMsg; +} SCtgDropDBMsg; -typedef struct SCtgRemoveStbMsg { +typedef struct SCtgDropDbVgroupMsg { + SCatalog* pCtg; + char dbFName[TSDB_DB_FNAME_LEN]; +} SCtgDropDbVgroupMsg; + + +typedef struct SCtgDropStbMetaMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; uint64_t dbId; uint64_t suid; -} SCtgRemoveStbMsg; +} SCtgDropStbMetaMsg; -typedef struct SCtgRemoveTblMsg { +typedef struct SCtgDropTblMetaMsg { SCatalog* pCtg; char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; uint64_t dbId; -} SCtgRemoveTblMsg; +} SCtgDropTblMetaMsg; typedef struct SCtgUpdateUserMsg { SCatalog* pCtg; @@ -451,6 +458,7 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action); int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action); int32_t ctgOpDropDbCache(SCtgCacheOperation *action); +int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action); int32_t ctgOpDropStbMeta(SCtgCacheOperation *action); int32_t ctgOpDropTbMeta(SCtgCacheOperation *action); int32_t ctgOpUpdateUser(SCtgCacheOperation *action); @@ -464,6 +472,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName); int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass); int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); +int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq); int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq); int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 312f0c9250..47ef45d7d7 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -286,6 +286,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* int32_t taskIdx = 0; for (int32_t i = 0; i < dbVgNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbVgroup, i); + if (pReq->forceUpdate) { + ctgDropDbVgroupEnqueue(pCtg, dbFName, true); + } CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName)); } @@ -301,6 +304,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* for (int32_t i = 0; i < tbMetaNum; ++i) { SName* name = taosArrayGet(pReq->pTableMeta, i); + if (pReq->forceUpdate) { + catalogRemoveTableMeta(pCtg, name); + } CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 0f1344c343..8332c7b068 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -35,6 +35,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = { "drop DB", ctgOpDropDbCache }, + { + CTG_OP_DROP_DB_VGROUP, + "drop DBVgroup", + ctgOpDropDbVgroup + }, { CTG_OP_DROP_STB_META, "drop stbMeta", @@ -563,9 +568,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t code = 0; SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; - SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg)); + SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg)); + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -590,13 +595,43 @@ _return: CTG_RET(code); } +int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp}; + SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + char *p = strchr(dbFName, '.'); + if (p && CTG_IS_SYS_DBNAME(p + 1)) { + dbFName = p + 1; + } + + msg->pCtg = pCtg; + strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); + + action.data = msg; + + CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + taosMemoryFreeClear(action.data); + CTG_RET(code); +} + + int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { int32_t code = 0; SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp}; - SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg)); + SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -623,9 +658,9 @@ _return: int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { int32_t code = 0; SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp}; - SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg)); + SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -1281,7 +1316,7 @@ _return: int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveDBMsg *msg = operation->data; + SCtgDropDBMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1304,6 +1339,33 @@ _return: CTG_RET(code); } +int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgDropDbVgroupMsg *msg = operation->data; + SCatalog* pCtg = msg->pCtg; + + SCtgDBCache *dbCache = NULL; + ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); + if (NULL == dbCache) { + goto _return; + } + + CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); + + ctgFreeVgInfo(dbCache->vgInfo); + dbCache->vgInfo = NULL; + + ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName); + + ctgWReleaseVgInfo(dbCache); + +_return: + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { int32_t code = 0; @@ -1353,7 +1415,7 @@ _return: int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveStbMsg *msg = operation->data; + SCtgDropStbMetaMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; @@ -1399,7 +1461,7 @@ _return: int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { int32_t code = 0; - SCtgRemoveTblMsg *msg = operation->data; + SCtgDropTblMetaMsg *msg = operation->data; SCatalog* pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index fdab50db0f..cb18e45638 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -132,7 +132,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { } } -int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) { +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) { int32_t code = 0; SCatalogReq req = {0}; req.pTableMeta = taosArrayInit(2, sizeof(SName)); @@ -144,6 +144,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN); req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo)); req.qNodeRequired = true; + req.forceUpdate = forceUpdate; SName name = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0}; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 3e23c395c9..be7b930d79 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -377,15 +377,21 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ return TSDB_CODE_SUCCESS; } - taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx)); + if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) { + SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx); + } else { + SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx); + } + if (execIdx != pTask->execIdx) { // ignore it + SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx); SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); } return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { +int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) { if (taosHashGetSize(pTask->execNodes) <= 0) { return TSDB_CODE_SUCCESS; } @@ -393,6 +399,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx)); nodeInfo->handle = handle; + SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx); + return TSDB_CODE_SUCCESS; } @@ -403,7 +411,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v SCH_SET_TASK_HANDLE(pTask, handle); - schUpdateTaskExecNode(pTask, handle, execIdx); + schUpdateTaskExecNode(pJob, pTask, handle, execIdx); return TSDB_CODE_SUCCESS; } @@ -551,6 +559,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); + ++addNum; } } @@ -1110,6 +1120,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_UNLOCK(SCH_WRITE, &parent->lock); if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { + SCH_TASK_DLOG("all %d children task done, start to launch parent task %" PRIx64, readyNum, parent->taskId); SCH_ERR_RET(schLaunchTask(pJob, parent)); } } @@ -1186,7 +1197,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); } - SCH_TASK_DLOG("task has %d exec address", size); + SCH_TASK_DLOG("task has been dropped on %d exec nodes", size); } @@ -1196,7 +1207,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { } SCH_LOCK_TASK(pTask); - if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) { + if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { + SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR); @@ -1306,9 +1318,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); - pTask->execIdx++; + SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx); + SCH_LOG_TASK_START_TS(pTask); if (schJobNeedToStop(pJob, &status)) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b336ce8c76..1389459604 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1037,6 +1037,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; + SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx); } SEpSet epSet = addr->epSet; From 9bd195d94d2aa6dcdc15cd4f06be19c9698d2043 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 22:33:30 +0800 Subject: [PATCH 2/9] reschedule timeout task --- include/common/tmsg.h | 1 + include/libs/scheduler/scheduler.h | 2 + source/client/inc/clientInt.h | 10 -- source/client/src/clientEnv.c | 6 +- source/client/src/clientHb.c | 101 +++---------- source/client/src/clientImpl.c | 14 -- source/client/src/clientSml.c | 2 + source/libs/catalog/src/ctgDbg.c | 15 ++ source/libs/parser/src/parInsert.c | 2 + source/libs/qworker/inc/qwInt.h | 5 +- source/libs/qworker/src/qwUtil.c | 6 + source/libs/qworker/src/qworker.c | 21 ++- source/libs/scheduler/inc/schedulerInt.h | 4 +- source/libs/scheduler/src/schJob.c | 31 +--- source/libs/scheduler/src/schRemote.c | 86 +---------- source/libs/scheduler/src/schUtil.c | 177 ++++++++++++++++++++++- source/libs/scheduler/src/scheduler.c | 10 ++ 17 files changed, 268 insertions(+), 225 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ee858ad506..dfa641c83e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1982,6 +1982,7 @@ typedef struct { typedef struct { SClientHbKey connKey; + int64_t clusterId; SQueryHbReqBasic* query; SHashObj* info; // hash } SClientHbReq; diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 5cb6dbd3bc..22706f0953 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -104,6 +104,8 @@ void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); +void schedulerStopQueryHb(void *pTrans); + /** * Cancel query job diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c7b831893e..0000366880 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -58,11 +58,6 @@ enum { typedef struct SAppInstInfo SAppInstInfo; -typedef struct { - void* param; - SClientHbReq* req; -} SHbConnInfo; - typedef struct { char* key; // statistics @@ -72,11 +67,8 @@ typedef struct { int64_t startTime; // ctl SRWLatch lock; // lock is used in serialization - // connection SAppInstInfo* pAppInstInfo; - // info SHashObj* activeInfo; // hash - SHashObj* connInfo; // hash } SAppHbMgr; typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); @@ -326,8 +318,6 @@ void appHbMgrCleanup(void); int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); -int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); - // --- mq void hbMgrInitMqHbRspHandle(); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 9afb989d27..336924f934 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -130,8 +130,12 @@ void destroyTscObj(void *pObj) { SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); - atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); + schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); + if (0 == connNum) { + closeTransporter(pTscObj); + } tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 09c3d269c7..a4c109ee17 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -129,9 +129,9 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo } static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { - SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); - if (NULL == info) { - tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey)); + if (NULL == pReq) { + tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.connType); return TSDB_CODE_SUCCESS; } @@ -181,12 +181,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { break; } - int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(*clusterId, &pCatalog); + int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); break; } @@ -199,12 +198,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { break; } - int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(*clusterId, &pCatalog); + int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); break; } @@ -217,12 +215,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { break; } - int64_t *clusterId = (int64_t *)info->param; struct SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(*clusterId, &pCatalog); + int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code)); + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); break; } @@ -547,13 +544,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); - if (info) { - code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq); - if (code) { - pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); - continue; - } + code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq); + if (code) { + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + continue; } //hbClearClientHbReq(pOneReq); @@ -569,23 +563,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return pBatchReq; } -void hbClearReqInfo(SAppHbMgr *pAppHbMgr) { - void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); - while (pIter != NULL) { - SClientHbReq *pOneReq = pIter; - - tFreeReqKvHash(pOneReq->info); - taosHashClear(pOneReq->info); - - if (pOneReq->query) { - taosArrayDestroy(pOneReq->query->queryDesc); - taosMemoryFreeClear(pOneReq->query); - } - - pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); - } -} - void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); } @@ -715,14 +692,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { } taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq); - // init getInfoFunc - pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - - if (pAppHbMgr->connInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pAppHbMgr); - return NULL; - } taosThreadMutexLock(&clientHbMgr.lock); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); @@ -745,15 +714,6 @@ void appHbMgrCleanup(void) { taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; - pIter = taosHashIterate(pTarget->connInfo, NULL); - while (pIter != NULL) { - SHbConnInfo *info = pIter; - taosMemoryFree(info->param); - pIter = taosHashIterate(pTarget->connInfo, pIter); - } - taosHashCleanup(pTarget->connInfo); - pTarget->connInfo = NULL; - taosMemoryFree(pTarget->key); taosMemoryFree(pTarget); } @@ -791,7 +751,7 @@ void hbMgrCleanUp() { clientHbMgr.appHbMgrs = NULL; } -int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { +int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) { // init hash in activeinfo void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { @@ -799,17 +759,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } SClientHbReq hbReq = {0}; hbReq.connKey = connKey; + hbReq.clusterId = clusterId; //hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); - // init hash - if (info != NULL) { - SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - info->req = pReq; - taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo)); - } - atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } @@ -819,15 +773,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in .tscRid = tscRefId, .connType = connType, }; - SHbConnInfo info = {0}; switch (connType) { case CONN_TYPE__QUERY: { - int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t)); - *pClusterId = clusterId; - - info.param = pClusterId; - return hbRegisterConnImpl(pAppHbMgr, connKey, &info); + return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId); } case CONN_TYPE__TMQ: { return 0; @@ -844,26 +793,10 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); } - SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); - if (info) { - taosMemoryFree(info->param); - taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); - } - - if (NULL == pReq || NULL == info) { + if (NULL == pReq) { return; } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } -int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, - int32_t valueLen) { - // find req by connection id - SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - ASSERT(pReq != NULL); - - taosHashPut(pReq->info, key, keyLen, value, valueLen); - - return 0; -} diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6694954fa2..4c68edaff0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -741,20 +741,6 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { do { destroyRequest(pRequest); pRequest = launchQuery(pTscObj, sql, sqlLen); - if (*sql == 'y') { - SCatalog *pCatalog = NULL; - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false); - break; - } else if (*sql == 'z') { - SCatalog *pCatalog = NULL; - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false); - break; - } - if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index c559ac58f8..5643af746d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2332,6 +2332,8 @@ static int32_t isSchemalessDb(SSmlHandle* info){ smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code)); return code; } + taosArrayDestroy(pInfo.pRetensions); + if (!pInfo.schemaless){ info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF; smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname); diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index cb18e45638..42baa530ce 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -132,6 +132,21 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { } } + +/* +prepare SQL: +create database db1; +use db1; +create stable st1 (ts timestamp, f1 int) tags(t1 int); +create table tb1 using st1 tags(1); +insert into tb1 values (now, 1); +create qnode on dnode 1; +create user user1 pass "abc"; +create database db2; +grant write on db2.* to user1; +create function udf1 as '/tmp/libudf1.so' outputtype int; +create aggregate function udf2 as '/tmp/libudf2.so' outputtype int; +*/ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) { int32_t code = 0; SCatalogReq req = {0}; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 422c480397..01a6761b33 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1424,6 +1424,8 @@ int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName){ parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname); return code; } + taosArrayDestroy(pInfo.pRetensions); + if (pInfo.schemaless){ parserError("can not insert into schemaless db:%s", dbFname); return TSDB_CODE_SML_INVALID_DB_CONF; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 3804c114cb..082db6428f 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -34,6 +34,7 @@ extern "C" { #define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 +#define QW_SCH_TIMEOUT_MSEC 180000 enum { QW_PHASE_PRE_QUERY = 1, @@ -137,7 +138,7 @@ typedef struct SQWTaskCtx { } SQWTaskCtx; typedef struct SQWSchStatus { - int32_t lastAccessTs; // timestamp in second + int64_t hbBrokenTs; // timestamp in msecond SRWLatch hbConnLock; SRpcHandleInfo hbConnInfo; SQueryNodeEpId hbEpId; @@ -354,6 +355,8 @@ int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); +void qwClearExpiredSch(SArray* pExpiredSch); +int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 3d0204e355..8bfb80f061 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -535,3 +535,9 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return -1; } + +void qwClearExpiredSch(SArray* pExpiredSch) { + +} + + diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 8c0366fa0b..44a8fdf7f4 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -21,10 +21,12 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re SSchedulerHbRsp rsp = {0}; SQWSchStatus *sch = NULL; - QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); + QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch)); QW_LOCK(QW_WRITE, &sch->hbConnLock); + sch->hbBrokenTs = taosGetTimestampMs(); + if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER); sch->hbConnInfo.handle = NULL; @@ -794,6 +796,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWSchStatus *sch = NULL; int32_t taskNum = 0; SQWHbInfo *rspList = NULL; + SArray *pExpiredSch = NULL; int32_t code = 0; qwDbgDumpMgmtInfo(mgmt); @@ -809,8 +812,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { } rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo)); - if (NULL == rspList) { + pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t)); + if (NULL == rspList || NULL == pExpiredSch) { QW_UNLOCK(QW_READ, &mgmt->schLock); + taosMemoryFree(rspList); + taosArrayDestroy(pExpiredSch); QW_ELOG("calloc %d SQWHbInfo failed", schNum); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); qwRelease(refId); @@ -820,6 +826,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { void *key = NULL; size_t keyLen = 0; int32_t i = 0; + int64_t currentMs = taosGetTimestampMs(); void *pIter = taosHashIterate(mgmt->schHash, NULL); while (pIter) { @@ -827,6 +834,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (NULL == sch->hbConnInfo.handle) { uint64_t *sId = taosHashGetKey(pIter, NULL); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); + + if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) { + taosArrayPush(pExpiredSch, sId); + } + pIter = taosHashIterate(mgmt->schHash, pIter); continue; } @@ -852,7 +864,12 @@ _return: tFreeSSchedulerHbRsp(&rspList[j].rsp); } + if (taosArrayGetSize(pExpiredSch) > 0) { + qwClearExpiredSch(pExpiredSch); + } + taosMemoryFreeClear(rspList); + taosArrayDestroy(pExpiredSch); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); qwRelease(refId); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index c8fee1e532..d961780ae5 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -102,11 +102,11 @@ typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId uint64_t sId; // schedulerId SSchedulerCfg cfg; - SRWLatch lock; bool exit; int32_t jobRef; int32_t jobNum; SSchStat stat; + SRWLatch hbLock; SHashObj *hbConnections; } SSchedulerMgmt; @@ -320,6 +320,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) +void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); +void schCleanClusterHb(void* pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index be7b930d79..ca90d2fe34 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -126,30 +126,6 @@ _return: SCH_RET(code); } -void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { - if (!pTask->registerdHb) { - return; - } - - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SQueryNodeEpId epId = {0}; - - epId.nodeId = addr->nodeId; - - SEp* pEp = SCH_GET_CUR_EP(addr); - strcpy(epId.ep.fqdn, pEp->fqdn); - epId.ep.port = pEp->port; - - SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); - if (NULL == hb) { - SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port); - return; - } - - atomic_sub_fetch_64(&hb->taskNum, 1); - - pTask->registerdHb = false; -} void schFreeTask(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); @@ -1484,9 +1460,10 @@ void schFreeJobImpl(void *job) { qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); - atomic_sub_fetch_32(&schMgmt.jobNum, 1); - - schCloseJobRef(); + int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); + if (jobNum == 0) { + schCloseJobRef(); + } } int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1389459604..bf51d8d631 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -648,31 +648,6 @@ _return: SCH_RET(code); } -int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) { - int32_t code = 0; - SSchHbTrans hb = {0}; - - hb.trans.pTrans = pJob->pTrans; - - SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx)); - - code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans)); - if (code) { - schFreeRpcCtx(&hb.rpcCtx); - - if (HASH_NODE_EXIST(code)) { - *exist = true; - return TSDB_CODE_SUCCESS; - } - - qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); - SCH_ERR_RET(code); - } - - return TSDB_CODE_SUCCESS; -} - - int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) { SSchedulerHbReq req = {0}; int32_t code = 0; @@ -684,17 +659,20 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) { req.sId = schMgmt.sId; memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId)); + SCH_LOCK(SCH_READ, &schMgmt.hbLock); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId)); if (NULL == hb) { - qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn, + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn, nodeEpId->ep.port); - SCH_ERR_RET(code); + return TSDB_CODE_SUCCESS; } SCH_LOCK(SCH_WRITE, &hb->lock); code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx); memcpy(&trans, &hb->trans, sizeof(trans)); SCH_UNLOCK(SCH_WRITE, &hb->lock); + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); SCH_ERR_RET(code); @@ -764,60 +742,6 @@ _return: SCH_RET(code); } - -int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SQueryNodeEpId epId = {0}; - - epId.nodeId = addr->nodeId; - - SEp* pEp = SCH_GET_CUR_EP(addr); - strcpy(epId.ep.fqdn, pEp->fqdn); - epId.ep.port = pEp->port; - - SSchHbTrans *hb = NULL; - while (true) { - hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); - if (NULL == hb) { - bool exist = false; - SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); - if (!exist) { - SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); - } - - continue; - } - - break; - } - - atomic_add_fetch_64(&hb->taskNum, 1); - - pTask->registerdHb = true; - - return TSDB_CODE_SUCCESS; -} - -int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { - int32_t code = 0; - SSchHbTrans *hb = NULL; - - hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); - if (NULL == hb) { - qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - SCH_LOCK(SCH_WRITE, &hb->lock); - memcpy(&hb->trans, trans, sizeof(*trans)); - SCH_UNLOCK(SCH_WRITE, &hb->lock); - - qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId, - epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); - - return TSDB_CODE_SUCCESS; -} - int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { SSchedulerHbRsp rsp = {0}; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 81c95ea976..a91dc359f2 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -21,17 +21,187 @@ #include "tref.h" #include "trpc.h" + +void schCleanClusterHb(void* pTrans) { + SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); + + SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL); + while (hb) { + if (hb->trans.pTrans == pTrans) { + SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL); + rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT); + taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); + } + + hb = taosHashIterate(schMgmt.hbConnections, hb); + } + + SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); +} + +int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) { + int32_t code = 0; + + SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); + SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port); + return TSDB_CODE_SUCCESS; + } + + int64_t taskNum = atomic_load_64(&hb->taskNum); + if (taskNum <= 0) { + rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT); + taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + } + SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + + return TSDB_CODE_SUCCESS; +} + + +int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) { + int32_t code = 0; + SSchHbTrans hb = {0}; + + hb.trans.pTrans = pJob->pTrans; + hb.taskNum = 1; + + SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx)); + + SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); + code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans)); + if (code) { + SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + schFreeRpcCtx(&hb.rpcCtx); + + if (HASH_NODE_EXIST(code)) { + *exist = true; + return TSDB_CODE_SUCCESS; + } + + qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + SCH_ERR_RET(code); + } + + SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + + return TSDB_CODE_SUCCESS; +} + +int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) { + SSchHbTrans *hb = NULL; + + while (true) { + SCH_LOCK(SCH_READ, &schMgmt.hbLock); + hb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + bool exist = false; + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist)); + if (!exist) { + SCH_RET(schBuildAndSendHbMsg(pEpId, NULL)); + } + + continue; + } + + break; + } + + atomic_add_fetch_64(&hb->taskNum, 1); + + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + + return TSDB_CODE_SUCCESS; +} + +void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { + if (!pTask->registerdHb) { + return; + } + + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SQueryNodeEpId epId = {0}; + + epId.nodeId = addr->nodeId; + + SEp* pEp = SCH_GET_CUR_EP(addr); + strcpy(epId.ep.fqdn, pEp->fqdn); + epId.ep.port = pEp->port; + + SCH_LOCK(SCH_READ, &schMgmt.hbLock); + SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port); + return; + } + + int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1); + if (0 == taskNum) { + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + schRemoveHbConnection(pJob, pTask, &epId); + } else { + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + } + + pTask->registerdHb = false; +} + + + +int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SQueryNodeEpId epId = {0}; + + epId.nodeId = addr->nodeId; + + SEp* pEp = SCH_GET_CUR_EP(addr); + strcpy(epId.ep.fqdn, pEp->fqdn); + epId.ep.port = pEp->port; + + SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId)); + + pTask->registerdHb = true; + + return TSDB_CODE_SUCCESS; +} + +int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { + int32_t code = 0; + SSchHbTrans *hb = NULL; + + SCH_LOCK(SCH_READ, &schMgmt.hbLock); + hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + SCH_LOCK(SCH_WRITE, &hb->lock); + memcpy(&hb->trans, trans, sizeof(*trans)); + SCH_UNLOCK(SCH_WRITE, &hb->lock); + SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); + + qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId, + epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); + + return TSDB_CODE_SUCCESS; +} + + void schCloseJobRef(void) { if (!atomic_load_8((int8_t *)&schMgmt.exit)) { return; } - SCH_LOCK(SCH_WRITE, &schMgmt.lock); - if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) { + if (schMgmt.jobRef >= 0) { taosCloseRef(schMgmt.jobRef); schMgmt.jobRef = -1; } - SCH_UNLOCK(SCH_WRITE, &schMgmt.lock); } uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); } @@ -88,4 +258,3 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { (*pCtx->freeFunc)(pCtx->brokenVal.val); } - diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 32e00c1b70..b4dc067f4c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -182,6 +182,14 @@ int32_t scheduleCancelJob(int64_t job) { SCH_RET(code); } +void schedulerStopQueryHb(void *pTrans) { + if (NULL == pTrans) { + return; + } + + schCleanClusterHb(pTrans); +} + void schedulerFreeJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { @@ -220,6 +228,7 @@ void schedulerDestroy(void) { } } + SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); if (schMgmt.hbConnections) { void *pIter = taosHashIterate(schMgmt.hbConnections, NULL); while (pIter != NULL) { @@ -230,4 +239,5 @@ void schedulerDestroy(void) { taosHashCleanup(schMgmt.hbConnections); schMgmt.hbConnections = NULL; } + SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); } From 31f10572c4e8f15f11d8502a99896c4565457cb4 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sun, 5 Jun 2022 10:18:03 +0800 Subject: [PATCH 3/9] remove rpc close --- source/client/src/clientEnv.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 336924f934..be0a41ba40 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -134,7 +134,8 @@ void destroyTscObj(void *pObj) { closeAllRequests(pTscObj->pRequests); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); if (0 == connNum) { - closeTransporter(pTscObj); + // TODO + //closeTransporter(pTscObj); } tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); From 2c133b4a279f8a45eb08e85474d977bad5b62dce Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Jun 2022 12:28:12 +0800 Subject: [PATCH 4/9] notify if server receice release req --- source/libs/transport/src/transSvr.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 608fd00b2c..82d2b7a6db 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -180,6 +180,12 @@ static bool addHandleToAcceptloop(void* arg); if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ return; \ } \ + if (conn->regArg.init) { \ + tTrace("server conn %p release, notify server app", conn); \ + STrans* pTransInst = conn->pTransInst; \ + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ + memset(&conn->regArg, 0, sizeof(conn->regArg)); \ + } \ uvStartSendRespInternal(srvMsg); \ return; \ } \ From 72657a3f60e8953dcf03e28f02cae694b92b639d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Jun 2022 23:01:48 +0800 Subject: [PATCH 5/9] fix invalid conn --- source/libs/transport/src/transSvr.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 82d2b7a6db..50f99128b2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1160,6 +1160,10 @@ int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { } SExHandle* ex = thandle; SSvrConn* pConn = ex->handle; + if (pConn == NULL) { + tTrace("invalid handle %p, failed to Get Conn info", thandle); + return -1; + } struct sockaddr_in addr = pConn->addr; pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); From 64de7ee1ac1c824e40bcae5798f653d2f8ad69f7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 Jun 2022 09:12:54 +0800 Subject: [PATCH 6/9] disable hb cleanup --- source/libs/scheduler/src/schUtil.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index a91dc359f2..18398802db 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -40,6 +40,8 @@ void schCleanClusterHb(void* pTrans) { } int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) { + return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY + int32_t code = 0; SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); From eeb5c5578234e4d9ca9daf6ba6a57b5b454d56a1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Jun 2022 10:37:17 +0800 Subject: [PATCH 7/9] enh: should not alter stb if field used by topic --- include/common/tmsg.h | 2 +- include/util/taoserror.h | 1 + source/dnode/mnode/impl/inc/mndTopic.h | 2 +- source/dnode/mnode/impl/src/mndStb.c | 89 +++++++++++++++++--------- source/dnode/mnode/impl/src/mndTopic.c | 19 +++--- source/util/src/terror.c | 9 +-- 6 files changed, 75 insertions(+), 47 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 202aeaf69e..5d0937975b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -177,8 +177,8 @@ typedef struct { typedef struct SField { char name[TSDB_COL_NAME_LEN]; uint8_t type; - int32_t bytes; int8_t flags; + int32_t bytes; } SField; typedef struct SRetention { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index cbef0b46a7..0b331abf8a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -235,6 +235,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03AC) #define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD) #define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE) +#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF) #define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0) // mnode-infoSchema diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index 4aa18ea591..4becad6da2 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); -bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colIds); +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 175878959d..388172f6ae 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -23,6 +23,7 @@ #include "mndPerfSchema.h" #include "mndScheduler.h" #include "mndShow.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -394,14 +395,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { - if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) != - TSDB_CODE_SUCCESS) { + if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, + req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { - if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) != - TSDB_CODE_SUCCESS) { + if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, + req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } @@ -949,13 +950,18 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p return 0; } -static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) { +static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) { int32_t tag = mndFindSuperTableTagIndex(pOld, tagName); if (tag < 0) { terrno = TSDB_CODE_MND_TAG_NOT_EXIST; return -1; } + col_id_t colId = pOld->pTags[tag].colId; + if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + return -1; + } + if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; } @@ -968,7 +974,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch return 0; } -static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pFields) { +static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) { if ((int32_t)taosArrayGetSize(pFields) != 2) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; @@ -986,6 +992,11 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF return -1; } + col_id_t colId = pOld->pTags[tag].colId; + if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + return -1; + } + if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) { terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST; return -1; @@ -1008,13 +1019,18 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF return 0; } -static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) { +static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) { int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name); if (tag < 0) { terrno = TSDB_CODE_MND_TAG_NOT_EXIST; return -1; } + col_id_t colId = pOld->pTags[tag].colId; + if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + return -1; + } + if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; } @@ -1075,7 +1091,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray return 0; } -static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) { +static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) { int32_t col = mndFindSuperTableColumnIndex(pOld, colName); if (col < 0) { terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST; @@ -1092,6 +1108,11 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const return -1; } + col_id_t colId = pOld->pTags[col].colId; + if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + return -1; + } + if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; } @@ -1104,7 +1125,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const return 0; } -static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) { +static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) { int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name); if (col < 0) { terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST; @@ -1121,6 +1142,11 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const return -1; } + col_id_t colId = pOld->pTags[col].colId; + if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + return -1; + } + if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; } @@ -1199,7 +1225,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } - static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) { taosRLockLatch(&pStb->lock); @@ -1269,13 +1294,13 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char return code; } - -static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) { +static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, + int32_t *pLen) { int ret; SEncoder ec = {0}; - uint32_t contLen = 0; + uint32_t contLen = 0; SMAlterStbRsp alterRsp = {0}; - SName name = {0}; + SName name = {0}; tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp)); @@ -1283,20 +1308,20 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - + ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta); if (ret) { tFreeSMAlterStbRsp(&alterRsp); return ret; } - + tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, ret); if (ret) { tFreeSMAlterStbRsp(&alterRsp); return ret; } - void* cont = taosMemoryMalloc(contLen); + void *cont = taosMemoryMalloc(contLen); tEncoderInit(&ec, cont, contLen); tEncodeSMAlterStbRsp(&ec, &alterRsp); tEncoderClear(&ec); @@ -1305,24 +1330,24 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S *pCont = cont; *pLen = contLen; - + return 0; } static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { - bool needRsp = true; + bool needRsp = true; + int32_t code = -1; + STrans *pTrans = NULL; + SField *pField0 = NULL; + SStbObj stbObj = {0}; taosRLockLatch(&pOld->lock); memcpy(&stbObj, pOld, sizeof(SStbObj)); + taosRUnLockLatch(&pOld->lock); stbObj.pColumns = NULL; stbObj.pTags = NULL; stbObj.updateTime = taosGetTimestampMs(); stbObj.lock = 0; - taosRUnLockLatch(&pOld->lock); - - int32_t code = -1; - STrans *pTrans = NULL; - SField *pField0 = NULL; switch (pAlter->alterType) { case TSDB_ALTER_TABLE_ADD_TAG: @@ -1330,25 +1355,25 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p break; case TSDB_ALTER_TABLE_DROP_TAG: pField0 = taosArrayGet(pAlter->pFields, 0); - code = mndDropSuperTableTag(pOld, &stbObj, pField0->name); + code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name); break; case TSDB_ALTER_TABLE_UPDATE_TAG_NAME: - code = mndAlterStbTagName(pOld, &stbObj, pAlter->pFields); + code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields); break; case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: pField0 = taosArrayGet(pAlter->pFields, 0); - code = mndAlterStbTagBytes(pOld, &stbObj, pField0); + code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0); break; case TSDB_ALTER_TABLE_ADD_COLUMN: code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields); break; case TSDB_ALTER_TABLE_DROP_COLUMN: pField0 = taosArrayGet(pAlter->pFields, 0); - code = mndDropSuperTableColumn(pOld, &stbObj, pField0->name); + code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name); break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: pField0 = taosArrayGet(pAlter->pFields, 0); - code = mndAlterStbColumnBytes(pOld, &stbObj, pField0); + code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0); break; case TSDB_ALTER_TABLE_UPDATE_OPTIONS: needRsp = false; @@ -1370,12 +1395,12 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p mndTransSetDbName(pTrans, pDb->name); if (needRsp) { - void* pCont = NULL; + void *pCont = NULL; int32_t contLen = 0; - if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen)) goto _OVER; + if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER; mndTransSetRpcRsp(pTrans, pCont, contLen); } - + if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 95f22a33db..a810a6a487 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -71,7 +71,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) { return strchr(topic, '.') + 1; } -bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colAndTagIds) { +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; bool found = false; @@ -105,20 +105,21 @@ bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *col } } - for (int32_t i = 0; i < taosArrayGetSize(colAndTagIds); i++) { - int16_t *pColId = taosArrayGet(colAndTagIds, i); - if (taosHashGet(pColHash, pColId, sizeof(int16_t)) != NULL) { - found = true; - goto NEXT; - } + if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) { + found = true; + goto NEXT; } NEXT: sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); - if (found) return false; + if (found) { + terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC; + return -1; + } } - return true; + + return 0; } SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0b1de5dd89..8d7bc0851b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -225,14 +225,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode") TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist") // mnode-stable -TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "Stable not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "Stable confilct with topic") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "STable not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "STable confilct with topic") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STBS, "Too many stables") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB, "Invalid stable name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_ALTER_OPTION, "Invalid stable alter options") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "Stable option unchanged") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "STable option unchanged") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREADY_EXIST, "Tag already exists") @@ -240,6 +240,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns") TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode") // mnode-infoSchema From f35ba2a85e80a7fc5ddf6796b97f6c2e554e96e0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 6 Jun 2022 11:51:00 +0800 Subject: [PATCH 8/9] fix: failed to update json idx --- source/dnode/vnode/src/meta/metaTable.c | 2 +- source/libs/index/src/indexFilter.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4f4a940bc7..c6a6dd8147 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -103,7 +103,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid); indexMultiTermDestroy(terms); #endif - return -1; + return 0; } int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 34251ab918..2e30ca704a 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -38,7 +38,7 @@ typedef struct SIFParam { col_id_t colId; int64_t suid; // add later char dbName[TSDB_DB_NAME_LEN]; - char colName[TSDB_COL_NAME_LEN]; + char colName[TSDB_COL_NAME_LEN * 2 + 4]; SIndexMetaArg arg; } SIFParam; @@ -171,6 +171,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) { param->colId = l->colId; param->colValType = l->node.resType.type; memcpy(param->dbName, l->dbName, sizeof(l->dbName)); + sprintf(param->colName, "%s_%s", l->colName, r->literal); param->colValType = r->typeData; return 0; From 79769dd2ad7642bc0c4ba902d96fb4c897f9136e Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 6 Jun 2022 13:47:32 +0800 Subject: [PATCH 9/9] feat: add vgroup epset for tsma --- include/common/tmsg.h | 44 +++++++++---------- source/common/src/tmsg.c | 62 ++++++++++++++------------- source/dnode/mnode/impl/inc/mndDef.h | 46 ++++++++++---------- source/dnode/mnode/impl/src/mndSma.c | 63 +++++++++++++++++++++++++++- 4 files changed, 142 insertions(+), 73 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 39dc5361e6..2afab8b2f8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1122,13 +1122,13 @@ typedef struct { SSchema* pSchemas; } STableMetaRsp; -typedef struct { +typedef struct { STableMetaRsp* pMeta; } SMAlterStbRsp; -int32_t tEncodeSMAlterStbRsp(SEncoder *pEncoder, const SMAlterStbRsp *pRsp); -int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp); -void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp); +int32_t tEncodeSMAlterStbRsp(SEncoder* pEncoder, const SMAlterStbRsp* pRsp); +int32_t tDecodeSMAlterStbRsp(SDecoder* pDecoder, SMAlterStbRsp* pRsp); +void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp); int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); @@ -2303,23 +2303,23 @@ typedef struct { } SVgEpSet; typedef struct { - int8_t version; // for compatibility(default 0) - int8_t intervalUnit; // MACRO: TIME_UNIT_XXX - int8_t slidingUnit; // MACRO: TIME_UNIT_XXX - int8_t timezoneInt; // sma data expired if timezone changes. - int32_t dstVgId; - char indexName[TSDB_INDEX_NAME_LEN]; - int32_t exprLen; - int32_t tagsFilterLen; - int32_t numOfVgroups; - int64_t indexUid; - tb_uid_t tableUid; // super/child/common table uid - int64_t interval; - int64_t offset; // use unit by precision of DB - int64_t sliding; - char* expr; // sma expression - char* tagsFilter; - SVgEpSet vgEpSet[]; + int8_t version; // for compatibility(default 0) + int8_t intervalUnit; // MACRO: TIME_UNIT_XXX + int8_t slidingUnit; // MACRO: TIME_UNIT_XXX + int8_t timezoneInt; // sma data expired if timezone changes. + int32_t dstVgId; + char indexName[TSDB_INDEX_NAME_LEN]; + int32_t exprLen; + int32_t tagsFilterLen; + int32_t numOfVgroups; + int64_t indexUid; + tb_uid_t tableUid; // super/child/common table uid + int64_t interval; + int64_t offset; // use unit by precision of DB + int64_t sliding; + char* expr; // sma expression + char* tagsFilter; + SVgEpSet* pVgEpSet; } STSma; // Time-range-wise SMA typedef STSma SVCreateTSmaReq; @@ -2405,7 +2405,7 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) { } typedef struct { - int64_t indexUid; + int64_t indexUid; STimeWindow queryWindow; } SVGetTsmaExpWndsReq; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9c9f33ac96..7041c9478e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -694,7 +694,6 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) { pReq->pFields = NULL; } - int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3674,12 +3673,12 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1; } for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { - if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1; - if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1; - int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps; + if (tEncodeI32(pCoder, pSma->pVgEpSet[v].vgId) < 0) return -1; + if (tEncodeI8(pCoder, pSma->pVgEpSet[v].epSet.inUse) < 0) return -1; + int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps; if (tEncodeI8(pCoder, numOfEps) < 0) return -1; for (int32_t n = 0; n < numOfEps; ++n) { - const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n]; + const SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n]; if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1; if (tEncodeU16(pCoder, pEp->port) < 0) return -1; } @@ -3712,15 +3711,25 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { } else { pSma->tagsFilter = NULL; } - for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { - if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1; - if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1; - if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1; - int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps; - for (int32_t n = 0; n < numOfEps; ++n) { - SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n]; - if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1; - if (tDecodeU16(pCoder, &pEp->port) < 0) return -1; + if (pSma->numOfVgroups > 0) { + pSma->pVgEpSet = (SVgEpSet *)tDecoderMalloc(pCoder, pSma->numOfVgroups * sizeof(SVgEpSet)); + if (!pSma->pVgEpSet) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + memset(pSma->pVgEpSet, 0, pSma->numOfVgroups * sizeof(SVgEpSet)); + + for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { + if (tDecodeI32(pCoder, &pSma->pVgEpSet[v].vgId) < 0) return -1; + if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.inUse) < 0) return -1; + if (tDecodeI8(pCoder, &pSma->pVgEpSet[v].epSet.numOfEps) < 0) return -1; + int8_t numOfEps = pSma->pVgEpSet[v].epSet.numOfEps; + for (int32_t n = 0; n < numOfEps; ++n) { + SEp *pEp = &pSma->pVgEpSet[v].epSet.eps[n]; + if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1; + if (tDecodeU16(pCoder, &pEp->port) < 0) return -1; + } } } @@ -3765,7 +3774,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) { return 0; } -int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* pReq) { +int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder *pCoder, const SVGetTsmaExpWndsReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1; @@ -3773,10 +3782,10 @@ int32_t tEncodeSVGetTSmaExpWndsReq(SEncoder* pCoder, const SVGetTsmaExpWndsReq* if (tEncodeI64(pCoder, pReq->queryWindow.ekey) < 0) return -1; tEndEncode(pCoder); - return 0; + return 0; } -int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq) { +int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder *pCoder, SVGetTsmaExpWndsReq *pReq) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1; @@ -3787,7 +3796,7 @@ int32_t tDecodeSVGetTsmaExpWndsReq(SDecoder* pCoder, SVGetTsmaExpWndsReq* pReq) return 0; } -int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder* pCoder, const SVGetTsmaExpWndsRsp* pReq) { +int32_t tEncodeSVGetTSmaExpWndsRsp(SEncoder *pCoder, const SVGetTsmaExpWndsRsp *pReq) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeI64(pCoder, pReq->indexUid) < 0) return -1; @@ -3814,7 +3823,7 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) return 0; } -int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) { +int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1; @@ -3832,7 +3841,7 @@ int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq) { return 0; } -int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) { +int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1; @@ -3850,7 +3859,7 @@ int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq) { return 0; } -int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) { +int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeI32(pCoder, pReq->code) < 0) return -1; @@ -3860,7 +3869,7 @@ int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) { return 0; } -int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq) { +int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeI32(pCoder, &pReq->code) < 0) return -1; @@ -4502,7 +4511,7 @@ int32_t tDecodeSVAlterTbRsp(SDecoder *pDecoder, SVAlterTbRsp *pRsp) { } int32_t tDeserializeSVAlterTbRsp(void *buf, int32_t bufLen, SVAlterTbRsp *pRsp) { - int32_t meta = 0; + int32_t meta = 0; SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -4543,7 +4552,7 @@ int32_t tDecodeSMAlterStbRsp(SDecoder *pDecoder, SMAlterStbRsp *pRsp) { } int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp) { - int32_t meta = 0; + int32_t meta = 0; SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -4559,7 +4568,7 @@ int32_t tDeserializeSMAlterStbRsp(void *buf, int32_t bufLen, SMAlterStbRsp *pRsp return 0; } -void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) { +void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) { if (NULL == pRsp) { return; } @@ -4569,6 +4578,3 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp) { taosMemoryFree(pRsp->pMeta); } } - - - diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 83a36f4b0d..8ea172ef94 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -298,28 +298,30 @@ typedef struct { } SVgObj; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; - char stb[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createdTime; - int64_t uid; - int64_t stbUid; - int64_t dbUid; - int8_t intervalUnit; - int8_t slidingUnit; - int8_t timezone; - int32_t dstVgId; // for stream - int64_t interval; - int64_t offset; - int64_t sliding; - int32_t exprLen; // strlen + 1 - int32_t tagsFilterLen; - int32_t sqlLen; - int32_t astLen; - char* expr; - char* tagsFilter; - char* sql; - char* ast; + char name[TSDB_TABLE_FNAME_LEN]; + char stb[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createdTime; + int64_t uid; + int64_t stbUid; + int64_t dbUid; + int8_t intervalUnit; + int8_t slidingUnit; + int8_t timezone; + int32_t dstVgId; // for stream + int64_t interval; + int64_t offset; + int64_t sliding; + int32_t exprLen; // strlen + 1 + int32_t tagsFilterLen; + int32_t sqlLen; + int32_t astLen; + int32_t numOfVgroups; + char* expr; + char* tagsFilter; + char* sql; + char* ast; + SVgEpSet* pVgEpSet; } SSmaObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e8e23ccc2a..f9cb63dc74 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -36,6 +36,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw); static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma); static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); +static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups); static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); @@ -262,7 +263,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm req.sliding = pSma->sliding; req.expr = pSma->expr; req.tagsFilter = pSma->tagsFilter; - + req.numOfVgroups = pSma->numOfVgroups; + req.pVgEpSet = pSma->pVgEpSet; + // get length int32_t ret = 0; tEncodeSize(tEncodeSVCreateTSmaReq, &req, contLen, ret); @@ -420,6 +423,15 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, mndReleaseDnode(pMnode, pDnode); // todo add sma info here + SVgEpSet *pVgEpSet = NULL; + int32_t numOfVgroups = 0; + if (mndSmaGetVgEpSet(pMnode, pDb, &pVgEpSet, &numOfVgroups) != 0) { + return -1; + } + + pSma->pVgEpSet = pVgEpSet; + pSma->numOfVgroups = numOfVgroups; + int32_t smaContLen = 0; void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen); if (pSmaReq == NULL) return -1; @@ -964,3 +976,52 @@ static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + SVgEpSet *pVgEpSet = NULL; + int32_t nAllocVgs = 16; + int32_t nVgs = 0; + + pVgEpSet = taosMemoryCalloc(nAllocVgs, sizeof(SVgEpSet)); + if (!pVgEpSet) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } + + if (nVgs >= nAllocVgs) { + void *p = taosMemoryRealloc(pVgEpSet, nAllocVgs * 2 * sizeof(SVgEpSet)); + if (!p) { + taosMemoryFree(pVgEpSet); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pVgEpSet = (SVgEpSet *)p; + nAllocVgs *= 2; + } + + (pVgEpSet + nVgs)->vgId = pVgroup->vgId; + (pVgEpSet + nVgs)->epSet = mndGetVgroupEpset(pMnode, pVgroup); + + ++nVgs; + + sdbRelease(pSdb, pVgroup); + } + + *ppVgEpSet = pVgEpSet; + *numOfVgroups = nVgs; + + return 0; +}