diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 4ef23860cd..777dcd0592 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -279,6 +279,7 @@ typedef struct SCtgMsgCtx { void* lastOut; void* out; char* target; + SHashObj* pBatchs; } SCtgMsgCtx; @@ -315,7 +316,6 @@ typedef struct SCtgTask { SRWLatch lock; SArray* pParents; SCtgSubRes subRes; - SHashObj* pBatchs; } SCtgTask; typedef struct SCtgTaskReq { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 63d99cc58b..c8165969a5 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -855,6 +855,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { int32_t parentNum = taosArrayGetSize(pTask->pParents); for (int32_t i = 0; i < parentNum; ++i) { + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); pParent->subRes.code = pTask->code; @@ -865,7 +866,9 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { } } - pParent->pBatchs = pTask->pBatchs; + SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1); + + pParMsgCtx->pBatchs = pMsgCtx->pBatchs; CTG_ERR_JRET(pParent->subRes.fp(pParent)); } @@ -1625,6 +1628,11 @@ _return: int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res)); if (pTask->res) { @@ -1645,6 +1653,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t fetchIdx = 0; @@ -1670,7 +1679,11 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) { SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); - + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } + SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = pFetch->fetchIdx; @@ -1686,6 +1699,11 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgDBCache *dbCache = NULL; SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); if (NULL != dbCache) { @@ -1722,6 +1740,11 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgDBCache *dbCache = NULL; SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); if (NULL != dbCache) { @@ -1761,6 +1784,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgDBCache *dbCache = NULL; + SCtgJob* pJob = pTask->pJob; int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t fetchIdx = 0; int32_t baseResIdx = 0; @@ -1803,7 +1827,11 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) { SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); - + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } + SBuildUseDBInput input = {0}; strcpy(input.db, pReq->dbFName); @@ -1831,6 +1859,11 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SArray* pRes = NULL; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes)); if (pRes) { @@ -1852,6 +1885,11 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { SArray* pRes = NULL; char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pCtx->pName, dbFName); + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } if (pCtx->tbType <= 0) { CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType)); @@ -1890,6 +1928,11 @@ _return: int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask)); return TSDB_CODE_SUCCESS; @@ -1898,6 +1941,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask)); return TSDB_CODE_SUCCESS; @@ -1908,6 +1956,11 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask)); @@ -1919,6 +1972,11 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SCtgDBCache *dbCache = NULL; SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo)); if (NULL == pTask->res) { @@ -1953,6 +2011,11 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask)); @@ -1963,6 +2026,11 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask)); @@ -1975,6 +2043,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; bool inCache = false; bool pass = false; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass)); if (inCache) { @@ -1996,6 +2069,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgJob* pJob = pTask->pJob; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + if (NULL == pMsgCtx->pBatchs) { + pMsgCtx->pBatchs = pJob->pBatchs; + } CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask)); @@ -2129,7 +2207,10 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { if (CTG_TASK_DONE == pSub->status) { pTask->subRes.code = pSub->code; CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); - pTask->pBatchs = pSub->pBatchs; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); + pMsgCtx->pBatchs = pSubMsgCtx->pBatchs; + CTG_ERR_JRET(pTask->subRes.fp(pTask)); } else { if (NULL == pSub->pParents) { @@ -2167,7 +2248,10 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); if (newTask) { - pSub->pBatchs = pTask->pBatchs; + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); + pSubMsgCtx->pBatchs = pMsgCtx->pBatchs; + CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); pSub->status = CTG_TASK_LAUNCHED; } @@ -2180,7 +2264,6 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { for (int32_t i = 0; i < taskNum; ++i) { SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); - pTask->pBatchs = pJob->pBatchs; qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 45f97865ce..1fdf84e120 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -68,14 +68,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu taskMsg.pData = NULL; taskMsg.len = 0; } - - pTask->pBatchs = pBatchs; SCtgTaskReq tReq; tReq.pTask = pTask; tReq.msgIdx = rsp.msgIdx; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx); + pMsgCtx->pBatchs = pBatchs; - ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1)); + ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); } @@ -343,7 +343,9 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - pTask->pBatchs = pBatchs; + + SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + pMsgCtx->pBatchs = pBatchs; #endif SCtgTaskReq tReq; @@ -444,7 +446,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT uint32_t msgSize) { int32_t code = 0; SCtgTask* pTask = tReq->pTask; - SHashObj* pBatchs = pTask->pBatchs; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); + SHashObj* pBatchs = pMsgCtx->pBatchs; SCtgJob* pJob = pTask->pJob; SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); SCtgBatch newBatch = {0};