diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 73a8731dfa..e8bdf97c70 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -186,9 +186,6 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { } streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); - streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); - - streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d21bf1ae58..a935eaf5f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1047,6 +1047,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); + tFreeStreamTask(pTask); taosWUnLockLatch(&pStreamMeta->lock); return -1; } @@ -1261,6 +1262,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t remain = streamAlignTransferState(pTask); if (remain > 0) { tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -1475,9 +1477,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL } streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); - streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 269e00bc01..af93d95a9f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -408,14 +408,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); - int32_t taskId = pTask->id.taskId; - pTask->status.taskStatus = TASK_STATUS__DROPPING; // free it and remove it from disk meta-store - streamMetaUnregisterTask(pMeta, pTask->id.taskId); - streamMetaRemoveTask(pMeta, taskId); - streamMetaReleaseTask(pMeta, pTask); + streamMetaUnregisterTask(pMeta, taskId); // save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e8a6783971..758530f4fb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -226,10 +226,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { } int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { - taosWLockLatch(&pMeta->lock); int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn); - taosWUnLockLatch(&pMeta->lock); - if (code != 0) { qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno)); } else { @@ -248,6 +245,8 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + if (streamMetaSaveTask(pMeta, pTask) < 0) { tFreeStreamTask(pTask); return -1; @@ -257,8 +256,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa tFreeStreamTask(pTask); return -1; } - - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); } else { return 0; } @@ -361,13 +358,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { ASSERT(pTask->status.timerActive == 0); int32_t num = taosArrayGetSize(pMeta->pTaskList); - qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1); doRemoveIdFromList(pMeta, num, pTask->id.taskId); // remove the ref by timer if (pTask->triggerParam != 0) { taosTmrStop(pTask->schedTimer); } + + streamMetaRemoveTask(pMeta, taskId); + streamMetaReleaseTask(pMeta, pTask); } else { qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 863c4ce025..1eb8d11916 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -211,7 +211,7 @@ static void freeItem(void* p) { } void tFreeStreamTask(SStreamTask* pTask) { - qDebug("free s-task:%s", pTask->id.idStr); + qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { diff --git a/tests/script/tsim/stream/basic3.sim b/tests/script/tsim/stream/basic3.sim index 2df33541b4..f18061a6df 100644 --- a/tests/script/tsim/stream/basic3.sim +++ b/tests/script/tsim/stream/basic3.sim @@ -1,11 +1,9 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/cfg.sh -n dnode1 -c debugflag -v 131 system sh/cfg.sh -n dnode1 -c keepColumnName -v 1 system sh/exec.sh -n dnode1 -s start -sleep 5000 - +sleep 1000 sql connect print ========== interval\session\state window @@ -32,7 +30,6 @@ sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*), sql alter local 'keepColumnName' '1' - sql CREATE STABLE `meters_test_data` (`ts` TIMESTAMP, `close` FLOAT, `parttime` TIMESTAMP, `parttime_str` VARCHAR(32)) TAGS (`id` VARCHAR(32)); sql_error create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str); @@ -58,17 +55,13 @@ sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count sql alter local 'keepColumnName' '0' sql create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str); - sql desc realtime_meters; - if $rows == 0 then return -1 endi -sql create stream streamd7 into streamt7 as select _wstart, _wend, count(*), first(ca), last(ca) from t1 interval(10s); - +sql create stream streamd7 into streamt7 as select _wstart t1, _wend t2, count(*), first(ca), last(ca) from t1 interval(10s); sql desc streamt7; - if $rows == 0 then return -1 endi @@ -76,12 +69,11 @@ endi sql create stream streamd71 into streamt71 as select _wstart, _wend, count(*) as ca, first(ca), last(ca) as c2 from t1 interval(10s); sql desc streamt71; - if $rows == 0 then return -1 endi -sleep 3000 +sleep 1000 sql drop stream if exists streamd1; sql drop stream if exists streamd2; @@ -93,23 +85,19 @@ sql drop stream if exists streamd6; sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s); sql desc streamd10; - if $rows == 0 then return -1 endi sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count(*), last(ca), last(ca) from t1 interval(10s); - sql create stream streamd12 into streamd12 as select _wstart, _wend, count(*), last(ca), last(cb) as c2 from t1 interval(10s); - sql desc streamd12; if $rows == 0 then return -1 endi - _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check