From 72ecb0431c6fc8d44531240354d5f6942691d562 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 8 Nov 2023 20:45:44 +0800 Subject: [PATCH] enh: rsma checkpoint --- source/dnode/vnode/src/inc/sma.h | 20 +- source/dnode/vnode/src/sma/smaCommit.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 133 ++++++---- source/dnode/vnode/src/sma/smaUtil.c | 5 - source/dnode/vnode/src/tsdb/tsdbMemTable.c | 12 - source/libs/stream/src/streamTask.c | 9 +- tests/parallel_test/cases.task | 1 + .../tsim/sync/vnodesnapshot-rsma-test.sim | 2 +- tests/script/win-test-file | 1 + tests/system-test/1-insert/rsma.py | 248 ++++++++++++++++++ 10 files changed, 348 insertions(+), 85 deletions(-) create mode 100644 tests/system-test/1-insert/rsma.py diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 5e808c217c..de6bb23f04 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -137,15 +137,17 @@ struct SSmaStat { #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level; - int8_t fetchLevel; - int8_t triggerStat; - int32_t nScanned; - int32_t streamFlushed : 1; - int32_t maxDelay : 31; // ms - tmr_h tmrId; - void *pStreamState; - void *pStreamTask; // SStreamTask + int8_t level; + int8_t fetchLevel; + int8_t triggerStat; + int32_t nScanned; + int32_t streamFlushed : 1; + int32_t maxDelay : 31; // ms + int64_t submitReqVer; + int64_t fetchResultVer; + tmr_h tmrId; + void *pStreamState; + void *pStreamTask; // SStreamTask }; struct SRSmaInfo { diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index fad2e4d7e9..92181f054d 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -169,7 +169,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { * 1) This is high cost task and should not put in asyncPreCommit originally. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. */ - smaInfo("vgId:%d, rsma commit:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit, + smaInfo("vgId:%d, rsma commit, type:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit, (void *)taosGetSelfPthreadId()); nLoops = 0; while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 68d829bea4..7296f3d468 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -44,8 +44,8 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); -static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, SArray **ppResList, int8_t *streamFlushed); +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, + int32_t execType, SArray **ppResList, int8_t *streamFlushed); static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); @@ -235,19 +235,16 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui return TSDB_CODE_SUCCESS; } -#if 0 -static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { - int64_t checkpointId = -1; - STaskId id = {.streamId = streamId, .taskId = taskId}; +static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) { + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; taosRLockLatch(&pMeta->lock); SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask && *ppTask) { - checkpointId = (*ppTask)->chkInfo.checkpointId; + pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer; + pItem->fetchResultVer = (*ppTask)->info.triggerParam; } taosRUnLockLatch(&pMeta->lock); - return checkpointId; } -#endif static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { streamMetaUnregisterTask(pMeta, streamId, taskId); @@ -295,12 +292,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pStreamTask->pMeta = pVnode->pTq->pStreamMeta; pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1); sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG); -#if 0 - pStreamTask->chkInfo.checkpointId = - tdRSmaTaskGetCheckpointId(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId); -#else pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta); -#endif + tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; @@ -318,7 +311,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat return TSDB_CODE_FAILED; } - pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot + if (pItem->fetchResultVer < pItem->submitReqVer) { + // fetch the data when reboot + pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; + } + if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -337,10 +334,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 ", maxdelay:%" PRIi64 - " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, + smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 + ", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64 + ", finally maxdelay:%" PRIi32, TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId, - param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); + pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); } return TSDB_CODE_SUCCESS; } @@ -624,12 +622,14 @@ _end: return code; } -static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid, SArray **ppResList, int8_t *streamFlushed) { +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, + int32_t execType, SArray **ppResList, int8_t *streamFlushed) { int32_t code = 0; int32_t lino = 0; SSDataBlock *output = NULL; SArray *pResList = NULL; + STSchema *pTSchema = pInfo->pTSchema; + int64_t suid = pInfo->suid; if (!(*ppResList)) { pResList = taosArrayInit(1, POINTER_BYTES); @@ -657,11 +657,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (taosArrayGetSize(pResList) == 0) { break; } -#if 0 - char flag[10] = {0}; - snprintf(flag, 10, "level %" PRIi8, pItem->level); - blockDebugShowDataBlocks(pResList, flag); -#endif + for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { output = taosArrayGetP(pResList, i); if (output->info.type == STREAM_CHECKPOINT) { @@ -674,12 +670,17 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; - // TODO: the schema update should be handled later(TD-17965) if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; TSDB_CHECK_CODE(code, lino, _exit); } + // reset the output version to handle reboot + if (STREAM_GET_ALL == execType && output->info.version == 0) { + // the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously + output->info.version = pItem->submitReqVer; + } + if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); @@ -691,6 +692,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma TSDB_CHECK_CODE(code, lino, _exit); } + if (STREAM_GET_ALL == execType) { + atomic_store_64(&pItem->fetchResultVer, output->info.version); + } + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version); @@ -803,9 +808,10 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { */ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, ERsmaExecType type, int8_t level) { - int32_t idx = level - 1; - void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); - SArray *pResList = NULL; + int32_t idx = level - 1; + void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); + SArray *pResList = NULL; if (!qTaskInfo) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, @@ -833,8 +839,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_FAILED; } - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL); + if (STREAM_INPUT__MERGED_SUBMIT == inputType) { + SPackedData *packData = POINTER_SHIFT(pMsg, sizeof(SPackedData) * (msgSize - 1)); + atomic_store_64(&pItem->submitReqVer, packData->ver); + } + + tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, &pResList, NULL); taosArrayDestroy(pResList); return TSDB_CODE_SUCCESS; @@ -1161,8 +1171,8 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { int8_t streamFlushed = 0; - code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, - pRSmaInfo->suid, &pResList, &streamFlushed); + code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo, + STREAM_CHECKPOINT, &pResList, &streamFlushed); if (code) { taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); @@ -1190,7 +1200,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { _checkpoint: // stream state: build checkpoint in backend do { - void *infoHash = NULL; + SStreamMeta *pMeta = NULL; + int64_t checkpointId = taosGetTimestampNs(); + bool checkpointBuilt = false; + void *infoHash = NULL; while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { @@ -1202,32 +1215,48 @@ _checkpoint: if (pItem && pItem->pStreamTask) { SStreamTask *pTask = pItem->pStreamTask; atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1); - pTask->checkpointingId = taosGetTimestampNs(); + pTask->checkpointingId = checkpointId; pTask->chkInfo.checkpointId = pTask->checkpointingId; - code = streamTaskBuildCheckpoint(pTask); - if (code) { - taosHashCancelIterate(pInfoHash, infoHash); - TSDB_CHECK_CODE(code, lino, _exit); + pTask->chkInfo.checkpointVer = pItem->submitReqVer; + pTask->info.triggerParam = pItem->fetchResultVer; + + if (!checkpointBuilt) { + // the stream states share one checkpoint + code = streamTaskBuildCheckpoint(pTask); + if (code) { + taosHashCancelIterate(pInfoHash, infoHash); + TSDB_CHECK_CODE(code, lino, _exit); + } + pMeta = pTask->pMeta; + checkpointBuilt = true; } - taosWLockLatch(&pTask->pMeta->lock); - if (0 != streamMetaSaveTask(pTask->pMeta, pTask) || 0 != streamMetaCommit(pTask->pMeta)) { - taosWUnLockLatch(&pTask->pMeta->lock); + taosWLockLatch(&pMeta->lock); + if (0 != streamMetaSaveTask(pMeta, pTask)) { + taosWUnLockLatch(&pMeta->lock); code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY; taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); } - taosWUnLockLatch(&pTask->pMeta->lock); - - smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint/task:%" PRIi64 "/%p, table:%" PRIi64 ", level:%d", - TD_VID(pVnode), pTask->checkpointingId, pTask, pRSmaInfo->suid, i + 1); - - // the stream states share one checkpoint - taosHashCancelIterate(pInfoHash, infoHash); - goto _exit; + taosWUnLockLatch(&pMeta->lock); + smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 + ", table:%" PRIi64 ", level:%d", + TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1); } } } + if (pMeta) { + taosWLockLatch(&pMeta->lock); + if (0 != streamMetaCommit(pMeta)) { + taosWUnLockLatch(&pMeta->lock); + code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + taosWUnLockLatch(&pMeta->lock); + } + if (checkpointBuilt) { + smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId); + } } while (0); _exit: taosArrayDestroy(pResList); @@ -1366,7 +1395,6 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { continue; } -#if 0 if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed", SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); @@ -1384,12 +1412,11 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } pItem->nScanned = 0; -#endif if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, &pResList, NULL) < 0) { goto _err; } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 8c04306d0f..e45cbac329 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -30,13 +30,8 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN offset = strlen(outputName); // rsma -#if 1 snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR, (endWithSep ? TD_DIRSEP : "")); -#else - snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s%s%s%s%s", TD_DIRSEP, "tq", TD_DIRSEP, "stream", - TD_DIRSEP, "state", (endWithSep ? TD_DIRSEP : "")); -#endif } // smaXXXUtil ================ diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 69b19f4bc5..cc77474e79 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -661,9 +661,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit; pTbData->minKey = TMIN(pTbData->minKey, key.ts); lRow = tRow; - tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, - TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version, - pSubmitTbData->uid); // remain row ++tRow.iRow; @@ -683,9 +680,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, lRow = tRow; ++tRow.iRow; - tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, - TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version, - pSubmitTbData->uid); } } @@ -727,9 +721,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if (code) goto _exit; lRow = tRow; - tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, - TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version, - pSubmitTbData->uid); pTbData->minKey = TMIN(pTbData->minKey, key.ts); @@ -753,9 +744,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, lRow = tRow; iRow++; - tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, - TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version, - pSubmitTbData->uid); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a7fb590d1b..2f8de98039 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -120,8 +120,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { taskId = pTask->streamTaskId.taskId; if (tEncodeI32(pEncoder, taskId)) return -1; - if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; - if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.range.minVer)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.range.maxVer)) return -1; if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; @@ -193,8 +193,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &taskId)) return -1; pTask->streamTaskId.taskId = taskId; - if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; - if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.range.minVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 21dcd16441..37d1c2aa59 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1175,6 +1175,7 @@ e ,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim +,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ,,n,script,./test.sh -f tsim/valgrind/checkError1.sim ,,n,script,./test.sh -f tsim/valgrind/checkError2.sim ,,n,script,./test.sh -f tsim/valgrind/checkError3.sim diff --git a/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim b/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim index 3b3cd01521..b1e5ed200f 100644 --- a/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim +++ b/tests/script/tsim/sync/vnodesnapshot-rsma-test.sim @@ -114,7 +114,7 @@ endi vg_ready: print ====> create stable/child table -sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s +sql create table stb (ts timestamp, c1 float, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s sql show stables if $rows != 1 then diff --git a/tests/script/win-test-file b/tests/script/win-test-file index 4ff4b52f7e..fe5f5c39e3 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -320,6 +320,7 @@ ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim +./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim ./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError2.sim ./test.sh -f tsim/valgrind/checkError3.sim diff --git a/tests/system-test/1-insert/rsma.py b/tests/system-test/1-insert/rsma.py new file mode 100644 index 0000000000..ab84185e87 --- /dev/null +++ b/tests/system-test/1-insert/rsma.py @@ -0,0 +1,248 @@ +from datetime import datetime +import time + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +PRIMARY_COL = "ts" + +INT_COL = "c_int" +BINT_COL = "c_bint" +SINT_COL = "c_sint" +TINT_COL = "c_tint" +FLOAT_COL = "c_float" +DOUBLE_COL = "c_double" +BOOL_COL = "c_bool" +TINT_UN_COL = "c_utint" +SINT_UN_COL = "c_usint" +BINT_UN_COL = "c_ubint" +INT_UN_COL = "c_uint" +BINARY_COL = "c_binary" +NCHAR_COL = "c_nchar" +TS_COL = "c_ts" + +INT_TAG = "t_int" + +TAG_COL = [INT_TAG] + +## insert data args: +TIME_STEP = 10000 +NOW = int(datetime.timestamp(datetime.now()) * 1000) + +# init db/table +DBNAME = "db" +DB1 = "db1" +DB2 = "db2" +DB3 = "db3" +DB4 = "db4" +STBNAME = "stb1" +CTBNAME = "ct1" +NTBNAME = "nt1" + +class TDTestCase: + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + @property + def create_databases_sql_err(self): + return [ + # check grammar + "create database db1 retentions", + "create database db1 retentions 1s:1d", + "create database db1 retentions 1s:1d,2s:2d", + "create database db1 retentions 1s:1d,2s:2d,3s:3d", + "create database db1 retentions 1s:1d,2s:2d,3s:3d,4s:4d", + "create database db1 retentions -:1d,2s:2d,3s:3d,4s:4d", + "create database db1 retentions --:1d", + "create database db1 retentions -:-:1d", + "create database db1 retentions 1d:-", + "create database db1 retentions -:-", + "create database db1 retentions +:1d", + "create database db1 retentions :1d", + "create database db1 retentions -:1d,-:2d", + "create database db1 retentions -:1d,-:2d,-:3d", + "create database db1 retentions -:1d,1s:-", + "create database db1 retentions -:1d,15s:2d,-:3d", + + # check unit + "create database db1 retentions -:1d,1b:1d", + "create database db1 retentions -:1d,1u:1d", + "create database db1 retentions -:1d,1a:1d", + "create database db1 retentions -:1d,1n:1d", + "create database db1 retentions -:1d,1y:1d", + "create database db1 retentions -:1d,1s:86400s", + "create database db1 retentions -:1d,1s:86400000a", + "create database db1 retentions -:1d,1s:86400000000u", + "create database db1 retentions -:1d,1s:86400000000000b", + "create database db1 retentions -:1s,1s:2s", + "create database db1 retentions -:1d,1s:1w", + "create database db1 retentions -:1d,1s:1n", + "create database db1 retentions -:1d,1s:1y", + + # check value range + "create database db3 retentions -:-1d", + "create database db3 retentions -:0d", + "create database db3 retentions -:1439m", + "create database db3 retentions -:365001d", + "create database db3 retentions -:8760001h", + "create database db3 retentions -:525600001m", + "create database db3 retentions -:106581d precision 'ns'", + "create database db3 retentions -:2557921h precision 'ns'", + "create database db3 retentions -:153475201m precision 'ns'", + # check relationships + "create database db5 retentions -:1440m,1441m:1440m,2d:3d", + "create database db5 retentions -:1d,2m:1d,1s:2d", + "create database db5 retentions -:1440m,1s:2880m,2s:2879m", + "create database db5 retentions -:1d,2s:2d,2s:3d", + "create database db5 retentions -:1d,3s:2d,2s:3d", + "create database db1 retentions -:1d,2s:3d,3s:2d", + "create database db1 retentions -:1d,2s:3d,1s:2d", + + ] + + @property + def create_databases_sql_current(self): + return [ + f"create database {DB1} retentions -:1d", + f"create database {DB2} retentions -:1d,2m:2d,3h:3d", + ] + + @property + def alter_database_sql(self): + return [ + "alter database db1 retentions -:99d", + "alter database db2 retentions -:97d,98h:98d,99h:99d,", + ] + + @property + def create_stable_sql_err(self, dbname=DB2): + return [ + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s max_delay 1m", + f"create stable {dbname}.stb12 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min", + f"create stable {dbname}.stb13 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s", + f"create stable {dbname}.stb14 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m", + f"create stable {dbname}.stb15 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ", + f"create stable {dbname}.stb16 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ", + f"create stable {dbname}.stb21 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s", + f"create stable {dbname}.stb22 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m", + f"create table {dbname}.ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s", + f"create table {dbname}.ntb_2 ({PRIMARY_COL} timestamp, {INT_COL} int) " , + f"create stable {dbname}.stb23 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) " , + f"create stable {dbname}.stb24 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " , + f"create stable {dbname}.stb25 ({PRIMARY_COL} timestamp, {INT_COL} int) " , + f"create stable {dbname}.stb26 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " , + # only float/double allowd for avg/sum + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(avg)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINT_COL} bigint) tags (tag1 int) rollup(avg)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BOOL_COL} bool) tags (tag1 int) rollup(avg)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINARY_COL} binary(10)) tags (tag1 int) rollup(avg)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINT_COL} bigint) tags (tag1 int) rollup(sum)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BOOL_COL} bool) tags (tag1 int) rollup(sum)", + f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINARY_COL} binary(10)) tags (tag1 int) rollup(sum)", + + + # watermark, max_delay: [0, 900000], [ms, s, m, ?] + f"create stable stb17 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u", + f"create stable stb18 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 1b", + f"create stable stb19 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 900001ms", + f"create stable stb20 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 16m", + f"create stable stb27 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 901s", + f"create stable stb28 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1h", + f"create stable stb29 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 0.2h", + f"create stable stb30 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 0.002d", + + ] + + @property + def create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, rsma=False, dbname=DBNAME, rsma_type="sum"): + tdLog.printNoPrefix("==========step: create table") + if rsma: + if rsma_type.lower().strip() in ("last", "first"): + create_stb_sql = f'''create table {dbname}.{stb}( + ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, + {FLOAT_COL} float, {DOUBLE_COL} double, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, + {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned, {BINARY_COL} binary(16) + ) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s + ''' + elif rsma_type.lower().strip() in ("sum", "avg"): + create_stb_sql = f'''create table {dbname}.{stb}( + ts timestamp, {DOUBLE_COL} double, {DOUBLE_COL}_1 double, {DOUBLE_COL}_2 double, {DOUBLE_COL}_3 double, + {FLOAT_COL} float, {DOUBLE_COL}_4 double, {FLOAT_COL}_1 float, {FLOAT_COL}_2 float, {FLOAT_COL}_3 float, + {DOUBLE_COL}_5 double) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s + ''' + else: + create_stb_sql = f'''create table {dbname}.{stb}( + ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, + {FLOAT_COL} float, {DOUBLE_COL} double, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, + {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned + ) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s + ''' + tdSql.execute(create_stb_sql) + else: + create_stb_sql = f'''create table {dbname}.{stb}( + ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, + {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, + {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp, + {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, + {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned + ) tags ({INT_TAG} int) + ''' + tdSql.execute(create_stb_sql) + + for i in range(ntbnum): + create_ntb_sql = f'''create table {dbname}.nt{i+1}( + ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, + {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, + {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp, + {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, + {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned + ) + ''' + tdSql.execute(create_ntb_sql) + + for i in range(ctb_num): + tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )') + + def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + tagValue = 'beijing' + if (i % 2 == 0): + tagValue = 'shanghai' + sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i,stbName,i+1, tagValue) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + + def run(self): + self.rows = 10 + tdLog.printNoPrefix("==========step0:all check") + dbname='d0' + tdSql.execute(f"create database {dbname} retentions -:10d,1m:15d,1h:30d STT_TRIGGER 1 vgroups 6;") + tdSql.execute(f"create stable if not exists {dbname}.st_min (ts timestamp, c1 int) tags (proid int,city binary(20)) rollup(min) watermark 0s,1s max_delay 1m,180s;;") + tdSql.execute(f"create stable if not exists {dbname}.st_avg (ts timestamp, c1 double) tags (city binary(20),district binary(20)) rollup(min) watermark 0s,1s max_delay 1m,180s;;") + self.create_ctable(tdSql, dbname, 'st_min', 'ct_min', 10000) + tdLog.printNoPrefix("==========step4:after wal, all check again ") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())