fix: fix catalog crash issue
This commit is contained in:
parent
0a84910be1
commit
d281990195
|
@ -279,6 +279,7 @@ typedef struct SCtgMsgCtx {
|
||||||
void* lastOut;
|
void* lastOut;
|
||||||
void* out;
|
void* out;
|
||||||
char* target;
|
char* target;
|
||||||
|
SHashObj* pBatchs;
|
||||||
} SCtgMsgCtx;
|
} SCtgMsgCtx;
|
||||||
|
|
||||||
|
|
||||||
|
@ -315,7 +316,6 @@ typedef struct SCtgTask {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
SArray* pParents;
|
SArray* pParents;
|
||||||
SCtgSubRes subRes;
|
SCtgSubRes subRes;
|
||||||
SHashObj* pBatchs;
|
|
||||||
} SCtgTask;
|
} SCtgTask;
|
||||||
|
|
||||||
typedef struct SCtgTaskReq {
|
typedef struct SCtgTaskReq {
|
||||||
|
|
|
@ -855,6 +855,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
|
||||||
|
|
||||||
int32_t parentNum = taosArrayGetSize(pTask->pParents);
|
int32_t parentNum = taosArrayGetSize(pTask->pParents);
|
||||||
for (int32_t i = 0; i < parentNum; ++i) {
|
for (int32_t i = 0; i < parentNum; ++i) {
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
|
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
|
||||||
|
|
||||||
pParent->subRes.code = pTask->code;
|
pParent->subRes.code = pTask->code;
|
||||||
|
@ -865,7 +866,9 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pParent->pBatchs = pTask->pBatchs;
|
SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1);
|
||||||
|
|
||||||
|
pParMsgCtx->pBatchs = pMsgCtx->pBatchs;
|
||||||
CTG_ERR_JRET(pParent->subRes.fp(pParent));
|
CTG_ERR_JRET(pParent->subRes.fp(pParent));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1625,6 +1628,11 @@ _return:
|
||||||
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
|
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res));
|
CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res));
|
||||||
if (pTask->res) {
|
if (pTask->res) {
|
||||||
|
@ -1645,6 +1653,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
|
||||||
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
||||||
int32_t fetchIdx = 0;
|
int32_t fetchIdx = 0;
|
||||||
|
@ -1670,7 +1679,11 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
|
||||||
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
|
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
|
||||||
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
|
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
|
||||||
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
|
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgTaskReq tReq;
|
SCtgTaskReq tReq;
|
||||||
tReq.pTask = pTask;
|
tReq.pTask = pTask;
|
||||||
tReq.msgIdx = pFetch->fetchIdx;
|
tReq.msgIdx = pFetch->fetchIdx;
|
||||||
|
@ -1686,6 +1699,11 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
|
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
||||||
if (NULL != dbCache) {
|
if (NULL != dbCache) {
|
||||||
|
@ -1722,6 +1740,11 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
|
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
||||||
if (NULL != dbCache) {
|
if (NULL != dbCache) {
|
||||||
|
@ -1761,6 +1784,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
|
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
||||||
int32_t fetchIdx = 0;
|
int32_t fetchIdx = 0;
|
||||||
int32_t baseResIdx = 0;
|
int32_t baseResIdx = 0;
|
||||||
|
@ -1803,7 +1827,11 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
|
||||||
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
|
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
|
||||||
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
|
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
|
||||||
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
|
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
SBuildUseDBInput input = {0};
|
SBuildUseDBInput input = {0};
|
||||||
strcpy(input.db, pReq->dbFName);
|
strcpy(input.db, pReq->dbFName);
|
||||||
|
|
||||||
|
@ -1831,6 +1859,11 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
|
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
|
||||||
SArray* pRes = NULL;
|
SArray* pRes = NULL;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
|
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
|
||||||
if (pRes) {
|
if (pRes) {
|
||||||
|
@ -1852,6 +1885,11 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) {
|
||||||
SArray* pRes = NULL;
|
SArray* pRes = NULL;
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
tNameGetFullDbName(pCtx->pName, dbFName);
|
tNameGetFullDbName(pCtx->pName, dbFName);
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
if (pCtx->tbType <= 0) {
|
if (pCtx->tbType <= 0) {
|
||||||
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
||||||
|
@ -1890,6 +1928,11 @@ _return:
|
||||||
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
|
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask));
|
CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1898,6 +1941,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
|
||||||
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
|
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask));
|
CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1908,6 +1956,11 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
|
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));
|
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));
|
||||||
|
|
||||||
|
@ -1919,6 +1972,11 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
|
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo));
|
pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo));
|
||||||
if (NULL == pTask->res) {
|
if (NULL == pTask->res) {
|
||||||
|
@ -1953,6 +2011,11 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
|
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask));
|
CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask));
|
||||||
|
|
||||||
|
@ -1963,6 +2026,11 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
|
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask));
|
CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask));
|
||||||
|
|
||||||
|
@ -1975,6 +2043,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
|
||||||
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
|
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
|
||||||
bool inCache = false;
|
bool inCache = false;
|
||||||
bool pass = false;
|
bool pass = false;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass));
|
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass));
|
||||||
if (inCache) {
|
if (inCache) {
|
||||||
|
@ -1996,6 +2069,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
|
||||||
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
|
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask));
|
CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask));
|
||||||
|
|
||||||
|
@ -2129,7 +2207,10 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
|
||||||
if (CTG_TASK_DONE == pSub->status) {
|
if (CTG_TASK_DONE == pSub->status) {
|
||||||
pTask->subRes.code = pSub->code;
|
pTask->subRes.code = pSub->code;
|
||||||
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
|
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
|
||||||
pTask->pBatchs = pSub->pBatchs;
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
|
||||||
|
pMsgCtx->pBatchs = pSubMsgCtx->pBatchs;
|
||||||
|
|
||||||
CTG_ERR_JRET(pTask->subRes.fp(pTask));
|
CTG_ERR_JRET(pTask->subRes.fp(pTask));
|
||||||
} else {
|
} else {
|
||||||
if (NULL == pSub->pParents) {
|
if (NULL == pSub->pParents) {
|
||||||
|
@ -2167,7 +2248,10 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
|
||||||
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
|
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
|
||||||
|
|
||||||
if (newTask) {
|
if (newTask) {
|
||||||
pSub->pBatchs = pTask->pBatchs;
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
|
||||||
|
pSubMsgCtx->pBatchs = pMsgCtx->pBatchs;
|
||||||
|
|
||||||
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
|
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
|
||||||
pSub->status = CTG_TASK_LAUNCHED;
|
pSub->status = CTG_TASK_LAUNCHED;
|
||||||
}
|
}
|
||||||
|
@ -2180,7 +2264,6 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < taskNum; ++i) {
|
for (int32_t i = 0; i < taskNum; ++i) {
|
||||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
|
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
|
||||||
pTask->pBatchs = pJob->pBatchs;
|
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
|
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
|
||||||
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
||||||
|
|
|
@ -68,14 +68,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
||||||
taskMsg.pData = NULL;
|
taskMsg.pData = NULL;
|
||||||
taskMsg.len = 0;
|
taskMsg.len = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->pBatchs = pBatchs;
|
|
||||||
|
|
||||||
SCtgTaskReq tReq;
|
SCtgTaskReq tReq;
|
||||||
tReq.pTask = pTask;
|
tReq.pTask = pTask;
|
||||||
tReq.msgIdx = rsp.msgIdx;
|
tReq.msgIdx = rsp.msgIdx;
|
||||||
|
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
|
||||||
|
pMsgCtx->pBatchs = pBatchs;
|
||||||
|
|
||||||
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1));
|
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
|
||||||
|
|
||||||
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
|
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
|
||||||
}
|
}
|
||||||
|
@ -343,7 +343,9 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
|
||||||
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
||||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
pTask->pBatchs = pBatchs;
|
|
||||||
|
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
pMsgCtx->pBatchs = pBatchs;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SCtgTaskReq tReq;
|
SCtgTaskReq tReq;
|
||||||
|
@ -444,7 +446,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
|
||||||
uint32_t msgSize) {
|
uint32_t msgSize) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgTask* pTask = tReq->pTask;
|
SCtgTask* pTask = tReq->pTask;
|
||||||
SHashObj* pBatchs = pTask->pBatchs;
|
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
|
||||||
|
SHashObj* pBatchs = pMsgCtx->pBatchs;
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
|
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
|
||||||
SCtgBatch newBatch = {0};
|
SCtgBatch newBatch = {0};
|
||||||
|
|
Loading…
Reference in New Issue