enh: rsma support delete

This commit is contained in:
kailixu 2023-11-10 14:44:29 +08:00
parent ee6e68a71e
commit 674e878ffa
13 changed files with 531 additions and 59 deletions

View File

@ -3774,6 +3774,7 @@ typedef struct {
int64_t suid; int64_t suid;
SArray* deleteReqs; // SArray<SSingleDeleteReq> SArray* deleteReqs; // SArray<SSingleDeleteReq>
int64_t ctimeMs; // fill by vnode int64_t ctimeMs; // fill by vnode
int8_t level; // 0 tsdb(default), 1 rsma1 , 2 rsma2
} SBatchDeleteReq; } SBatchDeleteReq;
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq); int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);

View File

@ -1978,10 +1978,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
if(pDataBlock->info.type == STREAM_DELETE_RESULT) {
}
if (colNum <= 1) { // invalid if only with TS col if (colNum <= 1) { // invalid if only with TS col
continue; continue;
} }

View File

@ -8337,6 +8337,7 @@ int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq)
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1; if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
} }
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->level) < 0) return -1;
return 0; return 0;
} }
@ -8361,6 +8362,9 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
if (!tDecodeIsEnd(pDecoder)) { if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
} }
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI8(pDecoder, &pReq->level) < 0) return -1;
}
return 0; return 0;
} }

View File

@ -147,7 +147,7 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
int32_t tqOffsetCommitFile(STqOffsetStore* pStore); int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink // tqSink
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr); const char* pIdStr);
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);

View File

@ -43,8 +43,8 @@ static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx); int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
ERsmaExecType type, int8_t level); SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type); static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
@ -654,7 +654,7 @@ static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatc
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL,
.pCont = pBuf, .pCont = pBuf,
.contLen = len + sizeof(SMsgHead), .contLen = len + sizeof(SMsgHead),
.info.wrapper = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)}; .info.ahandle = level == 1 ? VND_RSMA1(pSma->pVnode) : VND_RSMA2(pSma->pVnode)};
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg); code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -698,16 +698,15 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
if (streamFlushed) *streamFlushed = 1; if (streamFlushed) *streamFlushed = 1;
continue; continue;
} else if (output->info.type == STREAM_DELETE_RESULT) { } else if (output->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq *pDeleteReq = NULL; SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
pDeleteReq->suid = suid; deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); if (!deleteReq.deleteReqs) {
if (!pDeleteReq->deleteReqs) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tqBuildDeleteReq("", output, pDeleteReq, ""); code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "");
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tdRSmaProcessDelReq(pSma, suid, pItem->level, pDeleteReq); code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
continue; continue;
} }
@ -715,6 +714,19 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma), smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows); output->info.id.uid, output->info.id.groupId, output->info.rows);
if (STREAM_GET_ALL == execType) {
/**
* 1. reset the output version when reboot
* 2. delete msg version not updated from the result
*/
if (output->info.version < pItem->submitReqVer) {
// submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
output->info.version = pItem->submitReqVer;
} else if (output->info.version == pItem->fetchResultVer) {
ASSERTS(0, "duplicated fetch version:%" PRIi64, pItem->fetchResultVer);
continue;
}
}
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
@ -724,12 +736,6 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// reset the output version when reboot
if (STREAM_GET_ALL == execType && output->info.version == 0) {
// the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
output->info.version = pItem->submitReqVer;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) { if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
// TODO: reconfigure SSubmitReq2 // TODO: reconfigure SSubmitReq2
@ -858,7 +864,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
* @param level * @param level
* @return int32_t * @return int32_t
*/ */
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, SRSmaInfo *pInfo,
ERsmaExecType type, int8_t level) { ERsmaExecType type, int8_t level) {
int32_t idx = level - 1; int32_t idx = level - 1;
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
@ -878,22 +884,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level,
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize); RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize);
#if 0
for (int32_t i = 0; i < msgSize; ++i) {
SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
tdRsmaPrintSubmitReq(pSma, pReq);
}
#endif
if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (STREAM_INPUT__MERGED_SUBMIT == inputType) { atomic_store_64(&pItem->submitReqVer, version);
SPackedData *packData = POINTER_SHIFT(pMsg, sizeof(SPackedData) * (msgSize - 1));
atomic_store_64(&pItem->submitReqVer, packData->ver);
}
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL); terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL);
@ -1515,6 +1511,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
int8_t resume = 0; int8_t resume = 0;
int32_t nSubmit = 0; int32_t nSubmit = 0;
int32_t nDelete = 0; int32_t nDelete = 0;
int64_t version = 0;
SPackedData packData; SPackedData packData;
@ -1534,6 +1531,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
packData.msgLen = RSMA_EXEC_MSG_LEN(msg); packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
packData.ver = RSMA_EXEC_MSG_VER(msg); packData.ver = RSMA_EXEC_MSG_VER(msg);
packData.msgStr = RSMA_EXEC_MSG_BODY(msg); packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
version = packData.ver;
if (!taosArrayPush(pSubmitArr, &packData)) { if (!taosArrayPush(pSubmitArr, &packData)) {
taosFreeQitem(msg); taosFreeQitem(msg);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1546,7 +1544,8 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
break; break;
} }
_resume_delete: _resume_delete:
if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), RSMA_EXEC_MSG_VER(msg), version = RSMA_EXEC_MSG_VER(msg);
if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
&packData.pDataBlock, 1))) { &packData.pDataBlock, 1))) {
taosFreeQitem(msg); taosFreeQitem(msg);
goto _err; goto _err;
@ -1569,7 +1568,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
ASSERTS(size > 0, "size is %d", size); ASSERTS(size > 0, "size is %d", size);
int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK; int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, inputType, pInfo, type, i) < 0) { if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i) < 0) {
goto _err; goto _err;
} }
} }

