diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5fd9a8b12b..0d021e2fa2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -135,7 +135,6 @@ typedef struct { typedef struct { int8_t type; int64_t ver; - int32_t* dataRef; SSDataBlock* pBlock; } SStreamRefDataBlock; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 8d4f6af577..7e106eefde 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -135,6 +135,7 @@ typedef struct { // int8_t scanUncommited; int8_t scanNotApplied; int8_t scanMeta; + int8_t deleteMsg; int8_t enableRef; } SWalFilterCond; @@ -193,9 +194,10 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); +int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); +int64_t walReaderGetValidFirstVer(const SWalReader* pReader); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 542d584a24..cb83a561d2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -264,7 +264,7 @@ int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqNextBlockInWal(STqReader* pReader); bool tqNextBlockImpl(STqReader *pReader); -int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a7b5e5e582..ef36b8429a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -187,8 +187,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqStreamTasksScanWal(STQ* pTq); // tq util +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); +int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1aea479511..0173db976d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -213,7 +213,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); -int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); +int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 75a5aa59da..9730ad2877 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -664,7 +664,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + SWalFilterCond cond = {.deleteMsg = 1}; + pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } streamSetupTrigger(pTask); @@ -868,7 +869,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage", pTask->id.idStr); + tqDebug("s-task:%s start to recover blocking stage", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -892,6 +893,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } + qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion); + walReaderSeekVer(pTask->exec.pWalReader, sversion); + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; @@ -958,7 +962,60 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { + SDecoder* pCoder = &(SDecoder){0}; + SDeleteRes* pRes = &(SDeleteRes){0}; + + *pRefBlock = NULL; + + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); + if (pRes->uidList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + tDecoderInit(pCoder, (uint8_t*)pData, len); + tDecodeDeleteRes(pCoder, pRes); + tDecoderClear(pCoder); + + int32_t numOfTables = taosArrayGetSize(pRes->uidList); + if (numOfTables == 0 || pRes->affectedRows == 0) { + taosArrayDestroy(pRes->uidList); + return TSDB_CODE_SUCCESS; + } + + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + blockDataEnsureCapacity(pDelBlock, numOfTables); + pDelBlock->info.rows = numOfTables; + pDelBlock->info.version = ver; + + for (int32_t i = 0; i < numOfTables; i++) { + // start key column + SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); + colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column + SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); + colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false); + // uid column + SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); + int64_t* pUid = taosArrayGet(pRes->uidList, i); + colDataSetVal(pUidCol, i, (const char*)pUid, false); + + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); + } + + taosArrayDestroy(pRes->uidList); + *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); + if (pRefBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; + (*pRefBlock)->pBlock = pDelBlock; + return TSDB_CODE_SUCCESS; +} + +int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { bool failed = false; SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -978,6 +1035,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { taosArrayDestroy(pRes->uidList); return 0; } + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); blockDataEnsureCapacity(pDelBlock, sz); pDelBlock->info.rows = sz; @@ -1024,8 +1082,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->pBlock = pDelBlock; - pRefBlock->dataRef = pRef; - atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { atomic_sub_fetch_32(pRef, 1); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 85fcb4fd80..50d94c5ed5 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -34,12 +34,12 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } - if (msgType == TDMT_VND_SUBMIT) { + if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) { tqStartStreamTasks(pTq); } if (msgType == TDMT_VND_DELETE) { - tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); +// tqProcessDeleteDataReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0d2208fa94..09574a6b62 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -294,32 +294,51 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { - if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { return -1; } tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id); return 0; } -int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { - if (walNextValidMsg(pReader) < 0) { - return -1; +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { + int32_t code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pReader->pHead->head.version; - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; + if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return -1; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("%s failed to create data submit for stream since out of memory", id); + return terrno; + } + } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); + int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); + + extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + } else { + ASSERT(0); } - memcpy(data, pBody, len); - *pPackedData = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 0bb33b1215..7db078140d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -107,42 +107,54 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit - SWal *pWal = pTask->exec.pWalReader->pWal; - if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) { - pTask->chkInfo.currentVer = pWal->vers.firstVer; - code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { + int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); + if (pTask->chkInfo.currentVer < firstVer) { + pTask->chkInfo.currentVer = firstVer; + tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); + + // todo need retry if failed + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } else { + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer == -1) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit streamMetaReleaseTask(pStreamMeta, pTask); continue; } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamMetaReleaseTask(pStreamMeta, pTask); - continue; } // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); +// tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); - SPackedData packData = {0}; - code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + SStreamQueueItem* pItem = NULL; + int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; } - SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); + // delete ignore + if (pItem == NULL) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } noNewDataInWal = false; - code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); + code = tqAddInputBlockNLaunchTask(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, @@ -151,8 +163,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamDataSubmitDestroy(p); - taosFreeQitem(p); + streamMetaReleaseTask(pStreamMeta, pTask); } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 800bcc8b71..e4b2fa8821 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -82,6 +82,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; qStreamSetOpen(task); + tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index dd4a02957d..622cc5b049 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -26,10 +26,10 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { +int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); + tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); return -1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 303fa09c99..7d3165237e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1821,7 +1821,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; - qDebug("stream scan called"); + qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { @@ -1830,13 +1830,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { pTSInfo->base.cond.startVersion = 0; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; - qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; - qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, + qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } @@ -1941,7 +1941,6 @@ FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); - /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9ed297bd6b..7047461ca8 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -288,14 +288,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem); - if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - + SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem; int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3..96022850a3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; - - int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - blockDataDestroy(pRefBlock->pBlock); - taosMemoryFree(pRefBlock->dataRef); - } + blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e10562f5cb..e093d45074 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -60,7 +60,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0324580885..26429ea764 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -224,12 +224,19 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; + const char* id = pTask->id.idStr; - qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); + int64_t st = taosGetTimestampMs(); + qDebug("s-task:%s recover step2(blocking stage) started", id); if (qStreamSourceRecoverStep2(exec, ver) < 0) { } - return streamScanExec(pTask, 100); + int32_t code = streamScanExec(pTask, 100); + + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id, el); + + return code; } int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 36d526d162..eed0c0dcfc 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -62,9 +62,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { void walCloseReader(SWalReader *pReader) { taosCloseFile(&pReader->pIdxFile); taosCloseFile(&pReader->pLogFile); - /*if (pReader->cond.enableRef) {*/ - /*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/ - /*}*/ taosMemoryFreeClear(pReader->pHead); taosMemoryFree(pReader); } @@ -74,22 +71,25 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); + if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); -// taosMsleep(10); } -// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; + int64_t endVer = TMIN(appliedVer, committedVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", end index:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || - (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { + + int32_t type = pReader->pHead->head.msgType; + if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || + (IS_META_MSG(type) && pReader->cond.scanMeta)) { if (walFetchBodyNew(pReader) < 0) { return -1; } @@ -98,13 +98,16 @@ int32_t walNextValidMsg(SWalReader *pReader) { if (walSkipFetchBodyNew(pReader) < 0) { return -1; } + fetchVer = pReader->curVersion; } } + return -1; } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } +int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); } static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; @@ -206,7 +209,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return 0; } -int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { +int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); @@ -236,7 +239,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); if (pRead->curVersion != fetchVer) { - if (walReadSeekVer(pRead, fetchVer) < 0) { + if (walReaderSeekVer(pRead, fetchVer) < 0) { return -1; } seeked = true; @@ -278,6 +281,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pReader->pHead = ptr; pReadHead = &pReader->pHead->head; pReader->capacity = pReadHead->bodyLen; @@ -293,14 +297,11 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } -// pRead->curInvalid = 1; return -1; } if (walValidBodyCksum(pReader->pHead) != 0) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver); -// pRead->curInvalid = 1; - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -342,7 +343,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); + code = walReaderSeekVer(pRead, ver); if (code < 0) { // pRead->curVersion = ver; // pRead->curInvalid = 1; @@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { taosThreadMutexLock(&pReader->mutex); if (pReader->curVersion != ver) { - if (walReadSeekVer(pReader, ver) < 0) { + if (walReaderSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1;