fix: catalog append subtask issue

This commit is contained in:
dapan1121 2023-07-10 11:23:38 +08:00
parent 8ca0dbd2a2
commit f505993d05
2 changed files with 29 additions and 12 deletions

View File

@ -938,7 +938,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
void* param); void* param);
int32_t ctgLaunchJob(SCtgJob* pJob); int32_t ctgLaunchJob(SCtgJob* pJob);
int32_t ctgMakeAsyncRes(SCtgJob* pJob); int32_t ctgMakeAsyncRes(SCtgJob* pJob);
int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); int32_t ctgLaunchSubTask(SCtgTask** ppTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
int32_t ctgGetTbCfgCb(SCtgTask* pTask); int32_t ctgGetTbCfgCb(SCtgTask* pTask);
void ctgFreeHandle(SCatalog* pCatalog); void ctgFreeHandle(SCatalog* pCatalog);

View File

@ -2097,7 +2097,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
SCtgTbMetaParam param; SCtgTbMetaParam param;
param.pName = pCtx->pName; param.pName = pCtx->pName;
param.flag = 0; param.flag = 0;
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, &param)); CTG_ERR_JRET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, &param));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -2108,7 +2108,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
if (NULL == pCtx->pVgInfo) { if (NULL == pCtx->pVgInfo) {
CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo)); CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo));
if (NULL == pCtx->pVgInfo) { if (NULL == pCtx->pVgInfo) {
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName)); CTG_ERR_JRET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -2145,7 +2145,7 @@ int32_t ctgLaunchGetTbTagTask(SCtgTask* pTask) {
if (NULL == pCtx->pVgInfo) { if (NULL == pCtx->pVgInfo) {
CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo)); CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo));
if (NULL == pCtx->pVgInfo) { if (NULL == pCtx->pVgInfo) {
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbTagCb, dbFName)); CTG_ERR_JRET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbTagCb, dbFName));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -2331,7 +2331,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask* pTask) {
SCtgTbMetaParam param; SCtgTbMetaParam param;
param.pName = &pCtx->user.tbName; param.pName = &pCtx->user.tbName;
param.flag = CTG_FLAG_SYNC_OP; param.flag = CTG_FLAG_SYNC_OP;
CTG_ERR_RET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetUserCb, &param)); CTG_ERR_RET(ctgLaunchSubTask(&pTask, CTG_TASK_GET_TB_META, ctgGetUserCb, &param));
} else { } else {
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask)); CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask));
} }
@ -2541,19 +2541,35 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) { SCtgTask* ctgGetTask(SCtgJob* pJob, int32_t taskId) {
SCtgJob* pJob = pTask->pJob; int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
if (pTask->taskId == taskId) {
return pTask;
}
}
return NULL;
}
int32_t ctgLaunchSubTask(SCtgTask** ppTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
SCtgJob* pJob = (*ppTask)->pJob;
int32_t subTaskId = -1; int32_t subTaskId = -1;
bool newTask = false; bool newTask = false;
int32_t taskId = (*ppTask)->taskId;
ctgClearSubTaskRes(&pTask->subRes); ctgClearSubTaskRes(&(*ppTask)->subRes);
pTask->subRes.type = type; (*ppTask)->subRes.type = type;
pTask->subRes.fp = fp; (*ppTask)->subRes.fp = fp;
CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId)); CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId));
if (subTaskId < 0) { if (subTaskId < 0) {
CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId)); CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId));
newTask = true; newTask = true;
*ppTask = ctgGetTask(pJob, taskId);
} }
SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId); SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId);
@ -2561,10 +2577,10 @@ int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
pSub->subTask = true; pSub->subTask = true;
} }
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); CTG_ERR_RET(ctgSetSubTaskCb(pSub, *ppTask));
if (newTask) { if (newTask) {
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(*ppTask, -1);
SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pSubMsgCtx->pBatchs = pMsgCtx->pBatchs; pSubMsgCtx->pBatchs = pMsgCtx->pBatchs;
@ -2584,6 +2600,7 @@ int32_t ctgLaunchJob(SCtgJob* pJob) {
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
pTask = taosArrayGet(pJob->pTasks, i);
pTask->status = CTG_TASK_LAUNCHED; pTask->status = CTG_TASK_LAUNCHED;
} }