View File

@ -188,7 +188,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid; pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, ""); tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "");
continue; continue;
} }

View File

@ -43,7 +43,7 @@ static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid); int64_t gid);
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
int32_t totalRows = pDataBlock->info.rows; int32_t totalRows = pDataBlock->info.rows;
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
@ -53,30 +53,44 @@ int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock,
tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);
char tbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE];
for (int32_t row = 0; row < totalRows; row++) { for (int32_t row = 0; row < totalRows; row++) {
int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row); int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name; char* name = NULL;
void* varTbName = NULL; char* pName = NULL;
void* varTbName = NULL;
tbName[0] = '\0';
if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
varTbName = colDataGetVarData(pTbNameCol, row); varTbName = colDataGetVarData(pTbNameCol, row);
} }
if (varTbName != NULL && varTbName != (void*)-1) { if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); name = varDataVal(varTbName);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); } else if (stbFullName) {
pName = buildCtbNameByGroupId(stbFullName, groupId);
name = pName;
} else { } else {
name = buildCtbNameByGroupId(stbFullName, groupId); metaGetTableNameByUid(pTq->pVnode, groupId, tbName);
name = varDataVal(tbName);
} }
tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, if (!name || *name == '\0') {
pIdStr, groupId, name, skey, ekey); tqError("s-task:%s build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
" failed since invalid tbname:%s",
pIdStr, groupId, name, skey, ekey, name ? name : "NULL");
taosArrayDestroy(deleteReq->deleteReqs);
return -1;
}
SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr, groupId,
name, skey, ekey);
SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
taosMemoryFree(name); if (pName) taosMemoryFree(pName);
taosArrayPush(deleteReq->deleteReqs, &req); taosArrayPush(deleteReq->deleteReqs, &req);
} }
@ -345,7 +359,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
int64_t suid) { int64_t suid) {
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -412,10 +412,6 @@ static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
if (pMsg->info.wrapper) { // skip for rsma
return code;
}
int64_t ctimeMs = taosGetTimestampMs(); int64_t ctimeMs = taosGetTimestampMs();
SBatchDeleteReq pReq = {0}; SBatchDeleteReq pReq = {0};
SDecoder *pCoder = &(SDecoder){0}; SDecoder *pCoder = &(SDecoder){0};
@ -1890,6 +1886,13 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK); metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
STsdb *pTsdb = pVnode->pTsdb;
if (deleteReq.level == 1) {
pTsdb = VND_RSMA1(pVnode);
} else {
pTsdb = VND_RSMA2(pVnode);
}
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
@ -1902,22 +1905,22 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
int64_t uid = mr.me.uid; int64_t uid = mr.me.uid;
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); int32_t code = tsdbDeleteTableData(pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
if (code < 0) { if (code < 0) {
terrno = code; terrno = code;
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
} }
if() if (deleteReq.level == 0) {
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs); code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
if (code < 0) { if (code < 0) {
terrno = code; terrno = code;
vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
", end ts:%" PRId64, ", end ts:%" PRId64,
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
}
} }
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
} }
metaReaderClear(&mr); metaReaderClear(&mr);

View File

@ -2105,6 +2105,12 @@ FETCH_NEXT_BLOCK:
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pPacked->pDataBlock; SSDataBlock* pBlock = pPacked->pDataBlock;
if(!pBlock) {
doClearBufferedBlocks(pInfo);
return NULL;
}
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
} }

View File

@ -1180,6 +1180,7 @@ e
,,y,script,./test.sh -f tsim/sma/sma_leak.sim ,,y,script,./test.sh -f tsim/sma/sma_leak.sim
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim ,,n,script,./test.sh -f tsim/valgrind/checkError1.sim

View File

