refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-10-16 22:07:37 +08:00
parent f83993757f
commit dd05353b74
8 changed files with 14 additions and 9 deletions

View File

@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaAcquireOneTask(SStreamTask* pTask);
int32_t streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
streamMetaAcquireOneTask(*ppTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask);
SStreamTask* pTask = *ppTask;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {

View File

@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
return;
}
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);

View File

@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId;

View File

@ -1162,7 +1162,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");

View File

@ -753,9 +753,10 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task
return code;
}
void streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return ref;
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
@ -866,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it
// it is a fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) {
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}

View File

@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t delaySchema = pTask->info.delaySchedParam;
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
int32_t ref = streamMetaAcquireOneTask(pTask);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam);
@ -80,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
pTask->status.schedIdleTime, ref);
// add one ref count for task
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
pTask->pMeta->vgId, "resume-task-tmr");
}

View File

@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->inputq.queue) {
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
pTask->inputq.queue = NULL;
}
if (pTask->outputq.queue) {
streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
pTask->outputq.queue = NULL;
}
if (pTask->exec.qmsg) {
@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader);
pTask->exec.pWalReader = NULL;
}
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);