diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e34b27e9b8..3c171ca510 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -635,13 +635,15 @@ void streamMetaInit(); void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); + +// save to b-tree meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 8a7b61135b..73a8731dfa 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -160,7 +160,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { // 2.save task taosWLockLatch(&pSnode->pMeta->lock); - code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask); + code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask); if (code < 0) { taosWUnLockLatch(&pSnode->pMeta->lock); return -1; @@ -179,7 +179,17 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId); + if (pTask == NULL) { + qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); + return 0; + } + + streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId); streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); + + streamMetaReleaseTask(pSnode->pMeta, pTask); + streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30194360a8..d21bf1ae58 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1041,7 +1041,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 2.save task, use the newest commit version as the initial start version of stream task. int32_t taskId = 0; taosWLockLatch(&pStreamMeta->lock); - code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask); + code = streamMetaRegisterTask(pStreamMeta, sversion, pTask); taskId = pTask->id.taskId; int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); @@ -1468,8 +1468,17 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); + return 0; + } + streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId); streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1c824db3b0..fa0561a722 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -101,7 +101,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->triggerParam != 0) { + if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); ASSERT(ref == 2 && pTask->schedTimer == NULL); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a1922f3553..f27e7e6316 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -409,12 +409,15 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + int32_t taskId = pTask->id.taskId; pTask->status.taskStatus = TASK_STATUS__DROPPING; - streamMetaRemoveTask(pMeta, pTask->id.taskId); + + // free it and remove it from disk meta-store + streamMetaUnregisterTask(pMeta, pTask->id.taskId); + streamMetaRemoveTask(pMeta, taskId); // save to disk taosWLockLatch(&pMeta->lock); - streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk @@ -499,6 +502,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 0; SStreamQueueItem* pInput = NULL; + if (streamTaskShouldStop(&pTask->status)) { + qDebug("s-task:%s stream task stopped, abort", id); + break; + } // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0ef9807f8a..be8dd38b86 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -217,6 +217,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncoderClear(&encoder); if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { + qError("s-task:0x%x save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); return -1; } @@ -224,8 +225,22 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { + taosWLockLatch(&pMeta->lock); + int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn); + taosWUnLockLatch(&pMeta->lock); + + if (code != 0) { + qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno)); + } else { + qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId); + } + + return code; +} + // add to the ready tasks hash map, not the restored tasks hash map -int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { @@ -242,6 +257,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* tFreeStreamTask(pTask); return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); } else { return 0; @@ -281,6 +297,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); + qTrace("s-task:%s all refs are gone, free it"); tFreeStreamTask(pTask); } else if (ref < 0) { qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); @@ -297,7 +314,7 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) } } -void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { +int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = NULL; // pre-delete operation @@ -309,7 +326,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } else { qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); taosWUnLockLatch(&pMeta->lock); - return; + return 0; } taosWUnLockLatch(&pMeta->lock); @@ -339,9 +356,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); - tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); + ASSERT(pTask->status.timerActive == 0); int32_t num = taosArrayGetSize(pMeta->pTaskList); @@ -351,15 +367,13 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { // remove the ref by timer if (pTask->triggerParam != 0) { taosTmrStop(pTask->schedTimer); - streamMetaReleaseTask(pMeta, pTask); } - - streamMetaReleaseTask(pMeta, pTask); } else { qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } taosWUnLockLatch(&pMeta->lock); + return 0; } int32_t streamMetaBegin(SStreamMeta* pMeta) { @@ -404,7 +418,9 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { TBC* pCur = NULL; + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); return -1; } @@ -413,6 +429,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; + SArray* pRecycleList = taosArrayInit(4, sizeof(int32_t)); tdbTbcMoveToFirst(pCur); @@ -422,6 +439,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); + taosArrayDestroy(pRecycleList); return -1; } @@ -429,16 +447,29 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); - // remove duplicate + if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + int32_t taskId = pTask->id.taskId; + tFreeStreamTask(pTask); + + taosArrayPush(pRecycleList, &taskId); + + int32_t total = taosArrayGetSize(pRecycleList); + qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); + continue; + } + + // do duplicate task check. void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - taosMemoryFree(pTask); + tFreeStreamTask(pTask); + taosArrayDestroy(pRecycleList); return -1; } + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); } else { tdbFree(pKey); @@ -452,7 +483,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); - taosMemoryFree(pTask); + tFreeStreamTask(pTask); + taosArrayDestroy(pRecycleList); return -1; } @@ -462,8 +494,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); if (tdbTbcClose(pCur) < 0) { + taosArrayDestroy(pRecycleList); return -1; } + if (taosArrayGetSize(pRecycleList) > 0) { + for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { + int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i); + streamMetaRemoveTask(pMeta, taskId); + } + } + + qDebug("vgId:%d load %d task from disk", pMeta->vgId, taosArrayGetSize(pMeta->pTaskList)); + taosArrayDestroy(pRecycleList); return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 1c9e2672d1..dffa28e769 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -765,7 +765,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } else { SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey + 1; + int64_t ekey = 0; + if (pRange->window.ekey < INT64_MAX) { + ekey = pRange->window.ekey + 1; + } else { + ekey = pRange->window.ekey; + } + int64_t ver = pRange->range.minVer; pRange->window.skey = ekey;