fix(stream): relase the task.

This commit is contained in:
Haojun Liao 2024-10-28 10:20:08 +08:00
parent 51f3f29d5b
commit 0bc5f8cb88
3 changed files with 16 additions and 10 deletions

View File

@ -248,7 +248,7 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
return code; return code;
} }
mDebug("doAddSinkTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, isFillhistory);
pTask->info.nodeId = pVgroup->vgId; pTask->info.nodeId = pVgroup->vgId;
pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
@ -364,12 +364,13 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
static void addNewTaskList(SStreamObj* pStream) { static void addNewTaskList(SStreamObj* pStream) {
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) { if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
mError("failed to put array"); mError("failed to put into array");
} }
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
pTaskList = taosArrayInit(0, POINTER_BYTES); pTaskList = taosArrayInit(0, POINTER_BYTES);
if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) { if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
mError("failed to put array"); mError("failed to put into array");
} }
} }
} }
@ -402,7 +403,8 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
return code; return code;
} }
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId,
isFillhistory);
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
haltInitialTaskStatus(pTask, plan, isFillhistory); haltInitialTaskStatus(pTask, plan, isFillhistory);
@ -512,19 +514,20 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan,
SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) { SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
int32_t code = 0; int32_t code = 0;
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
const char* id = NULL;
code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask); code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
id = pTask->id.idStr;
if (pSnode != NULL) { if (pSnode != NULL) {
code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode); code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
mDebug("doAddAggTask taskId:%s, snode id:%d, isFillHistory:%d", pTask->id.idStr, pSnode->id, isFillhistory); mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
} else { } else {
code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
mDebug("doAddAggTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
} }
return code; return code;
} }
@ -678,7 +681,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
// add extra sink // add extra sink
hasExtraSink = true; hasExtraSink = true;
int32_t code = addSinkTask(pMnode, pStream, pEpset); code = addSinkTask(pMnode, pStream, pEpset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -1037,6 +1037,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
if (++pTmrInfo->activeCounter < 50) { if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId, streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor"); "trigger-recv-monitor");
doCleanup(pTask, pNotSendList);
return; return;
} }

View File

@ -355,7 +355,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
// fill-history task can only be launched by related stream tasks. // fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo; STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId); stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -363,6 +363,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
// the start all tasks procedure may happen to start the newly deployed stream task, and results in the // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
// concurrently start this task by two threads. // concurrently start this task by two threads.
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask); SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__UNINIT) { if (status.state != TASK_STATUS__UNINIT) {
stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
@ -379,6 +380,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if(pTask->status.downstreamReady != 0) { if(pTask->status.downstreamReady != 0) {
stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId); stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_INTERNAL_ERROR; return TSDB_CODE_STREAM_INTERNAL_ERROR;
} }
@ -395,7 +397,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
} }
// concurrently start task may cause the later started task be failed, and also failed to added into meta result. // concurrently start task may cause the latter started task be failed, and also failed to added into meta result.
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {