refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-12 10:02:18 +08:00
parent f2ccb8aa7e
commit d799212fb2
5 changed files with 72 additions and 122 deletions

View File

@ -40,4 +40,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -31,7 +31,6 @@ extern "C" {
#endif
struct SSnode {
char* path;
SStreamMeta* pMeta;
SMsgCb msgCb;
};

View File

@ -32,6 +32,7 @@ static STaskId replaceStreamTaskId(SStreamTask *pTask) {
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
@ -48,42 +49,16 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
streamTaskOpenAllUpstreamInput(pTask);
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
sndError("s-task:%s failed to open state for task", pTask->id.idStr);
return -1;
} else {
sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId);
ASSERT(pTask->exec.pExecutor);
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
@ -117,11 +92,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pSnode->path = taosStrdup(path);
if (pSnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
@ -140,7 +110,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
return pSnode;
FAIL:
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
return NULL;
}
@ -156,7 +125,6 @@ void sndClose(SSnode *pSnode) {
streamMetaNotifyClose(pSnode->pMeta);
streamMetaCommit(pSnode->pMeta);
streamMetaClose(pSnode->pMeta);
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
}

View File

@ -699,22 +699,6 @@ end:
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
@ -724,74 +708,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
return code;
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
}
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId);
return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList),
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// sink
@ -827,6 +746,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.

View File

@ -23,6 +23,67 @@ typedef struct STaskUpdateEntry {
int32_t transId;
} STaskUpdateEntry;
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) {
int32_t vgId = pMeta->vgId;
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
return -1;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
handle.vnode = pVnode;
handle.initTqReader = 1;
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
}
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
return -1;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);