@ -0,0 +1,446 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database with retentions
sql create database d0 retentions -:7d,10s:21d,15s:365d;
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 float, c2 double) tags (city binary(20),district binary(20)) rollup(sum) max_delay 1s,1s;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data and trigger rollup
sql insert into ct1 values(now, 10, NULL);
sql insert into ct1 values(now+60m, 1, NULL);
sql insert into ct1 values(now+120m, 100, NULL);
print =============== wait 5 seconds for results
sleep 5000
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows != 3 then
print retention level 2 file rows $rows != 3
return -1
endi
if $data01 != 10.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data11 != 1.00000 then
return -1
endi
if $data12 != NULL then
return -1
endi
if $data21 != 100.00000 then
return -1
endi
if $data22 != NULL then
return -1
endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows != 3 then
print retention level 2 file rows $rows != 3
return -1
endi
if $data01 != 10.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data11 != 1.00000 then
return -1
endi
if $data12 != NULL then
return -1
endi
if $data21 != 100.00000 then
return -1
endi
if $data22 != NULL then
return -1
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows != 3 then
print retention level 2 file rows $rows != 3
return -1
endi
if $data01 != 10.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data11 != 1.00000 then
return -1
endi
if $data12 != NULL then
return -1
endi
if $data21 != 100.00000 then
return -1
endi
if $data22 != NULL then
return -1
endi
print =============== delete row 0
sql delete from ct1 where ts < now;
sql delete from ct1 where ts < now;
sql delete from ct1 where ts < now;
print =============== wait 5 seconds for results
sleep 5000
print =============== select * from retention level 2 from memory after delete row 0
sql select * from ct1;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows != 2 then
print retention level 2 file rows $rows != 2
return -1
endi
if $data01 != 1.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data11 != 100.00000 then
return -1
endi
if $data12 != NULL then
return -1
endi
print =============== select * from retention level 1 from memory after delete row 0
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows != 2 then
print retention level 2 file rows $rows != 2
return -1
endi
if $data01 != 1.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data11 != 100.00000 then
return -1
endi
if $data12 != NULL then
return -1
endi
print =============== select * from retention level 0 from memory after delete row 0
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
print $data10 $data11 $data12
if $rows != 2 then
print retention level 2 file rows $rows != 2
return -1
endi
if $data01 != 1.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
if $data11 != 100.00000 then
return -1
endi
if $data12 != NULL then
return -1
endi
print =============== delete row 1
sql delete from ct1 where ts < now;
sql delete from ct1 where ts < now;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
print =============== wait 5 seconds for results
sleep 5000
print =============== select * from retention level 2 from memory after delete row 1
sql select * from ct1;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== select * from retention level 1 from memory after delete row 1
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== select * from retention level 0 from memory after delete row 1
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== wait 5 seconds for results after reboot
sleep 5000
print =============== select * from retention level 2 from memory after reboot
sql select * from ct1;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== select * from retention level 1 from memory after reboot
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== select * from retention level 0 from memory after reboot
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
#==================== flush database to trigger commit data to file
sql flush database d0;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1 where ts > now-365d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
if $rows != 1 then
print retention level 2 file rows $rows != 1
return -1
endi
if $data01 != 100.00000 then
return -1
endi
if $data02 != NULL then
return -1
endi
print =============== delete row 2
sql delete from ct1 where ts < now;
sql delete from ct1 where ts < now;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now + 120m;
sql delete from ct1 where ts < now + 200m;
sql delete from ct1 where ts < now + 300m;
sql delete from ct1 where ts < now + 60m;
sql delete from ct1 where ts < now;
print =============== wait 5 seconds for results
sleep 5000
print =============== select * from retention level 2 from memory after delete row 2
sql select * from ct1;
print $data00 $data01 $data02
if $rows != 0 then
print retention level 2 file rows $rows != 0
return -1
endi
print =============== select * from retention level 1 from memory after delete row 2
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows != 0 then
print retention level 2 file rows $rows != 0
return -1
endi
print =============== select * from retention level 0 from memory after delete row 2
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
if $rows != 0 then
print retention level 2 file rows $rows != 0
return -1
endi
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== wait 5 seconds for results after reboot
sleep 5000
print =============== select * from retention level 2 from memory after delete row 2
sql select * from ct1;
print $data00 $data01 $data02
if $rows != 0 then
print retention level 2 file rows $rows != 0
return -1
endi
print =============== select * from retention level 1 from memory after delete row 2
sql select * from ct1 where ts > now-8d;
print $data00 $data01 $data02
if $rows != 0 then
print retention level 2 file rows $rows != 0
return -1
endi
print =============== select * from retention level 0 from memory after delete row 2
sql select * from ct1 where ts > now-3d;
print $data00 $data01 $data02
if $rows != 0 then
print retention level 2 file rows $rows != 0
return -1
endi
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#===================================================================

View File

@ -130,5 +130,6 @@ run tsim/sync/3Replica1VgElect.sim
run tsim/sync/threeReplica1VgElectWihtInsert.sim run tsim/sync/threeReplica1VgElectWihtInsert.sim
run tsim/sma/tsmaCreateInsertQuery.sim run tsim/sma/tsmaCreateInsertQuery.sim
run tsim/sma/rsmaCreateInsertQuery.sim run tsim/sma/rsmaCreateInsertQuery.sim
run tsim/sma/rsmaCreateInsertQueryDelete.sim
run tsim/valgrind/basic.sim run tsim/valgrind/basic.sim
run tsim/valgrind/checkError.sim run tsim/valgrind/checkError.sim

View File

@ -319,6 +319,7 @@
./test.sh -f tsim/sma/sma_leak.sim ./test.sh -f tsim/sma/sma_leak.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim ./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError1.sim