fix(stream): add ref for task.
This commit is contained in:
parent
43b14ba8b4
commit
a0d81734f1
|
@ -98,8 +98,10 @@ static void doReExecScanhistory(void* param, void* tmrId) {
|
|||
char* p = NULL;
|
||||
ETaskStatus status = streamTaskGetStatus(pTask, &p);
|
||||
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
|
||||
|
@ -108,6 +110,9 @@ static void doReExecScanhistory(void* param, void* tmrId) {
|
|||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
|
||||
pTask->info.fillHistory, ref);
|
||||
|
||||
// release the task.
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
} else {
|
||||
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer,
|
||||
&pTask->schedHistoryInfo.pTimer);
|
||||
|
@ -122,6 +127,10 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
|
|||
numOfTicks = SCANHISTORY_IDLE_TICK;
|
||||
}
|
||||
|
||||
// add ref for task
|
||||
SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||
ASSERT(p != NULL);
|
||||
|
||||
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
|
|
Loading…
Reference in New Issue