From e2bb64eb18e5b7d2e040d0e8a82d6c607aa6b711 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 14:23:46 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 12 ++++---- source/dnode/vnode/CMakeLists.txt | 2 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 4 +-- .../src/tq/{tqRestore.c => tqStreamTask.c} | 28 ++++++++++--------- source/libs/stream/src/streamRecover.c | 8 +++--- 6 files changed, 28 insertions(+), 28 deletions(-) rename source/dnode/vnode/src/tq/{tqRestore.c => tqStreamTask.c} (93%) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 710069582c..908b250e61 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -248,7 +248,7 @@ typedef struct SStreamChildEpInfo { typedef struct SStreamTaskKey { int64_t streamId; - int64_t taskId; + int32_t taskId; } SStreamTaskKey; typedef struct SStreamTaskId { @@ -273,10 +273,10 @@ typedef struct SStreamStatus { int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; -typedef struct SHistDataRange { +typedef struct SDataRange { SVersionRange range; STimeWindow window; -} SHistDataRange; +} SDataRange; typedef struct SSTaskBasicInfo { int32_t nodeId; // vgroup id or snode id @@ -309,7 +309,6 @@ typedef struct STaskInputInfo { typedef struct STaskSchedInfo { int8_t status; -// int64_t triggerParam; void* pTimer; } STaskSchedInfo; @@ -330,7 +329,7 @@ struct SStreamTask { SStreamStatus status; SCheckpointInfo chkInfo; STaskExec exec; - SHistDataRange dataRange; + SDataRange dataRange; SStreamTaskId historyTaskId; SStreamTaskId streamTaskId; STaskTimestamp tsInfo; @@ -419,6 +418,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); +int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); bool streamQueueIsFull(const STaosQueue* pQueue); typedef struct { @@ -681,8 +681,6 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); -int32_t appendTranstateIntoInputQ(SStreamTask* pTask); - // agg level int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 6c5eeb3424..b66d811284 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -64,7 +64,7 @@ set( "src/tq/tqPush.c" "src/tq/tqSink.c" "src/tq/tqCommit.c" - "src/tq/tqRestore.c" + "src/tq/tqStreamTask.c" "src/tq/tqSnapshot.c" "src/tq/tqOffsetSnapshot.c" "src/tq/tqStreamStateSnap.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2bc41e6b94..1146cfdc46 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -163,7 +163,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqScanWalForStreamTasks(STQ* pTq); +int32_t tqScanWal(STQ* pTq); int32_t tqCheckAndRunStreamTask(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5d47baad64..5b848b51bd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1172,7 +1172,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); - appendTranstateIntoInputQ(pTask); + streamTaskPutTranstateIntoInputQ(pTask); streamTryExec(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; @@ -1346,7 +1346,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal - tqScanWalForStreamTasks(pTq); + tqScanWal(pTq); return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqStreamTask.c similarity index 93% rename from source/dnode/vnode/src/tq/tqRestore.c rename to source/dnode/vnode/src/tq/tqStreamTask.c index 5efccc8f3c..3c0321f300 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,11 +16,12 @@ #include "tq.h" #include "vnd.h" -static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); -static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId); +static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); +static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); +static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); -// extract submit block from WAL, and add them into the input queue for the sources tasks. -int32_t tqScanWalForStreamTasks(STQ* pTq) { +// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. +int32_t tqScanWal(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); @@ -31,7 +32,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) { // check all tasks bool shouldIdle = true; - createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle); + doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); int32_t times = 0; @@ -140,7 +141,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - // for follower or vnode does not restored, do not launch the stream tasks. + // do not launch the stream tasks, if it is a follower or not restored vnode. if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { return TSDB_CODE_SUCCESS; } @@ -223,7 +224,7 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } -int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { +int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); if (pTask->chkInfo.currentVer < firstVer) { @@ -267,7 +268,8 @@ int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { return TSDB_CODE_SUCCESS; } -static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { +// todo handle memory error +void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; int64_t maxVer = pTask->dataRange.range.maxVer; @@ -279,7 +281,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - appendTranstateIntoInputQ(pTask); + /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */ streamSchedExec(pTask); } else { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", @@ -288,7 +290,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { } } -int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { +int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noDataInWal = true; int32_t vgId = pStreamMeta->vgId; @@ -356,7 +358,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = doSetOffsetForWalReader(pTask, vgId); + int32_t code = setWalReaderStartOffset(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); continue; @@ -369,7 +371,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue - checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); + handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -390,7 +392,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; - checkForFillHistoryVerRange(pTask, ver); + handleFillhistoryScanComplete(pTask, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 5dd6d9087b..4b86b9713c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -108,7 +108,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { // check status static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { - SHistDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; SStreamTaskCheckReq req = { @@ -365,7 +365,7 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { return streamScanExec(pTask, 100); } -int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { +int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -764,7 +764,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { if (pTask->historyTaskId.taskId == 0) { - SHistDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; if (pTask->info.fillHistory == 1) { qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, @@ -775,7 +775,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } } else { - SHistDataRange* pRange = &pTask->dataRange; + SDataRange* pRange = &pTask->dataRange; int64_t ekey = 0; if (pRange->window.ekey < INT64_MAX) {