Merge pull request #23636 from taosdata/enh/TD-25601-3.0
enh: support delete msg for rsma
This commit is contained in:
commit
a51df86e43
|
@ -3774,6 +3774,7 @@ typedef struct {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
SArray* deleteReqs; // SArray<SSingleDeleteReq>
|
SArray* deleteReqs; // SArray<SSingleDeleteReq>
|
||||||
int64_t ctimeMs; // fill by vnode
|
int64_t ctimeMs; // fill by vnode
|
||||||
|
int8_t level; // 0 tsdb(default), 1 rsma1 , 2 rsma2
|
||||||
} SBatchDeleteReq;
|
} SBatchDeleteReq;
|
||||||
|
|
||||||
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);
|
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);
|
||||||
|
|
|
@ -2121,6 +2121,7 @@ _end:
|
||||||
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
||||||
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
|
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
|
||||||
if (!pBuf) {
|
if (!pBuf) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
|
int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
|
||||||
|
@ -2133,6 +2134,7 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
||||||
|
|
||||||
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
|
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
|
||||||
if (stbFullName[0] == 0) {
|
if (stbFullName[0] == 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2142,6 +2144,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cname == NULL) {
|
if (cname == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
taosArrayDestroy(tags);
|
taosArrayDestroy(tags);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -610,7 +610,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
0)
|
0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
tsNumOfVnodeRsmaThreads = tsNumOfCores / 2;
|
tsNumOfVnodeRsmaThreads = tsNumOfCores / 4;
|
||||||
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
|
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -8337,6 +8337,7 @@ int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq)
|
||||||
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
|
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pReq->level) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8361,6 +8362,9 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
|
||||||
if (!tDecodeIsEnd(pDecoder)) {
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->level) < 0) return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -296,7 +296,10 @@ static int compareKv(const void* p1, const void* p2) {
|
||||||
void buildChildTableName(RandTableName* rName) {
|
void buildChildTableName(RandTableName* rName) {
|
||||||
SStringBuilder sb = {0};
|
SStringBuilder sb = {0};
|
||||||
taosStringBuilderAppendStringLen(&sb, rName->stbFullName, rName->stbFullNameLen);
|
taosStringBuilderAppendStringLen(&sb, rName->stbFullName, rName->stbFullNameLen);
|
||||||
if (sb.buf == NULL) return;
|
if (sb.buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return;
|
||||||
|
}
|
||||||
taosArraySort(rName->tags, compareKv);
|
taosArraySort(rName->tags, compareKv);
|
||||||
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
|
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
|
||||||
taosStringBuilderAppendChar(&sb, ',');
|
taosStringBuilderAppendChar(&sb, ',');
|
||||||
|
|
|
@ -147,7 +147,7 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
|
||||||
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
|
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
|
||||||
|
|
||||||
// tqSink
|
// tqSink
|
||||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr);
|
const char* pIdStr);
|
||||||
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
|
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq);
|
||||||
int32_t tqStopStreamTasks(STQ* pTq);
|
int32_t tqStopStreamTasks(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||||
int32_t type, int64_t sver, int64_t ever);
|
int32_t type, int64_t sver, int64_t ever);
|
||||||
|
|
|
@ -286,7 +286,8 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||||
|
|
||||||
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||||
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
|
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len);
|
||||||
|
int32_t tdProcessRSmaDelete(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len);
|
||||||
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
|
||||||
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
|
||||||
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
|
||||||
|
|
|
@ -217,10 +217,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
|
||||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
if (!SMA_RSMA_ENV(pSma)) goto _exit;
|
||||||
if (!pSmaEnv) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbCommitBegin(VND_RSMA1(pVnode), pInfo);
|
code = tsdbCommitBegin(VND_RSMA1(pVnode), pInfo);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -17,12 +17,17 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
|
|
||||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
#define RSMA_EXEC_SMOOTH_SIZE (100) // cnt
|
||||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
#define RSMA_EXEC_BATCH_SIZE (1024) // cnt
|
||||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||||
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
|
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
|
||||||
#define RSMA_FETCH_INTERVAL (5000) // ms
|
#define RSMA_FETCH_INTERVAL (5000) // ms
|
||||||
#define RSMA_TASK_FLAG "rsma"
|
#define RSMA_EXEC_TASK_FLAG "rsma"
|
||||||
|
#define RSMA_EXEC_MSG_HLEN (13) // type(int8_t) + len(int32_t) + version(int64_t)
|
||||||
|
#define RSMA_EXEC_MSG_TYPE(msg) (*(int8_t *)(msg))
|
||||||
|
#define RSMA_EXEC_MSG_LEN(msg) (*(int32_t *)POINTER_SHIFT((msg), sizeof(int8_t)))
|
||||||
|
#define RSMA_EXEC_MSG_VER(msg) (*(int64_t *)POINTER_SHIFT((msg), sizeof(int8_t) + sizeof(int32_t)))
|
||||||
|
#define RSMA_EXEC_MSG_BODY(msg) (POINTER_SHIFT((msg), RSMA_EXEC_MSG_HLEN))
|
||||||
|
|
||||||
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
|
#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel)
|
||||||
|
|
||||||
|
@ -38,11 +43,11 @@ static void tdUidStoreDestory(STbUidStore *pStore);
|
||||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
|
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
|
||||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||||
int8_t idx);
|
int8_t idx);
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType,
|
||||||
ERsmaExecType type, int8_t level);
|
SRSmaInfo *pInfo, ERsmaExecType type, int8_t level);
|
||||||
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||||
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||||
static void tdFreeRSmaSubmitItems(SArray *pItems);
|
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type);
|
||||||
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
|
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
|
||||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
||||||
int32_t execType, int8_t *streamFlushed);
|
int32_t execType, int8_t *streamFlushed);
|
||||||
|
@ -288,8 +293,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
pStreamTask->id.streamId = pRSmaInfo->suid + idx;
|
pStreamTask->id.streamId = pRSmaInfo->suid + idx;
|
||||||
pStreamTask->chkInfo.startTs = taosGetTimestampMs();
|
pStreamTask->chkInfo.startTs = taosGetTimestampMs();
|
||||||
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
|
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
|
||||||
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
|
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_EXEC_TASK_FLAG) + 1);
|
||||||
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
|
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_EXEC_TASK_FLAG);
|
||||||
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
|
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
|
||||||
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
|
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
|
||||||
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
|
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
|
||||||
|
@ -624,6 +629,45 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tdRSmaProcessDelReq(SSma *pSma, int64_t suid, int8_t level, SBatchDeleteReq *pDelReq) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
|
||||||
|
int32_t len = 0;
|
||||||
|
tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
|
||||||
|
if (!pBuf) {
|
||||||
|
code = terrno;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
|
||||||
|
tEncodeSBatchDeleteReq(&encoder, pDelReq);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
|
||||||
|
|
||||||
|
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL,
|
||||||
|
.pCont = pBuf,
|
||||||
|
.contLen = len + sizeof(SMsgHead)};
|
||||||
|
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
taosArrayDestroy(pDelReq->deleteReqs);
|
||||||
|
if (code) {
|
||||||
|
smaError("vgId:%d, failed at line %d to process delete req for table:%" PRIi64 ", level:%" PRIi8 " since %s",
|
||||||
|
SMA_VID(pSma), lino, suid, level, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
||||||
int32_t execType, int8_t *streamFlushed) {
|
int32_t execType, int8_t *streamFlushed) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -652,9 +696,42 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
if (output->info.type == STREAM_CHECKPOINT) {
|
if (output->info.type == STREAM_CHECKPOINT) {
|
||||||
if (streamFlushed) *streamFlushed = 1;
|
if (streamFlushed) *streamFlushed = 1;
|
||||||
continue;
|
continue;
|
||||||
|
} else if (output->info.type == STREAM_DELETE_RESULT) {
|
||||||
|
SBatchDeleteReq deleteReq = {.suid = suid, .level = pItem->level};
|
||||||
|
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||||
|
if (!deleteReq.deleteReqs) {
|
||||||
|
code = terrno;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
code = tqBuildDeleteReq(pSma->pVnode->pTq, NULL, output, &deleteReq, "");
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
code = tdRSmaProcessDelReq(pSma, suid, pItem->level, &deleteReq);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
smaDebug("vgId:%d, result block, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
|
||||||
|
", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64,
|
||||||
|
SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
|
||||||
|
pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
|
||||||
|
|
||||||
|
if (STREAM_GET_ALL == execType) {
|
||||||
|
/**
|
||||||
|
* 1. reset the output version when reboot
|
||||||
|
* 2. delete msg version not updated from the result
|
||||||
|
*/
|
||||||
|
if (output->info.version < pItem->submitReqVer) {
|
||||||
|
// submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
|
||||||
|
output->info.version = pItem->submitReqVer;
|
||||||
|
} else if (output->info.version == pItem->fetchResultVer) {
|
||||||
|
smaWarn("vgId:%d, result block, skip dup version, execType:%d, ver:%" PRIi64 ", submitReqVer:%" PRIi64
|
||||||
|
", fetchResultVer:%" PRIi64 ", suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIu64 ", groupid:%" PRIu64
|
||||||
|
", rows:%" PRIi64,
|
||||||
|
SMA_VID(pSma), execType, output->info.version, pItem->submitReqVer, pItem->fetchResultVer, suid,
|
||||||
|
pItem->level, output->info.id.uid, output->info.id.groupId, output->info.rows);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
|
|
||||||
output->info.id.uid, output->info.id.groupId, output->info.rows);
|
|
||||||
|
|
||||||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
||||||
SSubmitReq2 *pReq = NULL;
|
SSubmitReq2 *pReq = NULL;
|
||||||
|
@ -664,12 +741,6 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the output version to handle reboot
|
|
||||||
if (STREAM_GET_ALL == execType && output->info.version == 0) {
|
|
||||||
// the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
|
|
||||||
output->info.version = pItem->submitReqVer;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||||
if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
|
if (terrno == TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE) {
|
||||||
// TODO: reconfigure SSubmitReq2
|
// TODO: reconfigure SSubmitReq2
|
||||||
|
@ -686,8 +757,9 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
atomic_store_64(&pItem->fetchResultVer, output->info.version);
|
atomic_store_64(&pItem->fetchResultVer, output->info.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
|
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level:%" PRIi8
|
||||||
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
|
", execType:%d, ver:%" PRIi64,
|
||||||
|
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, execType, output->info.version);
|
||||||
|
|
||||||
if (pReq) {
|
if (pReq) {
|
||||||
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
|
||||||
|
@ -722,7 +794,7 @@ _exit:
|
||||||
*/
|
*/
|
||||||
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
|
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
|
||||||
SRSmaInfo *pInfo, tb_uid_t suid) {
|
SRSmaInfo *pInfo, tb_uid_t suid) {
|
||||||
int32_t size = sizeof(int32_t) + sizeof(int64_t) + len;
|
int32_t size = RSMA_EXEC_MSG_HLEN + len; // header + payload
|
||||||
void *qItem = taosAllocateQitem(size, DEF_QITEM, 0);
|
void *qItem = taosAllocateQitem(size, DEF_QITEM, 0);
|
||||||
|
|
||||||
if (!qItem) {
|
if (!qItem) {
|
||||||
|
@ -731,6 +803,8 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
|
||||||
|
|
||||||
void *pItem = qItem;
|
void *pItem = qItem;
|
||||||
|
|
||||||
|
*(int8_t *)pItem = (int8_t)inputType;
|
||||||
|
pItem = POINTER_SHIFT(pItem, sizeof(int8_t));
|
||||||
*(int32_t *)pItem = len;
|
*(int32_t *)pItem = len;
|
||||||
pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
|
pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
|
||||||
*(int64_t *)pItem = version;
|
*(int64_t *)pItem = version;
|
||||||
|
@ -749,7 +823,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
|
||||||
}
|
}
|
||||||
|
|
||||||
// smoothing consume
|
// smoothing consume
|
||||||
int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE;
|
int32_t n = nItems / RSMA_EXEC_SMOOTH_SIZE;
|
||||||
if (n > 1) {
|
if (n > 1) {
|
||||||
if (n > 10) {
|
if (n > 10) {
|
||||||
n = 10;
|
n = 10;
|
||||||
|
@ -796,7 +870,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||||
* @param level
|
* @param level
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int64_t version, int32_t inputType, SRSmaInfo *pInfo,
|
||||||
ERsmaExecType type, int8_t level) {
|
ERsmaExecType type, int8_t level) {
|
||||||
int32_t idx = level - 1;
|
int32_t idx = level - 1;
|
||||||
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
|
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
|
||||||
|
@ -813,25 +887,15 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level,
|
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p, suid:%" PRIu64 ", nMsg:%d, submitReqVer:%" PRIi64 ", inputType:%d", SMA_VID(pSma), level,
|
||||||
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize);
|
RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize, version, inputType);
|
||||||
|
|
||||||
#if 0
|
|
||||||
for (int32_t i = 0; i < msgSize; ++i) {
|
|
||||||
SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
|
|
||||||
smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
|
|
||||||
tdRsmaPrintSubmitReq(pSma, pReq);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
|
if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) {
|
||||||
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (STREAM_INPUT__MERGED_SUBMIT == inputType) {
|
atomic_store_64(&pItem->submitReqVer, version);
|
||||||
SPackedData *packData = POINTER_SHIFT(pMsg, sizeof(SPackedData) * (msgSize - 1));
|
|
||||||
atomic_store_64(&pItem->submitReqVer, packData->ver);
|
|
||||||
}
|
|
||||||
|
|
||||||
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL);
|
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL);
|
||||||
|
|
||||||
|
@ -910,7 +974,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (inputType == STREAM_INPUT__DATA_SUBMIT || inputType == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
|
||||||
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
tdReleaseRSmaInfo(pSma, pRSmaInfo);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -937,12 +1001,8 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
|
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
|
||||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
|
||||||
if (!pEnv) {
|
|
||||||
// only applicable when rsma env exists
|
|
||||||
return TDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
|
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
|
||||||
smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr());
|
smaError("vgId:%d, failed to process rsma submit since invalid exec code: %s", SMA_VID(pSma), terrstr());
|
||||||
|
@ -951,27 +1011,25 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg,
|
||||||
|
|
||||||
STbUidStore uidStore = {0};
|
STbUidStore uidStore = {0};
|
||||||
|
|
||||||
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
|
||||||
if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
|
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
|
||||||
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uidStore.suid != 0) {
|
||||||
|
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, uidStore.suid) < 0) {
|
||||||
|
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (uidStore.suid != 0) {
|
void *pIter = NULL;
|
||||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, uidStore.suid) < 0) {
|
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
|
||||||
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
||||||
|
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__DATA_SUBMIT, *pTbSuid) < 0) {
|
||||||
|
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
|
||||||
|
taosHashCancelIterate(uidStore.uidHash, pIter);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pIter = NULL;
|
|
||||||
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
|
|
||||||
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
|
|
||||||
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) {
|
|
||||||
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
|
|
||||||
taosHashCancelIterate(uidStore.uidHash, pIter);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tdUidStoreDestory(&uidStore);
|
tdUidStoreDestory(&uidStore);
|
||||||
|
@ -981,6 +1039,24 @@ _err:
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len) {
|
||||||
|
if (!SMA_RSMA_ENV(pSma)) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if ((terrno = atomic_load_32(&SMA_RSMA_STAT(pSma)->execStat))) {
|
||||||
|
smaError("vgId:%d, failed to process rsma delete since invalid exec code: %s", SMA_VID(pSma), terrstr());
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDeleteRes *pDelRes = pReq;
|
||||||
|
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, STREAM_INPUT__REF_DATA_BLOCK, pDelRes->suid) < 0) {
|
||||||
|
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief retrieve rsma meta and init
|
* @brief retrieve rsma meta and init
|
||||||
*
|
*
|
||||||
|
@ -1359,10 +1435,20 @@ _end:
|
||||||
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdFreeRSmaSubmitItems(SArray *pItems) {
|
static void tdFreeRSmaSubmitItems(SArray *pItems, int32_t type) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
|
int32_t arrSize = taosArrayGetSize(pItems);
|
||||||
SPackedData *packData = taosArrayGet(pItems, i);
|
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t) - sizeof(int64_t)));
|
for (int32_t i = 0; i < arrSize; ++i) {
|
||||||
|
SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
|
||||||
|
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -RSMA_EXEC_MSG_HLEN));
|
||||||
|
}
|
||||||
|
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
for (int32_t i = 0; i < arrSize; ++i) {
|
||||||
|
SPackedData *packData = TARRAY_GET_ELEM(pItems, i);
|
||||||
|
blockDataDestroy(packData->pDataBlock);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERTS(0, "unknown type:%d", type);
|
||||||
}
|
}
|
||||||
taosArrayClear(pItems);
|
taosArrayClear(pItems);
|
||||||
}
|
}
|
||||||
|
@ -1427,40 +1513,98 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
|
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
|
||||||
|
void *msg = NULL;
|
||||||
|
int8_t resume = 0;
|
||||||
|
int32_t nSubmit = 0;
|
||||||
|
int32_t nDelete = 0;
|
||||||
|
int64_t version = 0;
|
||||||
|
|
||||||
|
SPackedData packData;
|
||||||
|
|
||||||
taosArrayClear(pSubmitArr);
|
taosArrayClear(pSubmitArr);
|
||||||
|
|
||||||
|
// the submitReq/deleteReq msg may exsit alternately in the msg queue, consume them sequentially in batch mode
|
||||||
while (1) {
|
while (1) {
|
||||||
void *msg = NULL;
|
|
||||||
taosGetQitem(qall, (void **)&msg);
|
taosGetQitem(qall, (void **)&msg);
|
||||||
if (msg) {
|
if (msg) {
|
||||||
SPackedData packData = {.msgLen = *(int32_t *)msg,
|
int8_t inputType = RSMA_EXEC_MSG_TYPE(msg);
|
||||||
.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)),
|
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))};
|
if (nDelete > 0) {
|
||||||
|
resume = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_resume_submit:
|
||||||
|
packData.msgLen = RSMA_EXEC_MSG_LEN(msg);
|
||||||
|
packData.ver = RSMA_EXEC_MSG_VER(msg);
|
||||||
|
packData.msgStr = RSMA_EXEC_MSG_BODY(msg);
|
||||||
|
version = packData.ver;
|
||||||
|
if (!taosArrayPush(pSubmitArr, &packData)) {
|
||||||
|
taosFreeQitem(msg);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
++nSubmit;
|
||||||
|
} else if (inputType == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
if (nSubmit > 0) {
|
||||||
|
resume = 2;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_resume_delete:
|
||||||
|
version = RSMA_EXEC_MSG_VER(msg);
|
||||||
|
if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
|
||||||
|
&packData.pDataBlock, 1))) {
|
||||||
|
taosFreeQitem(msg);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
if (!taosArrayPush(pSubmitArr, &packData)) {
|
if (packData.pDataBlock && !taosArrayPush(pSubmitArr, &packData)) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
taosFreeQitem(msg);
|
||||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
}
|
||||||
|
taosFreeQitem(msg);
|
||||||
|
if (packData.pDataBlock) {
|
||||||
|
// packData.pDataBlock is NULL if delete affects 0 row
|
||||||
|
++nDelete;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERTS(0, "unknown msg type:%d", inputType);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nSubmit > 0 || nDelete > 0) {
|
||||||
|
int32_t size = TARRAY_SIZE(pSubmitArr);
|
||||||
|
ASSERTS(size > 0, "size is %d", size);
|
||||||
|
int32_t inputType = nSubmit > 0 ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
|
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
||||||
|
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, version, inputType, pInfo, type, i) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tdFreeRSmaSubmitItems(pSubmitArr, inputType);
|
||||||
|
nSubmit = 0;
|
||||||
|
nDelete = 0;
|
||||||
} else {
|
} else {
|
||||||
break;
|
goto _rtn;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resume == 1) {
|
||||||
|
resume = 0;
|
||||||
|
goto _resume_submit;
|
||||||
|
} else if (resume == 2) {
|
||||||
|
resume = 0;
|
||||||
|
goto _resume_delete;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pSubmitArr);
|
_rtn:
|
||||||
if (size > 0) {
|
|
||||||
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
|
|
||||||
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
|
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
|
||||||
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
|
smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid,
|
||||||
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
|
type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr());
|
||||||
tdFreeRSmaSubmitItems(pSubmitArr);
|
tdFreeRSmaSubmitItems(pSubmitArr, nSubmit ? STREAM_INPUT__MERGED_SUBMIT : STREAM_INPUT__REF_DATA_BLOCK);
|
||||||
while (1) {
|
while (1) {
|
||||||
void *msg = NULL;
|
void *msg = NULL;
|
||||||
taosGetQitem(qall, (void **)&msg);
|
taosGetQitem(qall, (void **)&msg);
|
||||||
|
@ -1497,7 +1641,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(pSubmitArr =
|
if (!(pSubmitArr =
|
||||||
taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
|
taosArrayInit(TMIN(RSMA_EXEC_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,7 +188,8 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
pDeleteReq->suid = suid;
|
pDeleteReq->suid = suid;
|
||||||
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||||
tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, "");
|
code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, pDeleteReq, "");
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -343,7 +343,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||||
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
if (*pItem == NULL) {
|
if (*pItem == NULL) {
|
||||||
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
|
||||||
|
|
|
@ -43,7 +43,7 @@ static SArray* createDefaultTagColName();
|
||||||
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
||||||
int64_t gid);
|
int64_t gid);
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr) {
|
const char* pIdStr) {
|
||||||
int32_t totalRows = pDataBlock->info.rows;
|
int32_t totalRows = pDataBlock->info.rows;
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
@ -58,8 +58,9 @@ int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock,
|
||||||
int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
|
int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
|
||||||
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
|
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
|
||||||
|
|
||||||
char* name;
|
char* name = NULL;
|
||||||
void* varTbName = NULL;
|
char* originName = NULL;
|
||||||
|
void* varTbName = NULL;
|
||||||
if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
|
if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
|
||||||
varTbName = colDataGetVarData(pTbNameCol, row);
|
varTbName = colDataGetVarData(pTbNameCol, row);
|
||||||
}
|
}
|
||||||
|
@ -67,18 +68,29 @@ int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock,
|
||||||
if (varTbName != NULL && varTbName != (void*)-1) {
|
if (varTbName != NULL && varTbName != (void*)-1) {
|
||||||
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||||
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
||||||
} else {
|
} else if (stbFullName) {
|
||||||
name = buildCtbNameByGroupId(stbFullName, groupId);
|
name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||||
|
} else {
|
||||||
|
originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||||
|
if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
|
||||||
|
name = varDataVal(originName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64,
|
if (!name || *name == '\0') {
|
||||||
pIdStr, groupId, name, skey, ekey);
|
tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
|
||||||
|
" since invalid tbname:%s",
|
||||||
|
pIdStr, groupId, skey, ekey, name ? name : "NULL");
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
|
||||||
|
groupId, name, skey, ekey);
|
||||||
|
|
||||||
SSingleDeleteReq req = { .startTs = skey, .endTs = ekey};
|
SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
|
||||||
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
|
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1);
|
||||||
taosMemoryFree(name);
|
taosArrayPush(deleteReq->deleteReqs, &req);
|
||||||
|
}
|
||||||
taosArrayPush(deleteReq->deleteReqs, &req);
|
if (originName) name = originName;
|
||||||
|
taosMemoryFreeClear(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -345,7 +357,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
|
||||||
int64_t suid) {
|
int64_t suid) {
|
||||||
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
|
||||||
|
|
||||||
int32_t code = tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
|
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -399,7 +399,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
|
||||||
SDecoder* pCoder = &(SDecoder){0};
|
SDecoder* pCoder = &(SDecoder){0};
|
||||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
SDeleteRes* pRes = &(SDeleteRes){0};
|
||||||
|
|
||||||
|
@ -442,14 +442,21 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
taosArrayDestroy(pRes->uidList);
|
||||||
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
if (type == 0) {
|
||||||
if (*pRefBlock == NULL) {
|
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||||
blockDataCleanup(pDelBlock);
|
if (*pRefBlock == NULL) {
|
||||||
taosMemoryFree(pDelBlock);
|
blockDataCleanup(pDelBlock);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
taosMemoryFree(pDelBlock);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
|
((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
|
||||||
|
} else if (type == 1) {
|
||||||
|
*pRefBlock = pDelBlock;
|
||||||
|
} else {
|
||||||
|
ASSERTS(0, "unknown type:%d", type);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
|
|
||||||
(*pRefBlock)->pBlock = pDelBlock;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -380,7 +380,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SEncoder *pCoder = &(SEncoder){0};
|
SEncoder *pCoder = &(SEncoder){0};
|
||||||
SDeleteRes res = {0};
|
SDeleteRes res = {0};
|
||||||
|
|
||||||
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .skipRollup = 1};
|
||||||
initStorageAPI(&handle.api);
|
initStorageAPI(&handle.api);
|
||||||
|
|
||||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||||
|
@ -1674,7 +1674,7 @@ _exit:
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
|
||||||
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
|
code = tdProcessRSmaSubmit(pVnode->pSma, ver, pSubmitReq, pReq, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear
|
// clear
|
||||||
|
@ -1891,6 +1891,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
|
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
|
||||||
|
STsdb *pTsdb = pVnode->pTsdb;
|
||||||
|
|
||||||
|
if (deleteReq.level) {
|
||||||
|
pTsdb = deleteReq.level == 1 ? VND_RSMA1(pVnode) : VND_RSMA2(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
|
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
@ -1903,21 +1908,22 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
|
||||||
|
|
||||||
int64_t uid = mr.me.uid;
|
int64_t uid = mr.me.uid;
|
||||||
|
|
||||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
int32_t code = tsdbDeleteTableData(pTsdb, ver, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
|
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
|
||||||
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
|
if (deleteReq.level == 0) {
|
||||||
if (code < 0) {
|
code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, deleteReq.ctimeMs);
|
||||||
terrno = code;
|
if (code < 0) {
|
||||||
vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
|
terrno = code;
|
||||||
", end ts:%" PRId64,
|
vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
|
||||||
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
", end ts:%" PRId64,
|
||||||
|
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderClear(&mr.coder);
|
tDecoderClear(&mr.coder);
|
||||||
}
|
}
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
@ -1952,6 +1958,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len);
|
||||||
|
|
||||||
tDecoderClear(pCoder);
|
tDecoderClear(pCoder);
|
||||||
taosArrayDestroy(pRes->uidList);
|
taosArrayDestroy(pRes->uidList);
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,12 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
|
||||||
SPackedData tmp = {.pDataBlock = input};
|
SPackedData tmp = {.pDataBlock = input};
|
||||||
taosArrayPush(pInfo->pBlockLists, &tmp);
|
taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
||||||
|
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
||||||
|
taosArrayPush(pInfo->pBlockLists, pReq);
|
||||||
|
}
|
||||||
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -1183,6 +1183,7 @@ e
|
||||||
,,y,script,./test.sh -f tsim/sma/sma_leak.sim
|
,,y,script,./test.sh -f tsim/sma/sma_leak.sim
|
||||||
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||||
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||||
|
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim
|
||||||
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||||
,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
|
,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
|
||||||
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim
|
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim
|
||||||
|
|
|
@ -0,0 +1,540 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print =============== create database with retentions
|
||||||
|
sql create database d0 retentions -:7d,10s:21d,15s:365d vgroups 1;
|
||||||
|
sql use d0
|
||||||
|
|
||||||
|
print =============== create super table and register rsma
|
||||||
|
sql create table if not exists stb (ts timestamp, c1 float, c2 double) tags (city binary(20),district binary(20)) rollup(sum) max_delay 1s,1s;
|
||||||
|
|
||||||
|
sql show stables
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== create child table
|
||||||
|
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
|
||||||
|
|
||||||
|
sql show tables
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== insert data and trigger rollup
|
||||||
|
sql insert into ct1 values(now, 10, NULL);
|
||||||
|
sql insert into ct1 values(now+60m, 1, NULL);
|
||||||
|
sql insert into ct1 values(now+120m, 100, NULL);
|
||||||
|
|
||||||
|
print =============== wait 7 seconds for results
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory
|
||||||
|
sql select * from ct1;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
if $rows != 3 then
|
||||||
|
print retention level 2 file rows $rows != 3
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 10.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data21 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data22 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print retention level 2 file rows $rows != 3
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 10.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data21 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data22 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from memory
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print retention level 2 file rows $rows != 3
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 10.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data21 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data22 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== delete row 0
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
print =============== wait 7 seconds for results
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after delete row 0
|
||||||
|
sql select * from ct1;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print retention level 2 file rows $rows != 2
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after delete row 0
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print retention level 2 file rows $rows != 2
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from memory after delete row 0
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print retention level 2 file rows $rows != 2
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== delete row 1
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
|
||||||
|
print =============== wait 7 seconds for results
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after delete row 1
|
||||||
|
sql select * from ct1;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after delete row 1
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from memory after delete row 1
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
#===================================================================
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
print =============== wait 7 seconds for results after reboot
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after reboot
|
||||||
|
sql select * from ct1;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after reboot
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from memory after reboot
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
#==================== flush database to trigger commit data to file
|
||||||
|
sql flush database d0;
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from file
|
||||||
|
sql select * from ct1 where ts > now-365d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from file
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from file
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print retention level 2 file rows $rows != 1
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== delete row 2
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now + 120m;
|
||||||
|
sql delete from ct1 where ts < now + 200m;
|
||||||
|
sql delete from ct1 where ts < now + 300m;
|
||||||
|
sql delete from ct1 where ts < now + 60m;
|
||||||
|
sql delete from ct1 where ts < now;
|
||||||
|
|
||||||
|
print =============== wait 7 seconds for results
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after delete row 2
|
||||||
|
sql select * from ct1;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after delete row 2
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from memory after delete row 2
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
#===================================================================
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
print =============== wait 7 seconds for results after reboot
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after delete row 2
|
||||||
|
sql select * from ct1;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after delete row 2
|
||||||
|
sql select * from ct1 where ts > now-8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 0 from memory after delete row 2
|
||||||
|
sql select * from ct1 where ts > now-3d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== check delete multiple tables
|
||||||
|
sql create table ct2 using stb tags("BeiJing", "HaiDian");
|
||||||
|
sql create table ct3 using stb tags("ShangHai", "PuDong");
|
||||||
|
|
||||||
|
sql insert into ct2 values(now, 10, NULL);
|
||||||
|
sql insert into ct2 values(now+60m, 1, NULL);
|
||||||
|
sql insert into ct2 values(now+120m, 100, NULL);
|
||||||
|
sql insert into ct3 values(now, 10, NULL);
|
||||||
|
sql insert into ct3 values(now+60m, 1, NULL);
|
||||||
|
sql insert into ct3 values(now+120m, 100, NULL);
|
||||||
|
|
||||||
|
print =============== wait 7 seconds for results
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory
|
||||||
|
sql select * from ct2;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
print $data10 $data11 $data12
|
||||||
|
print $data20 $data21 $data22
|
||||||
|
if $rows != 3 then
|
||||||
|
print retention level 2 file rows $rows != 3
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 10.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data02 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 1.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data12 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data21 != 100.00000 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data22 != NULL then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql delete from ct1 where ts < now + 120m;
|
||||||
|
sql delete from ct3 where ts < now;
|
||||||
|
sql delete from ct2 where ts < now + 60m;
|
||||||
|
sql delete from ct2 where ts < now + 120m;
|
||||||
|
sql delete from ct3 where ts < now + 60m;
|
||||||
|
sql delete from ct3 where ts < now + 120m;
|
||||||
|
sql delete from ct3 where ts < now;
|
||||||
|
|
||||||
|
print =============== wait 7 seconds for results
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after delete ct2
|
||||||
|
sql select * from ct2;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after delete ct3
|
||||||
|
sql select * from ct3 where ts > now - 8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
#===================================================================
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
#===================================================================
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
print =============== wait 7 seconds for results after reboot
|
||||||
|
sleep 7000
|
||||||
|
|
||||||
|
print =============== select * from retention level 1 from memory after delete ct2
|
||||||
|
sql select * from ct2 where ts > now - 8d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== select * from retention level 2 from memory after delete ct3
|
||||||
|
sql select * from ct3 where ts > now - 365d;
|
||||||
|
print $data00 $data01 $data02
|
||||||
|
if $rows != 0 then
|
||||||
|
print retention level 2 file rows $rows != 0
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
#===================================================================
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
#===================================================================
|
|
@ -130,5 +130,6 @@ run tsim/sync/3Replica1VgElect.sim
|
||||||
run tsim/sync/threeReplica1VgElectWihtInsert.sim
|
run tsim/sync/threeReplica1VgElectWihtInsert.sim
|
||||||
run tsim/sma/tsmaCreateInsertQuery.sim
|
run tsim/sma/tsmaCreateInsertQuery.sim
|
||||||
run tsim/sma/rsmaCreateInsertQuery.sim
|
run tsim/sma/rsmaCreateInsertQuery.sim
|
||||||
|
run tsim/sma/rsmaCreateInsertQueryDelete.sim
|
||||||
run tsim/valgrind/basic.sim
|
run tsim/valgrind/basic.sim
|
||||||
run tsim/valgrind/checkError.sim
|
run tsim/valgrind/checkError.sim
|
|
@ -319,6 +319,7 @@
|
||||||
./test.sh -f tsim/sma/sma_leak.sim
|
./test.sh -f tsim/sma/sma_leak.sim
|
||||||
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||||
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||||
|
./test.sh -f tsim/sma/rsmaCreateInsertQueryDelete.sim
|
||||||
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||||
./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
|
./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
|
||||||
./test.sh -f tsim/valgrind/checkError1.sim
|
./test.sh -f tsim/valgrind/checkError1.sim
|
||||||
|
|
Loading…
Reference in New Issue