diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1f16659499..b7d16924ef 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -51,7 +51,7 @@ enum { TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, - TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint + TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint, todo remove it TASK_STATUS__PAUSE, }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c616f1eaa5..13b3ec086b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1220,11 +1220,22 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { - tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr); streamSetStatusNormal(pTask); + + // no lock needs to secure the access of the version + if (pReq->igUntreated) { // discard all the data when the stream task is suspended. + pTask->chkInfo.currentVer = sversion; + tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + } else { // from the previous paused version and go on + tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + } + streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqStartStreamTasks(pTq); } + return 0; } @@ -1329,7 +1340,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { taosWLockLatch(&pMeta->lock); int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); if (numOfTasks == 0) { - tqInfo("vgId:%d no stream tasks exists", vgId); + tqInfo("vgId:%d no stream tasks exist", vgId); taosWUnLockLatch(&pTq->pStreamMeta->lock); return 0; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 0ff6feb39a..081537e2c5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -273,17 +273,11 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("task %d receive retrieve req from node %d task %d", pTask->id.taskId, pReq->srcNodeId, pReq->srcTaskId); - + qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); streamSchedExec(pTask); - - /*streamTryExec(pTask);*/ - - /*streamDispatch(pTask);*/ - return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 63141d6219..d7c5602799 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -206,8 +206,9 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); - ASSERT(left >= 0); - if (left == 0) { + if (left < 0) { + qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr); + } else if (left == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); } @@ -238,12 +239,12 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { - ASSERT(0); + qError("failed to commit stream meta"); return -1; } if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { - ASSERT(0); + qError("failed to commit stream meta"); return -1; }