From f505993d053c9a92b91c5d754fd81ffdbfe2e26d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 10 Jul 2023 11:23:38 +0800 Subject: [PATCH] fix: catalog append subtask issue --- source/libs/catalog/inc/catalogInt.h | 2 +- source/libs/catalog/src/ctgAsync.c | 39 ++++++++++++++++++++-------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 5746ea2340..7d47e82164 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -938,7 +938,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const void* param); int32_t ctgLaunchJob(SCtgJob* pJob); int32_t ctgMakeAsyncRes(SCtgJob* pJob); -int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); +int32_t ctgLaunchSubTask(SCtgTask** ppTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); int32_t ctgGetTbCfgCb(SCtgTask* pTask); void ctgFreeHandle(SCatalog* pCatalog); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 562343c9c7..96b18175ab 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2097,7 +2097,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) { SCtgTbMetaParam param; param.pName = pCtx->pName; param.flag = 0; - CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, ¶m)); + CTG_ERR_JRET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, ¶m)); return TSDB_CODE_SUCCESS; } } @@ -2108,7 +2108,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) { if (NULL == pCtx->pVgInfo) { CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo)); if (NULL == pCtx->pVgInfo) { - CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName)); + CTG_ERR_JRET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName)); return TSDB_CODE_SUCCESS; } } @@ -2145,7 +2145,7 @@ int32_t ctgLaunchGetTbTagTask(SCtgTask* pTask) { if (NULL == pCtx->pVgInfo) { CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo)); if (NULL == pCtx->pVgInfo) { - CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbTagCb, dbFName)); + CTG_ERR_JRET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbTagCb, dbFName)); return TSDB_CODE_SUCCESS; } } @@ -2331,7 +2331,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask* pTask) { SCtgTbMetaParam param; param.pName = &pCtx->user.tbName; param.flag = CTG_FLAG_SYNC_OP; - CTG_ERR_RET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetUserCb, ¶m)); + CTG_ERR_RET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_TB_META, ctgGetUserCb, ¶m)); } else { CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask)); } @@ -2541,19 +2541,35 @@ _return: CTG_RET(code); } -int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) { - SCtgJob* pJob = pTask->pJob; +SCtgTask* ctgGetTask(SCtgJob* pJob, int32_t taskId) { + int32_t taskNum = taosArrayGetSize(pJob->pTasks); + + for (int32_t i = 0; i < taskNum; ++i) { + SCtgTask* pTask = taosArrayGet(pJob->pTasks, i); + if (pTask->taskId == taskId) { + return pTask; + } + } + + return NULL; +} + + +int32_t ctgLaunchSubTask(SCtgTask** ppTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) { + SCtgJob* pJob = (*ppTask)->pJob; int32_t subTaskId = -1; bool newTask = false; + int32_t taskId = (*ppTask)->taskId; - ctgClearSubTaskRes(&pTask->subRes); - pTask->subRes.type = type; - pTask->subRes.fp = fp; + ctgClearSubTaskRes(&(*ppTask)->subRes); + (*ppTask)->subRes.type = type; + (*ppTask)->subRes.fp = fp; CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId)); if (subTaskId < 0) { CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId)); newTask = true; + *ppTask = ctgGetTask(pJob, taskId); } SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId); @@ -2561,10 +2577,10 @@ int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, pSub->subTask = true; } - CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); + CTG_ERR_RET(ctgSetSubTaskCb(pSub, *ppTask)); if (newTask) { - SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(*ppTask, -1); SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); pSubMsgCtx->pBatchs = pMsgCtx->pBatchs; @@ -2584,6 +2600,7 @@ int32_t ctgLaunchJob(SCtgJob* pJob) { qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); + pTask = taosArrayGet(pJob->pTasks, i); pTask->status = CTG_TASK_LAUNCHED; }