From 97c44e7c1824da05d7f8ff62710b66843bfed2c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Jul 2023 18:42:02 +0800 Subject: [PATCH 01/15] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 12 ++++---- source/dnode/mnode/impl/src/mndScheduler.c | 8 ++--- source/dnode/snode/src/snode.c | 6 ++-- source/dnode/vnode/src/tq/tq.c | 10 +++---- source/libs/stream/src/stream.c | 17 ++++++----- source/libs/stream/src/streamDispatch.c | 32 ++++++++++---------- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamRecover.c | 20 ++++++------- source/libs/stream/src/streamTask.c | 34 +++++++++++----------- 9 files changed, 70 insertions(+), 71 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 34a0bc8657..bbeeb33523 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -296,15 +296,15 @@ typedef struct SDispatchMsgInfo { } SDispatchMsgInfo; typedef struct { - int8_t outputType; - int8_t outputStatus; - SStreamQueue* outputQueue; -} SSTaskOutputInfo; + int8_t type; + int8_t status; + SStreamQueue* queue; +} STaskOutputInfo; struct SStreamTask { SStreamId id; SSTaskBasicInfo info; - int8_t outputType; + STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; SStreamStatus status; SCheckpointInfo chkInfo; @@ -326,9 +326,7 @@ struct SStreamTask { }; int8_t inputStatus; - int8_t outputStatus; SStreamQueue* inputQueue; - SStreamQueue* outputQueue; // trigger int8_t triggerStatus; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 33905bad86..7cb52d2bc9 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -87,10 +87,10 @@ END: int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { if (pStream->smaId != 0) { - pTask->outputType = TASK_OUTPUT__SMA; + pTask->outputInfo.type = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; } else { - pTask->outputType = TASK_OUTPUT__TABLE; + pTask->outputInfo.type = TASK_OUTPUT__TABLE; pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); @@ -110,7 +110,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { isShuffle = true; - pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; + pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { return -1; @@ -291,7 +291,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDispatcher->nodeId = pTask->info.nodeId; pDispatcher->epSet = pTask->info.epSet; - pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; + pDstTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH; pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e4bc184be3..700b0191a5 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -66,14 +66,14 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(512 << 10); - pTask->outputQueue = streamQueueOpen(512 << 10); + pTask->outputInfo.queue = streamQueueOpen(512 << 10); - if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { + if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; pTask->chkInfo.version = ver; pTask->pMeta = pSnode->pMeta; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bbdd98e356..97aa69bab6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -811,14 +811,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(512 << 10); - pTask->outputQueue = streamQueueOpen(512 << 10); + pTask->outputInfo.queue = streamQueueOpen(512 << 10); - if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { + if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; @@ -885,10 +885,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } // sink - if (pTask->outputType == TASK_OUTPUT__SMA) { + if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.smaSink = smaHandleRes; - } else if (pTask->outputType == TASK_OUTPUT__TABLE) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 07d7cb3040..4de9f6a7ed 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -216,15 +216,16 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // todo add log int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t code = 0; - if (pTask->outputType == TASK_OUTPUT__TABLE) { + int32_t type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); destroyStreamDataBlock(pBlock); - } else if (pTask->outputType == TASK_OUTPUT__SMA) { + } else if (type == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); destroyStreamDataBlock(pBlock); } else { - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); + code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); if (code != 0) { // todo failed to add it into the output queue, free it. return code; } @@ -274,7 +275,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); // there are other dispatch message not response yet - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); if (leftRsp > 0) { @@ -283,9 +284,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } pTask->msgInfo.retryCount = 0; - ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus); + qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status); // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp @@ -309,7 +310,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // now ready for next data output - atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline streamDispatchStreamBlock(pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9241df2e70..566e7da9e4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -437,7 +437,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0); - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = {0}; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; @@ -467,7 +467,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); return code; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); @@ -545,7 +545,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; - ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { @@ -561,29 +561,29 @@ void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { } int32_t streamDispatchStreamBlock(SStreamTask* pTask) { - ASSERT((pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH)); + STaskOutputInfo* pInfo = &pTask->outputInfo; + ASSERT((pInfo->type == TASK_OUTPUT__FIXED_DISPATCH || pInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH)); - int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); + int32_t numOfElems = taosQueueItemSize(pInfo->queue->queue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, numOfElems); } // to make sure only one dispatch is running - int8_t old = - atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + int8_t old = atomic_val_compare_exchange_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); return 0; } ASSERT(pTask->msgInfo.pData == NULL); - qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus); + qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pInfo->status); - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); + SStreamDataBlock* pBlock = streamQueueNextItem(pInfo->queue); if (pBlock == NULL) { - atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputStatus); + atomic_store_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL); + qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pInfo->status); return 0; } @@ -599,19 +599,19 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, - tstrerror(terrno), pTask->outputStatus, retryCount); + tstrerror(terrno), pInfo->status, retryCount); // todo deal with only partially success dispatch case atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); - if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore + if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; return code; } - if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr, - retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c8aa6f5615..06eed5fee2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -561,7 +561,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { } // blocked by downstream task - if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) { + if (pTask->outputInfo.status == TASK_OUTPUT_STATUS__BLOCKED) { return false; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f51efb23d1..43449a1a62 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -96,7 +96,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { }; // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; @@ -108,7 +108,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { pWindow->skey, pWindow->ekey, req.reqId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -153,9 +153,9 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId); - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -179,7 +179,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs const char* id = pTask->id.idStr; if (pRsp->status == 1) { - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); @@ -218,7 +218,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else { - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH); + ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH); if (pRsp->reqId != pTask->checkReqId) { return -1; } @@ -296,10 +296,10 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.taskId = pTask->fixedEpDispatcher.taskId; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -362,10 +362,10 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.taskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ef83583ea4..ca4586a1b4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -44,7 +44,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; addToTaskset(pTaskList, pTask); return pTask; @@ -74,7 +74,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; @@ -109,19 +109,19 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } - if (pTask->outputType == TASK_OUTPUT__TABLE) { + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SMA) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FETCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } @@ -137,7 +137,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; @@ -179,21 +179,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } - if (pTask->outputType == TASK_OUTPUT__TABLE) { + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SMA) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FETCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } @@ -211,8 +211,8 @@ void tFreeStreamTask(SStreamTask* pTask) { streamQueueClose(pTask->inputQueue); } - if (pTask->outputQueue) { - streamQueueClose(pTask->outputQueue); + if (pTask->outputInfo.queue) { + streamQueueClose(pTask->outputInfo.queue); } if (pTask->exec.qmsg) { @@ -229,11 +229,11 @@ void tFreeStreamTask(SStreamTask* pTask) { } taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree); - if (pTask->outputType == TASK_OUTPUT__TABLE) { + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); tSimpleHashCleanup(pTask->tbSink.pTblInfo); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; From aed99da6c1dfac8bc547b6098778b39b063e6c23 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jul 2023 12:14:53 +0800 Subject: [PATCH 02/15] refactor: refactor the stream task starting order. --- include/libs/stream/tstream.h | 12 +- source/dnode/snode/src/snode.c | 6 +- source/dnode/vnode/src/tq/tq.c | 119 +++++++------- source/dnode/vnode/src/tq/tqRestore.c | 10 +- source/libs/stream/src/streamMeta.c | 4 +- source/libs/stream/src/streamRecover.c | 206 +++++++++++++++++-------- 6 files changed, 224 insertions(+), 133 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bbeeb33523..2ce640a2bc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -46,7 +46,7 @@ enum { TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused - TASK_STATUS__PAUSE, + TASK_STATUS__PAUSE, // pause }; enum { @@ -315,7 +315,7 @@ struct SStreamTask { SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray - + int64_t initTs; // output union { STaskDispatcherFixedEp fixedEpDispatcher; @@ -572,12 +572,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history -void streamPrepareNdoCheckDownstream(SStreamTask* pTask); -int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); +void streamTaskCheckDownstreamTasks(SStreamTask* pTask); +int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask); +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo* pRpcInfo, int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); -int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask); +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 700b0191a5..3b19c7ae4a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -72,6 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } + pTask->initTs = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; @@ -166,11 +167,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); taosWUnLockLatch(&pSnode->pMeta->lock); - - streamPrepareNdoCheckDownstream(pTask); qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + streamTaskCheckDownstreamTasks(pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 97aa69bab6..c550b06acb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -817,11 +817,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } + pTask->initTs = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + // backup the initial status, and set it to be TASK_STATUS__INIT pTask->chkInfo.version = ver; pTask->chkInfo.currentVer = ver; @@ -880,7 +882,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->exec.pExecutor == NULL) { return -1; } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } @@ -963,28 +964,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } - SEncoder encoder; - int32_t code; - int32_t len; - - tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code); - if (code < 0) { - tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId); - return -1; - } - - void* buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeStreamTaskCheckRsp(&encoder, &rsp); - tEncoderClear(&encoder); - - SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; - - tmsgSendRsp(&rspMsg); - return 0; + return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, pTask->id.taskId); } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { @@ -1062,13 +1042,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } taosWUnLockLatch(&pStreamMeta->lock); - - // 3. It's an fill history task, do nothing. wait for the main task to start it - streamPrepareNdoCheckDownstream(pTask); - tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + // 3. It's an fill history task, do nothing. wait for the main task to start it + streamTaskCheckDownstreamTasks(pTask); return 0; } @@ -1087,8 +1065,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - const char* pId = pTask->id.idStr; - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId, + const char* id = pTask->id.idStr; + tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); int64_t st = taosGetTimestampMs(); @@ -1104,14 +1082,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); return 0; } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el); + tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; @@ -1125,7 +1103,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->streamTaskId.taskId, pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", pId); + tqDebug("s-task:%s scan-history-task set status to be dropping", id); streamMetaSaveTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask); @@ -1139,14 +1117,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug( "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", - pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); + id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); taosMsleep(100); } // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pId); + pStreamTask->info.taskLevel, id); // if it's an source task, extract the last version in wal. streamHistoryTaskSetVerRangeStep2(pTask); @@ -1154,7 +1132,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (!streamTaskRecoverScanStep1Finished(pTask)) { tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", - pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId); + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); @@ -1165,7 +1143,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSourceScanHistoryData(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el); + tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); // 3. notify downstream tasks to transfer executor state after handle all history blocks. if (!pTask->status.transferState) { @@ -1191,7 +1169,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTryExec(pTask); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", pId); + tqDebug("s-task:%s scan-history-task set status to be dropping", id); streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); @@ -1212,13 +1190,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; - tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId, - pWindow->skey, pWindow->ekey); + tqDebug( + "s-task:%s scan history in current time window completed, no related fill history task, reset the time " + "window:%" PRId64 " - %" PRId64, + id, pWindow->skey, pWindow->ekey); } else { tqDebug( - "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start " + "s-task:%s scan history in current time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, - pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); + id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the @@ -1452,31 +1432,56 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } -int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { - if (pTask) { - if (!streamTaskShouldPause(&pTask->status)) { - tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } - streamMetaReleaseTask(pStreamMeta, pTask); - } else { - return -1; +// todo rule out the status when pause not suitable. +static int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { + if (!streamTaskShouldPause(&pTask->status)) { + tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); } + return 0; } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); - int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask); + + SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + pReq->taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); + + int32_t code = tqProcessTaskPauseImpl(pMeta, pTask); if (code != 0) { + streamMetaReleaseTask(pMeta, pTask); return code; } - SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); - if (pHistoryTask) { - code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask); + + SStreamTask* pHistoryTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + if (pHistoryTask == NULL) { + tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, + pTask->historyTaskId.taskId); + + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + tqDebug("s-task:%s fill-history task handle pause along with related stream task", pHistoryTask->id.idStr); + code = tqProcessTaskPauseImpl(pMeta, pHistoryTask); } + + streamMetaReleaseTask(pMeta, pTask); + if (pHistoryTask != NULL) { + streamMetaReleaseTask(pMeta, pHistoryTask); + } + return code; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 5db3e735cc..0d9edbe5f4 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { continue; } - streamTaskCheckDownstreamTasks(pTask); + if (pTask->info.fillHistory == 1) { + tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr, + pTask->streamTaskId.taskId); + continue; + } + + streamTaskDoCheckDownstreamTasks(pTask); streamMetaReleaseTask(pMeta, pTask); } - taosArrayDestroy(pTaskList); + taosArrayDestroy(pTaskList); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a2b5d0e396..0ef9807f8a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); - qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); + qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } @@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); if (ref > 0) { - qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref); + qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 43449a1a62..852622e5a1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -17,6 +17,18 @@ #include "ttimer.h" #include "wal.h" +static void launchFillHistoryTask(SStreamTask* pTask); +static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); + +static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { + ASSERT(pTask->status.downstreamReady == 0); + pTask->status.downstreamReady = 1; + int64_t el = (taosGetTimestampMs() - pTask->initTs); + + qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", + pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); +} + int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { SStreamScanHistoryReq req; streamBuildSourceRecover1Req(pTask, &req, igUntreated); @@ -51,9 +63,9 @@ const char* streamGetTaskStatusStr(int32_t status) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; - qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, - pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); - +// qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, +// pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); +// streamSetParamForScanHistory(pTask); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); @@ -84,7 +96,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { } // check status -int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { +int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { SHistDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; @@ -129,11 +141,13 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - pTask->status.downstreamReady = 1; - qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s", - pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus)); + qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); + + streamTaskSetForReady(pTask, 0); + streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); + launchFillHistoryTask(pTask); } return 0; @@ -171,7 +185,28 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p } int32_t streamTaskCheckStatus(SStreamTask* pTask) { - return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0; + return (pTask->status.downstreamReady == 1)? 1:0; +} + +static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { + streamTaskSetForReady(pTask, numOfReqs); + const char* id = pTask->id.idStr; + + int8_t status = pTask->status.taskStatus; + const char* str = streamGetTaskStatusStr(status); + + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL); + streamTaskSetRangeStreamCalc(pTask); + + if (status == TASK_STATUS__SCAN_HISTORY) { + qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, numOfReqs, str); + streamTaskLaunchScanHistory(pTask); + } else { + qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + } + + // when current stream task is ready, check the related fill history task. + launchFillHistoryTask(pTask); } int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -201,17 +236,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - pTask->status.downstreamReady = 1; - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, - numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus)); - streamTaskLaunchScanHistory(pTask); - } else { - ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); - qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); - } + doProcessDownstreamReadyRsp(pTask, numOfReqs); } else { int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, @@ -223,19 +249,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return -1; } - // set the downstream tasks have been checked flag - ASSERT(pTask->status.downstreamReady == 0); - pTask->status.downstreamReady = 1; - - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL); - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); - streamTaskLaunchScanHistory(pTask); - } else { - qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); - } + doProcessDownstreamReadyRsp(pTask, 1); } } else { // not ready, wait for 100ms and retry qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId, @@ -248,6 +262,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo *pRpcInfo, int32_t taskId) { + SEncoder encoder; + int32_t code; + int32_t len; + + tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code); + if (code < 0) { + qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId); + return -1; + } + + void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeStreamTaskCheckRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; + + tmsgSendRsp(&rspMsg); + return 0; +} + // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); @@ -434,7 +474,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { } // check if downstream tasks have been ready - streamTaskCheckDownstreamTasks(pHTask); + streamTaskDoCheckDownstreamTasks(pHTask); } typedef struct SStreamTaskRetryInfo { @@ -500,7 +540,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. -int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) { +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->historyTaskId.taskId; @@ -675,40 +715,78 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory return 0; } -// todo handle race condition, this task may be destroyed -void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { - if (pTask->info.fillHistory) { - qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); +void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { + if (pTask->historyTaskId.taskId == 0) { + SHistDataRange* pRange = &pTask->dataRange; + qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 + " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } else { - // calculate the correct start time window, and start the handle the history data for the main task. - if (pTask->historyTaskId.taskId != 0) { - // check downstream tasks for associated scan-history-data tasks - streamCheckHistoryTaskDownstream(pTask); + SHistDataRange* pRange = &pTask->dataRange; - // launch current task - SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey + 1; - int64_t ver = pRange->range.minVer; + int64_t ekey = pRange->window.ekey + 1; + int64_t ver = pRange->range.minVer; - pRange->window.skey = ekey; - pRange->window.ekey = INT64_MAX; - pRange->range.minVer = 0; - pRange->range.maxVer = ver; + pRange->window.skey = ekey; + pRange->window.ekey = INT64_MAX; + pRange->range.minVer = 0; + pRange->range.maxVer = ver; - qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer); - } else { - SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); - } - - ASSERT(pTask->status.downstreamReady == 0); - - // check downstream tasks for itself - streamTaskCheckDownstreamTasks(pTask); + qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 + ", verRang:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); } } + +void launchFillHistoryTask(SStreamTask* pTask) { + int32_t tId = pTask->historyTaskId.taskId; + if (tId == 0) { + return; + } + + ASSERT(pTask->status.downstreamReady == 1); + qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId); + + // launch associated fill history task + streamLaunchFillHistoryTask(pTask); +} + +// todo handle race condition, this task may be destroyed +void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { + if (pTask->info.fillHistory) { + qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); + return; + } + + // calculate the correct start time window, and start the handle the history data for the main task. +/* if (pTask->historyTaskId.taskId != 0) { + // check downstream tasks for associated scan-history-data tasks + streamLaunchFillHistoryTask(pTask); + + // launch current task + SHistDataRange* pRange = &pTask->dataRange; + int64_t ekey = pRange->window.ekey + 1; + int64_t ver = pRange->range.minVer; + + pRange->window.skey = ekey; + pRange->window.ekey = INT64_MAX; + pRange->range.minVer = 0; + pRange->range.maxVer = ver; + + qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 + ", ver range:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); + } else { + SHistDataRange* pRange = &pTask->dataRange; + qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 + " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + }*/ + + ASSERT(pTask->status.downstreamReady == 0); + + // check downstream tasks for itself + streamTaskDoCheckDownstreamTasks(pTask); +} From 45cb478b319c6de8736408c45d7f132ee7227f1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jul 2023 15:36:46 +0800 Subject: [PATCH 03/15] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/executor/src/executor.c | 2 +- source/libs/stream/src/streamRecover.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c550b06acb..5c4f4048d1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -964,7 +964,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } - return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, pTask->id.taskId); + return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 06b90d0a51..4c06b34df4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -122,7 +122,7 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { return; } - qDebug("%s set fill history start key:%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN); + qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN); pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 852622e5a1..a39741117c 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -199,7 +199,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { streamTaskSetRangeStreamCalc(pTask); if (status == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, numOfReqs, str); + qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, str); streamTaskLaunchScanHistory(pTask); } else { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); From c9fa170e65b631218a22cce1dcefceb5cf1cc215 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jul 2023 16:31:46 +0800 Subject: [PATCH 04/15] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 36 ++++-------------------- source/libs/stream/src/streamExec.c | 38 +++++++++++++++++++------- source/libs/stream/src/streamRecover.c | 35 +----------------------- 3 files changed, 35 insertions(+), 74 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5c4f4048d1..887620c8fa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1066,8 +1066,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step 1 const char* id = pTask->id.idStr; - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); int64_t st = taosGetTimestampMs(); int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, @@ -1112,6 +1112,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + // todo remove this // wait for the stream task get ready for scan history data while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { @@ -1168,20 +1169,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 5. resume the related stream task. streamTryExec(pTask); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", id); - - streamMetaSaveTask(pMeta, pTask); - streamMetaSaveTask(pMeta, pStreamTask); - streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pStreamTask); - - taosWLockLatch(&pMeta->lock); - if (streamMetaCommit(pTask->pMeta) < 0) { - // persist to disk - } - taosWUnLockLatch(&pMeta->lock); } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1241,22 +1230,9 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { } // transfer the ownership of executor state - tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr); + tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr); + ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - // related stream task load the state from the state storage backend - SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); - if (pStreamTask == NULL) { - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId); - return -1; - } - - // when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure. - streamTaskReleaseState(pTask); - streamTaskReloadState(pStreamTask); - streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask); - - ASSERT(pTask->streamTaskId.taskId != 0); pTask->status.transferState = true; streamSchedExec(pTask); @@ -1366,7 +1342,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__SCAN_HISTORY) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06eed5fee2..0a248d0ffe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -351,30 +351,36 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { - SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); + SStreamMeta* pMeta = pTask->pMeta; + + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", - pTask->id.idStr, pTask->streamTaskId.taskId); + pTask->status.transferState = false; // reset this value, to avoid transfer state again + + qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, + pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); } - ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); + ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); + STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; // It must be halted for a source stream task, since when the related scan-history-data task start scan the history // for the step 2. For a agg task + int8_t status = pStreamTask->status.taskStatus; if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); + ASSERT(status == TASK_STATUS__HALT); } else { - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); + ASSERT(status == TASK_STATUS__SCAN_HISTORY); pStreamTask->status.taskStatus = TASK_STATUS__HALT; - qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } - // wait for the stream task to be idle + // wait for the stream task to handle all in the inputQ, and to be idle waitForTaskIdle(pTask, pStreamTask); // In case of sink tasks, no need to be halted for them. @@ -399,10 +405,23 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + // reset the status of stream task streamSetStatusNormal(pStreamTask); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + + // save to disk + taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pTask); + streamMetaSaveTask(pMeta, pStreamTask); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pMeta->lock); + streamSchedExec(pStreamTask); - streamMetaReleaseTask(pTask->pMeta, pStreamTask); + streamMetaReleaseTask(pMeta, pStreamTask); return TSDB_CODE_SUCCESS; } @@ -480,7 +499,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); - pTask->status.transferState = false; // reset this value, to avoid transfer state again if (code != TSDB_CODE_SUCCESS) { // todo handle this return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a39741117c..ddd7ae4676 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -62,10 +62,6 @@ const char* streamGetTaskStatusStr(int32_t status) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; - -// qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, -// pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); -// streamSetParamForScanHistory(pTask); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); @@ -84,12 +80,10 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { walReaderGetCurrentVer(pTask->exec.pWalReader)); } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - streamSetStatusNormal(pTask); streamSetParamForScanHistory(pTask); streamAggScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - streamSetStatusNormal(pTask); - qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr); + qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } return 0; @@ -145,7 +139,6 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { streamTaskSetForReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); - streamTaskLaunchScanHistory(pTask); launchFillHistoryTask(pTask); } @@ -759,32 +752,6 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { return; } - // calculate the correct start time window, and start the handle the history data for the main task. -/* if (pTask->historyTaskId.taskId != 0) { - // check downstream tasks for associated scan-history-data tasks - streamLaunchFillHistoryTask(pTask); - - // launch current task - SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey + 1; - int64_t ver = pRange->range.minVer; - - pRange->window.skey = ekey; - pRange->window.ekey = INT64_MAX; - pRange->range.minVer = 0; - pRange->range.maxVer = ver; - - qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer); - } else { - SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 - " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); - }*/ - ASSERT(pTask->status.downstreamReady == 0); // check downstream tasks for itself From 9353f7c8cfb4b5b1793907ba54974886a4a57618 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jul 2023 16:35:02 +0800 Subject: [PATCH 05/15] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 887620c8fa..81176f8929 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1112,10 +1112,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - // todo remove this // wait for the stream task get ready for scan history data - while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || - pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug( "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); From db46d31ca5e4eea0f250fe4f7824c9643fc282b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 01:32:42 +0800 Subject: [PATCH 06/15] fix(stream): refactor the halt function. --- include/libs/stream/tstream.h | 7 ++- source/dnode/mnode/impl/src/mndScheduler.c | 23 +++++++-- source/dnode/vnode/src/tq/tq.c | 60 ++++++++++++---------- source/libs/stream/src/streamExec.c | 25 ++++----- source/libs/stream/src/streamRecover.c | 49 ++++++++++++++++-- 5 files changed, 112 insertions(+), 52 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2ce640a2bc..329a6bbc29 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner - TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused + TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused, todo remove it? TASK_STATUS__PAUSE, // pause }; @@ -272,6 +272,7 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; bool transferState; int8_t timerActive; // timer is active + int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; typedef struct SHistDataRange { @@ -557,7 +558,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); @@ -593,6 +593,9 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); +void streamTaskPause(SStreamTask* pTask); +void streamTaskDisablePause(SStreamTask* pTask); +void streamTaskEnablePause(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 7cb52d2bc9..8984b798ab 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,6 +25,7 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; +static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); @@ -267,10 +268,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas return terrno; } + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setTaskUpstreamEpInfo(pTask, pSinkTask); + } + return TSDB_CODE_SUCCESS; } -static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -295,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { +int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -418,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return -1; } - return setEpToDownstreamTask(pTask, pDownstreamTask); + return setTaskUpstreamEpInfo(pTask, pDownstreamTask); } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, @@ -586,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return TSDB_CODE_SUCCESS; } +static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) { + SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask); + } +} + static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -637,6 +651,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return code; } + setSinkTaskUpstreamInfo(pStream->tasks, pAggTask); + setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); + // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); } else if (numOfPlanLevel == 1) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81176f8929..201c621ab3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -914,6 +914,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } + // reset the task status from unfinished transaction + if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr); + pTask->status.taskStatus = TASK_STATUS__NORMAL; + } + streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 @@ -1031,9 +1037,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamMeta* pStreamMeta = pTq->pStreamMeta; // 2.save task, use the newest commit version as the initial start version of stream task. + int32_t taskId = 0; taosWLockLatch(&pStreamMeta->lock); code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask); + taskId = pTask->id.taskId; int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); @@ -1046,7 +1054,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); // 3. It's an fill history task, do nothing. wait for the main task to start it - streamTaskCheckDownstreamTasks(pTask); + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); + if (p != NULL) { + streamTaskCheckDownstreamTasks(pTask); + } + + streamMetaReleaseTask(pStreamMeta, p); return 0; } @@ -1073,7 +1086,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { - ASSERT(0); + tqDebug("s-task:%s failed to launch scan history data in current time window, unexpected sched status:%d", id, + schedStatus); + + streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1215,9 +1231,11 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); + tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already", req.taskId); + tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.taskId); return -1; } @@ -1406,17 +1424,6 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } -// todo rule out the status when pause not suitable. -static int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { - if (!streamTaskShouldPause(&pTask->status)) { - tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } - - return 0; -} - int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; @@ -1425,30 +1432,29 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->taskId); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - - int32_t code = tqProcessTaskPauseImpl(pMeta, pTask); - if (code != 0) { - streamMetaReleaseTask(pMeta, pTask); - return code; - } + streamTaskPause(pTask); SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, - pTask->historyTaskId.taskId); + tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", + pMeta->vgId, pTask->historyTaskId.taskId); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; } - tqDebug("s-task:%s fill-history task handle pause along with related stream task", pHistoryTask->id.idStr); - code = tqProcessTaskPauseImpl(pMeta, pHistoryTask); + tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); + streamTaskPause(pHistoryTask); } streamMetaReleaseTask(pMeta, pTask); @@ -1456,7 +1462,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg streamMetaReleaseTask(pMeta, pHistoryTask); } - return code; + return TSDB_CODE_SUCCESS; } int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0a248d0ffe..0654bcf69f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -365,6 +365,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } + // todo fix race condition + streamTaskDisablePause(pTask); + streamTaskDisablePause(pStreamTask); + ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -420,6 +424,10 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } taosWUnLockLatch(&pMeta->lock); + // pause allowed + streamTaskEnablePause(pStreamTask); + streamTaskEnablePause(pTask); + streamSchedExec(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); return TSDB_CODE_SUCCESS; @@ -568,22 +576,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } bool streamTaskIsIdle(const SStreamTask* pTask) { - int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue); - if (numOfItems > 0) { - return false; - } - - numOfItems = taosQallItemSize(pTask->inputQueue->qall); - if (numOfItems > 0) { - return false; - } - - // blocked by downstream task - if (pTask->outputInfo.status == TASK_OUTPUT_STATUS__BLOCKED) { - return false; - } - - return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); + return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE/* && pTask->status.taskStatus != TASK_STATUS__HALT*/); } int32_t streamTryExec(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ddd7ae4676..d3687c3845 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -72,7 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - return doLaunchScanHistoryTask(pTask); + int32_t code = doLaunchScanHistoryTask(pTask); + streamTaskEnablePause(pTask); + return code; } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, @@ -86,6 +88,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } + streamTaskEnablePause(pTask); return 0; } @@ -198,6 +201,11 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); } + // enable pause when init completed. + if (pTask->historyTaskId.taskId == 0 && pTask->info.fillHistory == 0) { + streamTaskEnablePause(pTask); + } + // when current stream task is ready, check the related fill history task. launchFillHistoryTask(pTask); } @@ -415,8 +423,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // agg int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr, - pTask->numOfWaitingUpstream); + qDebug("s-task:%s agg task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, + pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); return 0; } @@ -745,7 +753,6 @@ void launchFillHistoryTask(SStreamTask* pTask) { streamLaunchFillHistoryTask(pTask); } -// todo handle race condition, this task may be destroyed void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { if (pTask->info.fillHistory) { qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); @@ -757,3 +764,37 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { // check downstream tasks for itself streamTaskDoCheckDownstreamTasks(pTask); } + +void streamTaskPause(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + + int64_t st = taosGetTimestampMs(); + while(!pTask->status.pauseAllowed) { + qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); + taosMsleep(100); + } + + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + + int64_t el = taosGetTimestampMs() - st; + qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.keepTaskStatus), el); +} + +// todo fix race condition +void streamTaskDisablePause(SStreamTask* pTask) { + // pre-condition check + while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + taosMsleep(10); + qDebug("s-task:%s already in pause, wait for pause being cancelled, and then set pause disabled", pTask->id.idStr); + } + + qDebug("s-task:%s disable task pause", pTask->id.idStr); + pTask->status.pauseAllowed = 0; +} + +void streamTaskEnablePause(SStreamTask* pTask) { + qDebug("s-task:%s enable task pause", pTask->id.idStr); + pTask->status.pauseAllowed = 1; +} \ No newline at end of file From 0d1c4f9ee22ed6b7b4a66144a6e3f5cfc688132e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 09:13:06 +0800 Subject: [PATCH 07/15] fix(stream): fix syntax error --- source/libs/stream/src/streamRecover.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d3687c3845..022ebd24c7 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -779,7 +779,7 @@ void streamTaskPause(SStreamTask* pTask) { int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.keepTaskStatus), el); + streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); } // todo fix race condition From bd8b811b9dc5c602e6a3a2dd6ece68c8da709ea8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 09:14:25 +0800 Subject: [PATCH 08/15] refactor: do some internal refactor. --- source/libs/stream/src/streamRecover.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 022ebd24c7..80b3c2a618 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -785,12 +785,13 @@ void streamTaskPause(SStreamTask* pTask) { // todo fix race condition void streamTaskDisablePause(SStreamTask* pTask) { // pre-condition check + const char* id = pTask->id.idStr; while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - taosMsleep(10); - qDebug("s-task:%s already in pause, wait for pause being cancelled, and then set pause disabled", pTask->id.idStr); + taosMsleep(100); + qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, check in 100ms", id); } - qDebug("s-task:%s disable task pause", pTask->id.idStr); + qDebug("s-task:%s disable task pause", id); pTask->status.pauseAllowed = 0; } From c9d75eab5a2cc4f445fed03f8722664a0abec834 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 09:16:13 +0800 Subject: [PATCH 09/15] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 201c621ab3..0d36933b9b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -923,7 +923,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 - " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", + " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms, disable pause", vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); From 39ca9b537f0a68b6b447480e09a207ad34298782 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 09:43:08 +0800 Subject: [PATCH 10/15] fix(stream): fix syntax error --- source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/streamRecover.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0d36933b9b..98e0696c19 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -814,6 +814,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->outputInfo.queue = streamQueueOpen(512 << 10); if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { + tqError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr); return -1; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 80b3c2a618..693d067506 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -26,7 +26,7 @@ static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { int64_t el = (taosGetTimestampMs() - pTask->initTs); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", - pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); + pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); } int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { From 16d7707b9095e17488c477f13e15573b585f97e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 19:33:43 +0800 Subject: [PATCH 11/15] fix(stream): align the scan real time data for stream task. --- include/libs/stream/tstream.h | 22 +++- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/snode/src/snode.c | 8 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/tq/tq.c | 80 +++++++++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/libs/stream/inc/streamInt.h | 3 + source/libs/stream/src/stream.c | 14 ++- source/libs/stream/src/streamDispatch.c | 107 ++++++++++++++++- source/libs/stream/src/streamRecover.c | 121 +++++++++++++------- 11 files changed, 283 insertions(+), 82 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 329a6bbc29..b7a516190b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -336,6 +336,7 @@ struct SStreamTask { void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend + SArray* pRspMsgList; // the followings attributes don't be serialized int32_t notReadyTasks; @@ -457,7 +458,9 @@ typedef struct { typedef struct { int64_t streamId; - int32_t taskId; + int32_t upstreamTaskId; + int32_t downstreamTaskId; + int32_t upstreamNodeId; int32_t childId; } SStreamScanHistoryFinishReq, SStreamTransferReq; @@ -518,6 +521,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); +typedef struct { + int64_t streamId; + int32_t upstreamTaskId; + int32_t upstreamNodeId; + int32_t downstreamId; + int32_t downstreamNode; +} SStreamCompleteHistoryMsg; + +int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); +int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); + typedef struct { int64_t streamId; int32_t downstreamTaskId; @@ -567,6 +581,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); +SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); @@ -607,8 +622,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); // agg level -int32_t streamAggScanHistoryPrepare(SStreamTask* pTask); -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); +int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo); +int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index b2fb7243ff..8206b4e425 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -79,6 +79,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d48bd3f847..462b5b9080 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3b19c7ae4a..6288a048f7 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -274,7 +274,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return 0; } -int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { +int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -287,12 +287,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); // find task - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId); + SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId); if (pTask == NULL) { return -1; } // do process request - if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) { + if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) { streamMetaReleaseTask(pSnode->pMeta, pTask); return -1; } @@ -415,7 +415,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: - return sndProcessTaskRecoverFinishReq(pSnode, pMsg); + return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); case TDMT_STREAM_TASK_CHECK: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f5a610efc7..77e4a48249 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); // sma diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 98e0696c19..aeb9b55d12 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1084,14 +1084,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); int64_t st = taosGetTimestampMs(); - int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, - TASK_SCHED_STATUS__WAITING); - if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { - tqDebug("s-task:%s failed to launch scan history data in current time window, unexpected sched status:%d", id, - schedStatus); - streamMetaReleaseTask(pMeta, pTask); - return 0; + // we have to continue retrying to successfully execute the scan history task. + while (1) { + int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, + TASK_SCHED_STATUS__WAITING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + break; + } + + tqError("s-task:%s failed to start scan history in current time window, unexpected sched-status:%d, retry in 100ms", + id, schedStatus); + taosMsleep(100); } if (!streamTaskRecoverScanStep1Finished(pTask)) { @@ -1195,12 +1199,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in current time window completed, no related fill history task, reset the time " + "s-task:%s scan history in stream time window completed, no related fill history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); } else { tqDebug( - "s-task:%s scan history in current time window completed, now start to handle data from WAL, start " + "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } @@ -1209,11 +1213,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { code = streamTaskScanHistoryDataComplete(pTask); streamMetaReleaseTask(pMeta, pTask); - // let's start the stream task by extracting data from wal - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - tqStartStreamTasks(pTq); - } - + // when all source task complete to scan history data in stream time window, they are allowed to handle stream data + // at the same time. return code; } @@ -1232,17 +1233,17 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.taskId); + tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.taskId); + tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); return -1; } int32_t remain = streamAlignTransferState(pTask); if (remain > 0) { - tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain); + tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); return 0; } @@ -1257,7 +1258,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1269,20 +1270,49 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); - // find task - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); if (pTask == NULL) { - tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId); + tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", + pTq->pStreamMeta->vgId, req.downstreamTaskId); return -1; } - int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId); + tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); + + int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return code; } -int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { - // +int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamCompleteHistoryMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeCompleteHistoryDataMsg(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", + pTq->pStreamMeta->vgId, req.upstreamTaskId); + return -1; + } + + tqDebug("s-task:%s scan-history finish rsp received from task:0x%x", pTask->id.idStr, req.downstreamId); + + int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); + if (remain > 0) { + tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain); + } else { + streamProcessScanHistoryFinishRsp(pTask); + } + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e4a7ed224c..33af7631de 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_STREAM_TRANSFER_STATE: return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: - return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); + return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); + return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 2164b63caf..ff3f35bfed 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -54,6 +54,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); +int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); + extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 4de9f6a7ed..090cef48de 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -419,4 +419,16 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { } } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } + +SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + for(int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); + if (pInfo->taskId == taskId) { + return pInfo; + } + } + + return NULL; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 566e7da9e4..a22a9ec534 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -25,6 +25,12 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; +static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { + pMsg->msgType = msgType; + pMsg->pCont = pCont; + pMsg->contLen = contLen; +} + static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; - msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, - pReq->taskId, vgId); + qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, + pReq->downstreamTaskId, vgId); return 0; } @@ -620,3 +625,99 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // this block can not be deleted until it has been sent to downstream task successfully. return TSDB_CODE_SUCCESS; } + +int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +typedef struct { + SEpSet epset; + int32_t taskId; + SRpcMsg msg; +} SStreamContinueExecInfo; + +int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { + int32_t len = 0; + int32_t code = 0; + SEncoder encoder; + + SStreamCompleteHistoryMsg msg = { + .streamId = pReq->streamId, + .upstreamTaskId = pReq->upstreamTaskId, + .upstreamNodeId = pReq->upstreamNodeId, + .downstreamId = pReq->downstreamTaskId, + .downstreamNode = pTask->pMeta->vgId, + }; + + tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code); + if (code < 0) { + return code; + } + + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); + + void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeCompleteHistoryDataMsg(&encoder, &msg); + tEncoderClear(&encoder); + + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + + SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet}; + initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); + info.msg.info = *pRpcInfo; + + // todo: fix race condition here + if (pTask->pRspMsgList == NULL) { + pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); + } + + taosArrayPush(pTask->pRspMsgList, &info); + + int32_t num = taosArrayGetSize(pTask->pRspMsgList); + qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, + num); + return TSDB_CODE_SUCCESS; +} + +int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); + + int32_t num = taosArrayGetSize(pTask->pRspMsgList); + for (int32_t i = 0; i < num; ++i) { + SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); + tmsgSendRsp(&pInfo->msg); + + qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel, + pInfo->taskId); + } + + taosArrayClear(pTask->pRspMsgList); + qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, + num); + return 0; +} diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 693d067506..958a94d741 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -72,9 +72,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - int32_t code = doLaunchScanHistoryTask(pTask); - streamTaskEnablePause(pTask); - return code; + return doLaunchScanHistoryTask(pTask); } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, @@ -83,12 +81,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamSetParamForScanHistory(pTask); - streamAggScanHistoryPrepare(pTask); + streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); + streamTaskScanHistoryPrepare(pTask); } - - streamTaskEnablePause(pTask); return 0; } @@ -143,6 +140,12 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { streamTaskSetForReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); + + // enable pause when init completed. + if (pTask->historyTaskId.taskId == 0) { + streamTaskEnablePause(pTask); + } + launchFillHistoryTask(pTask); } @@ -195,14 +198,14 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { streamTaskSetRangeStreamCalc(pTask); if (status == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, str); + qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); streamTaskLaunchScanHistory(pTask); } else { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); } // enable pause when init completed. - if (pTask->historyTaskId.taskId == 0 && pTask->info.fillHistory == 0) { + if (pTask->historyTaskId.taskId == 0) { streamTaskEnablePause(pTask); } @@ -296,7 +299,7 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { } int32_t streamRestoreParam(SStreamTask* pTask) { - qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr); + qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr); return qRestoreStreamOperatorOption(pTask->exec.pExecutor); } @@ -334,23 +337,33 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { } int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; + SStreamScanHistoryFinishReq req = { + .streamId = pTask->id.streamId, + .childId = pTask->info.selfChildId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->pMeta->vgId, + }; // serialize if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.taskId = pTask->fixedEpDispatcher.taskId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + pTask->notReadyTasks = 1; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->notReadyTasks = numOfVgs; qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.taskId = pVgInfo->taskId; + req.downstreamTaskId = pVgInfo->taskId; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } + } else { + qDebug("s-task:%s no downstream tasks, invoke history finish rsp directly", pTask->id.idStr); + streamProcessScanHistoryFinishRsp(pTask); } return 0; @@ -394,7 +407,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe tmsgSendReq(pEpSet, &msg); qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId); + pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId); return 0; } @@ -404,7 +417,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // serialize if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.taskId = pTask->fixedEpDispatcher.taskId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -412,7 +425,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { int32_t numOfVgs = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.taskId = pVgInfo->taskId; + req.downstreamTaskId = pVgInfo->taskId; doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } @@ -421,10 +434,11 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { } // agg -int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) { +int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s agg task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, - pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); + qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, + streamGetTaskStatusStr(pTask->status.taskStatus)); return 0; } @@ -440,27 +454,56 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { return 0; } -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) { - if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); - ASSERT(left >= 0); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, + SRpcHandleInfo* pRpcInfo) { + int32_t taskLevel = pTask->info.taskLevel; + ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data", - pTask->id.idStr, numOfTasks); + // sink node do not send end of scan history msg to its upstream, which is agg task. + streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq); + int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); + ASSERT(left >= 0); + + if (left == 0) { + int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); + qDebug( + "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " + "rsp to all upstream tasks", + pTask->id.idStr, numOfTasks); + + if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamAggUpstreamScanHistoryFinish(pTask); - } else { - qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", - pTask->id.idStr, taskId, childId, left); } + streamNotifyUpstreamContinue(pTask); + } else { + qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", + pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); } return 0; } +int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { + ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); + SStreamMeta* pMeta = pTask->pMeta; + + // execute in the scan history complete call back msg, ready to process data from inputQ + streamSetStatusNormal(pTask); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + + taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pTask); + taosWUnLockLatch(&pMeta->lock); + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamSchedExec(pTask); + } + + return TSDB_CODE_SUCCESS; +} + static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; @@ -579,7 +622,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { return 0; } @@ -596,16 +638,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return -1; } - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); - - // ready to process data from inputQ - streamSetStatusNormal(pTask); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - - taosWLockLatch(&pMeta->lock); - streamMetaSaveTask(pMeta, pTask); - taosWUnLockLatch(&pMeta->lock); - return 0; } @@ -702,15 +734,20 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } + int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; tEndDecode(pDecoder); return 0; From 1b2636028a5220d5f55231a297f5ae8417d5e3a0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 23:05:42 +0800 Subject: [PATCH 12/15] fix(stream): fix memory leak. --- include/libs/stream/tstream.h | 1 + source/dnode/snode/src/snode.c | 1 + source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/streamDispatch.c | 3 ++- source/libs/stream/src/streamTask.c | 5 +++++ 5 files changed, 10 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7a516190b..b169d82574 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -337,6 +337,7 @@ struct SStreamTask { SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend SArray* pRspMsgList; + TdThreadMutex lock; // the followings attributes don't be serialized int32_t notReadyTasks; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6288a048f7..8a7b61135b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -91,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); + taosThreadMutexInit(&pTask->lock, NULL); streamSetupScheduleTrigger(pTask); qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aeb9b55d12..5cae6793be 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -921,6 +921,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->status.taskStatus = TASK_STATUS__NORMAL; } + taosThreadMutexInit(&pTask->lock, NULL); streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a22a9ec534..88af841f05 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -691,10 +691,11 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); info.msg.info = *pRpcInfo; - // todo: fix race condition here + taosThreadMutexLock(&pTask->lock); if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); } + taosThreadMutexUnlock(&pTask->lock); taosArrayPush(pTask->pRspMsgList, &info); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ca4586a1b4..d54d5fa8b8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,5 +251,10 @@ void tFreeStreamTask(SStreamTask* pTask) { tSimpleHashCleanup(pTask->pNameMap); } + if (pTask->pRspMsgList != NULL) { + pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList); + } + + taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); } From bfc4a07207fa47a4e6ebdb1ad6a48bac90ad067f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jul 2023 00:20:02 +0800 Subject: [PATCH 13/15] fix(stream): fix memory leak. --- source/dnode/vnode/src/tq/tq.c | 2 ++ source/libs/stream/src/streamRecover.c | 12 +++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5cae6793be..69d07a84c6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1114,6 +1114,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { + streamTaskEnablePause(pTask); + SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 958a94d741..54688ed0cc 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -204,11 +204,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); } - // enable pause when init completed. - if (pTask->historyTaskId.taskId == 0) { - streamTaskEnablePause(pTask); - } - // when current stream task is ready, check the related fill history task. launchFillHistoryTask(pTask); } @@ -477,6 +472,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory } streamNotifyUpstreamContinue(pTask); + + // sink node does not receive the pause msg from mnode + if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + streamTaskEnablePause(pTask); + } } else { qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left); @@ -497,6 +497,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { streamMetaSaveTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); + streamTaskEnablePause(pTask); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { streamSchedExec(pTask); } From f4864b87135a7c8e852e30b8e680e32699732582 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jul 2023 01:02:58 +0800 Subject: [PATCH 14/15] fix(stream): fix memory leak. --- source/libs/stream/inc/streamInt.h | 6 ++++++ source/libs/stream/src/streamDispatch.c | 6 ------ source/libs/stream/src/streamTask.c | 10 +++++++++- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index ff3f35bfed..add893c8c7 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -31,6 +31,12 @@ typedef struct { void* timer; } SStreamGlobalEnv; +typedef struct { + SEpSet epset; + int32_t taskId; + SRpcMsg msg; +} SStreamContinueExecInfo; + extern SStreamGlobalEnv streamEnv; void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 88af841f05..ca5d5994b7 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -648,12 +648,6 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory return 0; } -typedef struct { - SEpSet epset; - int32_t taskId; - SRpcMsg msg; -} SStreamContinueExecInfo; - int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { int32_t len = 0; int32_t code = 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d54d5fa8b8..863c4ce025 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include +#include #include "executor.h" #include "tstream.h" #include "wal.h" @@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { return 0; } +static void freeItem(void* p) { + SStreamContinueExecInfo* pInfo = p; + rpcFreeCont(pInfo->msg.pCont); +} + void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s", pTask->id.idStr); @@ -252,7 +259,8 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->pRspMsgList != NULL) { - pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList); + taosArrayDestroyEx(pTask->pRspMsgList, freeItem); + pTask->pRspMsgList = NULL; } taosThreadMutexDestroy(&pTask->lock); From 9679cc45272cb2be8421b582361eb8451245b57e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jul 2023 10:19:51 +0800 Subject: [PATCH 15/15] fix(stream): fix race condition. --- source/libs/stream/src/streamDispatch.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ca5d5994b7..8334ea1c88 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -689,9 +689,8 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); } - taosThreadMutexUnlock(&pTask->lock); - taosArrayPush(pTask->pRspMsgList, &info); + taosThreadMutexUnlock(&pTask->lock); int32_t num = taosArrayGetSize(pTask->pRspMsgList); qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,