From 117de7ab7d6b7eaa95e40b00bd2a6b48073f076b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Sep 2023 00:51:13 +0800 Subject: [PATCH 1/3] fix(stream): fix error in drop task. --- source/dnode/vnode/src/tq/tq.c | 7 ------- source/libs/stream/src/streamTask.c | 3 ++- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0bf9cba2dd..09d96f1ad7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1330,14 +1330,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); - if (pTask == NULL) { - tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); - return 0; - } - streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index af550f86cb..d42dee76f6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -458,7 +458,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); - qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf); + qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d taskId:0x%x newEpset:%s", pTask->id.taskId, nodeId, + pInfo->taskId, buf); break; } } From 2bfd6e33553343601961bc575fc69a867fa0c2d5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Sep 2023 09:25:22 +0800 Subject: [PATCH 2/3] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 32 +++------ source/dnode/vnode/src/tq/tq.c | 45 +++++++------ source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 6 +- source/dnode/vnode/src/tq/tqStreamTask.c | 8 +-- source/libs/stream/inc/streamInt.h | 11 ++- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamMeta.c | 81 +++++++++++++---------- source/libs/stream/src/streamQueue.c | 5 +- source/libs/stream/src/streamRecover.c | 6 +- source/libs/stream/src/streamTask.c | 43 ++++++++---- 12 files changed, 136 insertions(+), 107 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 932a6d951b..1fd2f2bc13 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -321,15 +321,13 @@ typedef struct { int64_t init; int64_t step1Start; int64_t step2Start; - int64_t sinkStart; -} STaskTimestamp; + int64_t execStart; + int32_t taskUpdateCount; + int64_t latestUpdateTs; +} STaskExecStatisInfo; -typedef struct STokenBucket { - int32_t capacity; // total capacity - int64_t fillTimestamp;// fill timestamp - int32_t numOfToken; // total available tokens - int32_t rate; // number of token per second -} STokenBucket; +typedef struct STokenBucket STokenBucket; +typedef struct SMetaHbInfo SMetaHbInfo; struct SStreamTask { int64_t ver; @@ -345,7 +343,7 @@ struct SStreamTask { SDataRange dataRange; SStreamTaskId historyTaskId; SStreamTaskId streamTaskId; - STaskTimestamp tsInfo; + STaskExecStatisInfo taskExecInfo; SArray* pReadyMsgList; // SArray TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ SArray* pUpstreamInfoList; @@ -359,7 +357,7 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; SSinkTaskRecorder sinkRecorder; - STokenBucket tokenBucket; + STokenBucket* pTokenBucket; void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle @@ -381,19 +379,13 @@ struct SStreamTask { char reserve[256]; }; -typedef struct SMetaHbInfo { - tmr_h hbTmr; - int32_t stopFlag; - int32_t tickCounter; -} SMetaHbInfo; - // meta typedef struct SStreamMeta { char* path; TDB* db; TTB* pTaskDb; TTB* pCheckpointDb; - SHashObj* pTasks; + SHashObj* pTasksMap; SArray* pTaskList; // SArray void* ahandle; TXN* txn; @@ -403,15 +395,13 @@ typedef struct SStreamMeta { bool leader; int8_t taskWillbeLaunched; SRWLatch lock; -// TdThreadRwlock lock; int32_t walScanCounter; void* streamBackend; int64_t streamBackendRid; SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; - SMetaHbInfo hbInfo; - SHashObj* pUpdateTaskList; -// int32_t closedTask; + SMetaHbInfo* pHbInfo; + SHashObj* pUpdateTaskSet; int32_t totalTasks; // this value should be increased when a new task is added into the meta int32_t chkptNotReadyTasks; int64_t rid; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 09d96f1ad7..41322d2b21 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -993,8 +993,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms bool restored = pTq->pVnode->restored; if (p != NULL && restored) { - p->tsInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->tsInfo.init); + p->taskExecInfo.init = taosGetTimestampMs(); + tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->taskExecInfo.init); streamTaskCheckDownstream(p); } else if (!restored) { @@ -1032,14 +1032,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); - if (pTask->tsInfo.step1Start == 0) { + if (pTask->taskExecInfo.step1Start == 0) { ASSERT(pTask->status.pauseAllowed == false); - pTask->tsInfo.step1Start = taosGetTimestampMs(); + pTask->taskExecInfo.step1Start = taosGetTimestampMs(); if (pTask->info.fillHistory == 1) { streamTaskEnablePause(pTask); } } else { - tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); + tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->taskExecInfo.step1Start); } // we have to continue retrying to successfully execute the scan history task. @@ -1059,7 +1059,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0; int8_t status = streamTaskSetSchedStatusInActive(pTask); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); streamMetaReleaseTask(pMeta, pTask); @@ -1067,7 +1067,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // the following procedure should be executed, no matter status is stop/pause or not - double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0; tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { @@ -1115,7 +1115,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); if (done) { - pTask->tsInfo.step2Start = taosGetTimestampMs(); + pTask->taskExecInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); streamTaskPutTranstateIntoInputQ(pTask); streamTryExec(pTask); // exec directly @@ -1127,7 +1127,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - pTask->tsInfo.step2Start = taosGetTimestampMs(); + pTask->taskExecInfo.step2Start = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); int64_t dstVer = pTask->dataRange.range.minVer; @@ -1331,6 +1331,13 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); streamMetaUnregisterTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); + + // commit the update + taosWLockLatch(&pTq->pStreamMeta->lock); + if (streamMetaCommit(pTq->pStreamMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pTq->pStreamMeta->lock); return 0; } @@ -1662,9 +1669,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // update the nodeEpset when it exists taosWLockLatch(&pMeta->lock); - // when replay the WAL, we should update the task epset one again and again, the task may be in stop status. + // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. int64_t keys[2] = {req.streamId, req.taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask == NULL || *ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, @@ -1676,8 +1683,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } SStreamTask* pTask = *ppTask; + tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr); - tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamSetStatusNormal(pTask); @@ -1686,7 +1693,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { keys[0] = pTask->historyTaskId.streamId; keys[1] = pTask->historyTaskId.taskId; - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); @@ -1708,14 +1715,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } streamTaskStop(pTask); + taosHashPut(pMeta->pUpdateTaskSet, &pTask->id, sizeof(pTask->id), NULL, 0); + if (ppHTask != NULL) { streamTaskStop(*ppHTask); - } - - taosHashPut(pMeta->pUpdateTaskList, &pTask->id, sizeof(pTask->id), NULL, 0); - if (ppHTask != NULL) { tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); - taosHashPut(pMeta->pUpdateTaskList, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); + taosHashPut(pMeta->pUpdateTaskSet, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); } @@ -1724,14 +1729,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskList); + int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet); if (updateTasks < numOfTasks) { pMeta->taskWillbeLaunched = 1; tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); } else { - taosHashClear(pMeta->pUpdateTaskList); + taosHashClear(pMeta->pUpdateTaskSet); if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cadbc70c6f..25a07a55d3 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1129,7 +1129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); while (1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter); if (pIter == NULL) { break; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 0925573248..106a4cc9b0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - double el = (taosGetTimestampMs() - pTask->tsInfo.sinkStart) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.execStart) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); @@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t code = TSDB_CODE_SUCCESS; const char* id = pTask->id.idStr; - if (pTask->tsInfo.sinkStart == 0) { - pTask->tsInfo.sinkStart = taosGetTimestampMs(); + if (pTask->taskExecInfo.execStart == 0) { + pTask->taskExecInfo.execStart = taosGetTimestampMs(); } bool onlySubmitData = true; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 8c45aa4f8c..1e66988aab 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -94,8 +94,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { continue; } - pTask->tsInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.init); + pTask->taskExecInfo.init = taosGetTimestampMs(); + tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); streamSetStatusNormal(pTask); streamTaskCheckDownstream(pTask); @@ -241,7 +241,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key)); + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, key, sizeof(key)); int8_t status = (*pTask)->status.taskStatus; if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) { @@ -307,7 +307,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); - double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */streamSchedExec(pTask); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bbb7595e5a..fb11ec4ea4 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -29,17 +29,24 @@ extern "C" { #define ONE_MB_F (1048576.0) #define SIZE_IN_MB(_v) ((_v) / ONE_MB_F) -typedef struct { +typedef struct SStreamGlobalEnv { int8_t inited; void* timer; } SStreamGlobalEnv; -typedef struct { +typedef struct SStreamContinueExecInfo { SEpSet epset; int32_t taskId; SRpcMsg msg; } SStreamContinueExecInfo; +struct STokenBucket { + int32_t capacity; // total capacity + int64_t fillTimestamp;// fill timestamp + int32_t numOfToken; // total available tokens + int32_t rate; // number of token per second +}; + extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cc93d25fd5..cfbfdb5da4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -270,7 +270,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { keys[0] = pId->streamId; keys[1] = pId->taskId; - SStreamTask** ppTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask == NULL) { continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9a45555d4a..3a34d941dd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -201,7 +201,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { while (!finished) { if (streamTaskShouldPause(&pTask->status)) { - double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0; qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); break; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5497fdc98c..2c44935917 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -43,6 +43,12 @@ typedef struct { SHashObj* pTable; } SMetaRefMgt; +struct SMetaHbInfo { + tmr_h hbTmr; + int32_t stopFlag; + int32_t tickCounter; +}; + SMetaRefMgt gMetaRefMgt; void metaRefMgtInit(); @@ -134,13 +140,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); - pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); - if (pMeta->pTasks == NULL) { + pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK); + if (pMeta->pTasksMap == NULL) { goto _err; } - pMeta->pUpdateTaskList = taosHashInit(64, fp, false, HASH_NO_LOCK); - if (pMeta->pUpdateTaskList == NULL) { + pMeta->pUpdateTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); + if (pMeta->pUpdateTaskSet == NULL) { + goto _err; + } + + pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); + if (pMeta->pHbInfo == NULL) { goto _err; } @@ -164,9 +175,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); - pMeta->hbInfo.tickCounter = 0; - pMeta->hbInfo.stopFlag = 0; + pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); + pMeta->pHbInfo->tickCounter = 0; + pMeta->pHbInfo->stopFlag = 0; pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -200,11 +211,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: taosMemoryFree(pMeta->path); - if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); + if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); + if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); + if (pMeta->pUpdateTaskSet) taosHashCleanup(pMeta->pUpdateTaskSet); taosMemoryFree(pMeta); @@ -258,7 +271,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { void streamMetaClear(SStreamMeta* pMeta) { void* pIter = NULL; - while ((pIter = taosHashIterate(pMeta->pTasks, pIter)) != NULL) { + while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) { SStreamTask* p = *(SStreamTask**)pIter; // release the ref by timer @@ -274,7 +287,7 @@ void streamMetaClear(SStreamMeta* pMeta) { taosRemoveRef(streamBackendId, pMeta->streamBackendRid); - taosHashClear(pMeta->pTasks); + taosHashClear(pMeta->pTasksMap); taosHashClear(pMeta->pTaskBackendUnique); taosArrayClear(pMeta->pTaskList); @@ -315,9 +328,9 @@ void streamMetaCloseImpl(void* arg) { taosArrayDestroy(pMeta->chkpSaved); taosArrayDestroy(pMeta->chkpInUse); - taosHashCleanup(pMeta->pTasks); + taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskBackendUnique); - taosHashCleanup(pMeta->pUpdateTaskList); + taosHashCleanup(pMeta->pUpdateTaskSet); taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); @@ -379,7 +392,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa *pAdded = false; int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -401,14 +414,14 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return 0; } - taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, POINTER_BYTES); + taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, POINTER_BYTES); *pAdded = true; return 0; } int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { - size_t size = taosHashGetSize(pMeta->pTasks); - ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); + size_t size = taosHashGetSize(pMeta->pTasksMap); + ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasksMap)); return (int32_t)size; } @@ -419,7 +432,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (p == NULL) { continue; } @@ -436,7 +449,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taosRLockLatch(&pMeta->lock); int64_t keys[2] = {streamId, taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); @@ -480,7 +493,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t taosWLockLatch(&pMeta->lock); int64_t keys[2] = {streamId, taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask) { pTask = *ppTask; if (streamTaskShouldPause(&pTask->status)) { @@ -500,7 +513,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t while (1) { taosRLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask) { if ((*ppTask)->status.timerActive == 0) { @@ -519,9 +532,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // let's do delete of stream task taosWLockLatch(&pMeta->lock); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask) { - taosHashRemove(pMeta->pTasks, keys, sizeof(keys)); + taosHashRemove(pMeta->pTasksMap, keys, sizeof(keys)); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); ASSERT(pTask->status.timerActive == 0); @@ -673,7 +686,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { // do duplicate task check. int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + void* p = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (p == NULL) { // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader // In this case, we try not to start fill-history task anymore. @@ -691,7 +704,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasksMap, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); return -1; @@ -778,8 +791,8 @@ void metaHbToMnode(void* param, void* tmrId) { } // need to stop, stop now - if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) { - pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP; + if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { + pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; qDebug("vgId:%d jump out of meta timer", pMeta->vgId); taosReleaseRef(streamMetaId, rid); return; @@ -792,8 +805,8 @@ void metaHbToMnode(void* param, void* tmrId) { return; } - if (!enoughTimeDuration(&pMeta->hbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); + if (!enoughTimeDuration(pMeta->pHbInfo)) { + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } @@ -812,7 +825,7 @@ void metaHbToMnode(void* param, void* tmrId) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if ((*pTask)->info.fillHistory == 1) { continue; @@ -872,7 +885,7 @@ void metaHbToMnode(void* param, void* tmrId) { } taosArrayDestroy(hbMsg.pTaskStatus); - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } @@ -883,7 +896,7 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); + pIter = taosHashIterate(pMeta->pTasksMap, pIter); if (pIter == NULL) { break; } @@ -906,7 +919,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); + pIter = taosHashIterate(pMeta->pTasksMap, pIter); if (pIter == NULL) { break; } @@ -920,8 +933,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->leader) { - pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; - while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { + pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; + while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { taosMsleep(100); qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index a9d0c3b77e..6aaea2ce24 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -190,9 +190,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu return TSDB_CODE_SUCCESS; } - STokenBucket* pBucket = &pTask->tokenBucket; - bool has = streamTaskHasAvailableToken(pBucket); - if (!has) { // no available token in th bucket, ignore this execution + STokenBucket* pBucket = pTask->pTokenBucket; + if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution // qInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, // pBucket->capacity, pBucket->rate); return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 8b2a800576..db2e418171 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -40,7 +40,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); + int64_t el = (taosGetTimestampMs() - pTask->taskExecInfo.init); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%"PRId64"ms, task status:%s", pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); } @@ -525,7 +525,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { taosWLockLatch(&pMeta->lock); int64_t keys[2] = {pInfo->streamId, pInfo->taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (ppTask) { ASSERT((*ppTask)->status.timerActive >= 1); @@ -590,7 +590,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId}; // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if (pHTask == NULL) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, hTaskId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d42dee76f6..01318d89cd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -355,6 +355,7 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pUpstreamInfoList = NULL; } + taosMemoryFree(pTask->pTokenBucket); taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); @@ -371,10 +372,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) { qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } - pTask->tsInfo.created = taosGetTimestampMs(); + pTask->taskExecInfo.created = taosGetTimestampMs(); pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; @@ -384,19 +385,25 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; - streamTaskInitTokenBucket(&pTask->tokenBucket, 50, 50); - - TdThreadMutexAttr attr = {0}; - int ret = taosThreadMutexAttrInit(&attr); - if (ret != 0) { - qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(ret)); - return ret; + pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); + if (pTask->pTokenBucket == NULL) { + qError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; } - ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); - if (ret != 0) { - qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(ret)); - return ret; + streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50); + + TdThreadMutexAttr attr = {0}; + int code = taosThreadMutexAttrInit(&attr); + if (code != 0) { + qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); + if (code != 0) { + qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code)); + return code; } taosThreadMutexInit(&pTask->lock, &attr); @@ -517,7 +524,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { taosMsleep(100); } - pTask->tsInfo.init = 0; + pTask->taskExecInfo.init = 0; int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el); return 0; @@ -547,10 +554,18 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { } int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { + STaskExecStatisInfo* p = &pTask->taskExecInfo; + qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr, + p->taskUpdateCount + 1, p->latestUpdateTs); + + p->taskUpdateCount += 1; + p->latestUpdateTs = taosGetTimestampMs(); + for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); doUpdateTaskEpset(pTask, pInfo->nodeId, &pInfo->newEp); } + return 0; } From 91e3d7079624d7c36885390e614b09a34ec79a05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 15 Sep 2023 17:46:17 +0800 Subject: [PATCH 3/3] fix(stream): remove fill-history sink task. --- include/libs/stream/tstream.h | 6 ++-- source/dnode/vnode/src/tq/tq.c | 7 ++++- source/dnode/vnode/src/tq/tqSink.c | 6 ++-- source/libs/stream/src/streamExec.c | 22 +++++++------- source/libs/stream/src/streamMeta.c | 29 +++++++++++++----- source/libs/stream/src/streamTask.c | 46 +++++++++++++++++++++++++---- 6 files changed, 84 insertions(+), 32 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1fd2f2bc13..c41834bd82 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -321,8 +321,8 @@ typedef struct { int64_t init; int64_t step1Start; int64_t step2Start; - int64_t execStart; - int32_t taskUpdateCount; + int64_t start; + int32_t updateCount; int64_t latestUpdateTs; } STaskExecStatisInfo; @@ -722,7 +722,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); - +int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 41322d2b21..e665e4c408 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1084,7 +1084,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); +// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id); streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1334,10 +1335,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL // commit the update taosWLockLatch(&pTq->pStreamMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks); + if (streamMetaCommit(pTq->pStreamMeta) < 0) { // persist to disk } taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 106a4cc9b0..e0bae18545 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - double el = (taosGetTimestampMs() - pTask->taskExecInfo.execStart) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); @@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t code = TSDB_CODE_SUCCESS; const char* id = pTask->id.idStr; - if (pTask->taskExecInfo.execStart == 0) { - pTask->taskExecInfo.execStart = taosGetTimestampMs(); + if (pTask->taskExecInfo.start == 0) { + pTask->taskExecInfo.start = taosGetTimestampMs(); } bool onlySubmitData = true; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3a34d941dd..3b3dca7f5f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -303,7 +303,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pTask->id.idStr, pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); +// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); // 2. save to disk taosWLockLatch(&pMeta->lock); @@ -365,7 +366,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); // 4. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); +// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; @@ -408,6 +410,8 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); + } else { // drop fill-history task + streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id); } return code; @@ -503,16 +507,12 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } } else { // non-dispatch task, do task state transfer directly streamFreeQitem((SStreamQueueItem*)pBlock); - if (level != TASK_LEVEL__SINK) { - qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); - ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); - } - } else { - qDebug("s-task:%s sink task does not transfer state", id); + if (code != TSDB_CODE_SUCCESS) { + /*int8_t status = */ streamTaskSetSchedStatusInActive(pTask); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2c44935917..78eee339f1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -47,6 +47,8 @@ struct SMetaHbInfo { tmr_h hbTmr; int32_t stopFlag; int32_t tickCounter; + int32_t hbCount; + int64_t hbStart; }; SMetaRefMgt gMetaRefMgt; @@ -332,6 +334,7 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pUpdateTaskSet); + taosMemoryFree(pMeta->pHbInfo); taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); @@ -784,7 +787,6 @@ static bool enoughTimeDuration(SMetaHbInfo* pInfo) { void metaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; - SStreamHbMsg hbMsg = {0}; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { return; @@ -802,31 +804,37 @@ void metaHbToMnode(void* param, void* tmrId) { if (!pMeta->leader) { qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); taosReleaseRef(streamMetaId, rid); + pMeta->pHbInfo->hbStart = 0; return; } + // set the hb start time + if (pMeta->pHbInfo->hbStart == 0) { + pMeta->pHbInfo->hbStart = taosGetTimestampMs(); + } + if (!enoughTimeDuration(pMeta->pHbInfo)) { taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } - qInfo("vgId:%d start hb", pMeta->vgId); + qDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader); + SStreamHbMsg hbMsg = {0}; taosRLockLatch(&pMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; bool hasValEpset = false; - hbMsg.vgId = pMeta->vgId; hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + int64_t keys[2] = {pId->streamId, pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); if ((*pTask)->info.fillHistory == 1) { continue; } @@ -878,10 +886,13 @@ void metaHbToMnode(void* param, void* tmrId) { initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); msg.info.noResp = 1; - qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); + pMeta->pHbInfo->hbCount += 1; + + qDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, + pMeta->pHbInfo->hbCount); tmsgSendReq(&epset, &msg); } else { - qError("vgId:%d no mnd epset", pMeta->vgId); + qDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } taosArrayDestroy(hbMsg.pTaskStatus); @@ -914,7 +925,9 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; - qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId); + qDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, + pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + taosWLockLatch(&pMeta->lock); void* pIter = NULL; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01318d89cd..23ace63d18 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -277,7 +277,20 @@ static void freeUpstreamItem(void* p) { void tFreeStreamTask(SStreamTask* pTask) { int32_t taskId = pTask->id.taskId; - qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState); + STaskExecStatisInfo* pStatis = &pTask->taskExecInfo; + + qDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, + streamGetTaskStatusStr(pTask->status.taskStatus)); + + qDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 + ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 + " nextProcessVer:%" PRId64, + taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, + pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer); + + if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) { + int32_t k = 1; + } // remove the ref by timer while (pTask->status.timerActive > 0) { @@ -396,7 +409,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); if (code != 0) { - qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); + qError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); return code; } @@ -524,9 +537,8 @@ int32_t streamTaskStop(SStreamTask* pTask) { taosMsleep(100); } - pTask->taskExecInfo.init = 0; int64_t el = taosGetTimestampMs() - st; - qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el); + qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el); return 0; } @@ -556,9 +568,9 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->taskExecInfo; qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr, - p->taskUpdateCount + 1, p->latestUpdateTs); + p->updateCount + 1, p->latestUpdateTs); - p->taskUpdateCount += 1; + p->updateCount += 1; p->latestUpdateTs = taosGetTimestampMs(); for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { @@ -615,3 +627,25 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { return status; } + +int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) { + SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = vgId; + pReq->taskId = pTaskId->taskId; + pReq->streamId = pTaskId->streamId; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; + int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg); + if (code != TSDB_CODE_SUCCESS) { + qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); + return code; + } + + qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId); + return code; +}