diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bf21b2eda0..4b777dac82 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1978,6 +1978,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; + if(pDataBlock->info.type == STREAM_DELETE_RESULT) { + + } + if (colNum <= 1) { // invalid if only with TS col continue; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e1cada4665..d0ef36d17f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -629,6 +629,46 @@ _end: return code; } +static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) { + int32_t code = 0; + int32_t lino = 0; + + if (taosArrayGetSize(pDelReq->deleteReqs) > 0) { + int32_t len = 0; + tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code); + TSDB_CHECK_CODE(code, lino, _exit); + + void *pBuf = rpcMallocCont(len + sizeof(SMsgHead)); + if (!pBuf) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + SEncoder encoder; + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len); + tEncodeSBatchDeleteReq(&encoder, pDelReq); + tEncoderClear(&encoder); + + ((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode); + + SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, + .pCont = pBuf, + .contLen = len + sizeof(SMsgHead), + .info.wrapper = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)}; + code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + taosArrayDestroy(pDelReq->deleteReqs); + if (code) { + smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s", + SMA_VID(pSma), lino, suid, level, tstrerror(code)); + } + + return code; +} + static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, int32_t execType, int8_t *streamFlushed) { int32_t code = 0; @@ -657,10 +697,25 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (output->info.type == STREAM_CHECKPOINT) { if (streamFlushed) *streamFlushed = 1; continue; + } else if (output->info.type == STREAM_DELETE_RESULT) { + SBatchDeleteReq *pDeleteReq = NULL; + pDeleteReq->suid = suid; + pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + if (!pDeleteReq->deleteReqs) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + code = tqBuildDeleteReq("", output, pDeleteReq, ""); + TSDB_CHECK_CODE(code, lino, _exit); + code = tdRSmaProcessDelReq(pSma, suid, pItem->level, pDeleteReq); + TSDB_CHECK_CODE(code, lino, _exit); + continue; } + smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), output->info.id.uid, output->info.id.groupId, output->info.rows); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; @@ -669,7 +724,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma TSDB_CHECK_CODE(code, lino, _exit); } - // reset the output version to handle reboot + // reset the output version when reboot if (STREAM_GET_ALL == execType && output->info.version == 0) { // the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously output->info.version = pItem->submitReqVer; @@ -1511,30 +1566,25 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA if (nSubmit > 0 || nDelete > 0) { int32_t size = TARRAY_SIZE(pSubmitArr); - if (size > 0) { - int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { - goto _err; - } + ASSERTS(size > 0, "size is %d", size); + int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { + goto _err; } - tdFreeRSmaSubmitItems(pSubmitArr, inputType); } + tdFreeRSmaSubmitItems(pSubmitArr, inputType); + nSubmit = 0; + nDelete = 0; } else { goto _rtn; } if (resume == 1) { - nSubmit = 0; - nDelete = 0; resume = 0; - tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK); goto _resume_submit; } else if (resume == 2) { - nSubmit = 0; - nDelete = 0; resume = 0; - tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__MERGED_SUBMIT); goto _resume_delete; } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fbd59203dc..0eb6448fca 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -412,6 +412,10 @@ static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t lino = 0; + if (pMsg->info.wrapper) { // skip for rsma + return code; + } + int64_t ctimeMs = taosGetTimestampMs(); SBatchDeleteReq pReq = {0}; SDecoder *pCoder = &(SDecoder){0}; @@ -1905,6 +1909,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); } + if() code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs); if (code < 0) { terrno = code;