enh: rsma support delete

This commit is contained in:
kailixu 2023-11-10 07:02:46 +08:00
parent 7288b0fc77
commit ee6e68a71e
3 changed files with 73 additions and 14 deletions

View File

@ -1978,6 +1978,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
if(pDataBlock->info.type == STREAM_DELETE_RESULT) {
}
if (colNum <= 1) { // invalid if only with TS col
continue;
}

View File

@ -629,6 +629,46 @@ _end:
return code;
}
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
int32_t code = 0;
int32_t lino = 0;
if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
int32_t len = 0;
tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
TSDB_CHECK_CODE(code, lino, _exit);
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
if (!pBuf) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
SEncoder encoder;
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
tEncodeSBatchDeleteReq(&encoder, pDelReq);
tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL,
.pCont = pBuf,
.contLen = len + sizeof(SMsgHead),
.info.wrapper = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)};
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
taosArrayDestroy(pDelReq->deleteReqs);
if (code) {
smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
SMA_VID(pSma), lino, suid, level, tstrerror(code));
}
return code;
}
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
int32_t execType, int8_t *streamFlushed) {
int32_t code = 0;
@ -657,10 +697,25 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
if (output->info.type == STREAM_CHECKPOINT) {
if (streamFlushed) *streamFlushed = 1;
continue;
} else if (output->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq *pDeleteReq = NULL;
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
if (!pDeleteReq->deleteReqs) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tqBuildDeleteReq("", output, pDeleteReq, "");
TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaProcessDelReq(pSma, suid, pItem->level, pDeleteReq);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
}
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL;
@ -669,7 +724,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
TSDB_CHECK_CODE(code, lino, _exit);
}
// reset the output version to handle reboot
// reset the output version when reboot
if (STREAM_GET_ALL == execType && output->info.version == 0) {
// the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
output->info.version = pItem->submitReqVer;
@ -1511,7 +1566,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
if (nSubmit > 0 || nDelete > 0) {
int32_t size = TARRAY_SIZE(pSubmitArr);
if (size > 0) {
ASSERTS(size > 0, "size is %d", size);
int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) {
@ -1519,22 +1574,17 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
}
}
tdFreeRSmaSubmitItems(pSubmitArr, inputType);
}
nSubmit = 0;
nDelete = 0;
} else {
goto _rtn;
}
if (resume == 1) {
nSubmit = 0;
nDelete = 0;
resume = 0;
tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__REF_DATA_BLOCK);
goto _resume_submit;
} else if (resume == 2) {
nSubmit = 0;
nDelete = 0;
resume = 0;
tdFreeRSmaSubmitItems(pSubmitArr, STREAM_INPUT__MERGED_SUBMIT);
goto _resume_delete;
}
}

View File

@ -412,6 +412,10 @@ static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
if (pMsg->info.wrapper) { // skip for rsma
return code;
}
int64_t ctimeMs = taosGetTimestampMs();
SBatchDeleteReq pReq = {0};
SDecoder *pCoder = &(SDecoder){0};
@ -1905,6 +1909,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
}
if()
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
if (code < 0) {
terrno = code;