enh: rsma checkpoint
This commit is contained in:
parent
e72582a1fa
commit
72ecb0431c
|
@ -137,15 +137,17 @@ struct SSmaStat {
|
|||
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
||||
|
||||
struct SRSmaInfoItem {
|
||||
int8_t level;
|
||||
int8_t fetchLevel;
|
||||
int8_t triggerStat;
|
||||
int32_t nScanned;
|
||||
int32_t streamFlushed : 1;
|
||||
int32_t maxDelay : 31; // ms
|
||||
tmr_h tmrId;
|
||||
void *pStreamState;
|
||||
void *pStreamTask; // SStreamTask
|
||||
int8_t level;
|
||||
int8_t fetchLevel;
|
||||
int8_t triggerStat;
|
||||
int32_t nScanned;
|
||||
int32_t streamFlushed : 1;
|
||||
int32_t maxDelay : 31; // ms
|
||||
int64_t submitReqVer;
|
||||
int64_t fetchResultVer;
|
||||
tmr_h tmrId;
|
||||
void *pStreamState;
|
||||
void *pStreamTask; // SStreamTask
|
||||
};
|
||||
|
||||
struct SRSmaInfo {
|
||||
|
|
|
@ -169,7 +169,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
|||
* 1) This is high cost task and should not put in asyncPreCommit originally.
|
||||
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
|
||||
*/
|
||||
smaInfo("vgId:%d, rsma commit:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit,
|
||||
smaInfo("vgId:%d, rsma commit, type:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit,
|
||||
(void *)taosGetSelfPthreadId());
|
||||
nLoops = 0;
|
||||
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
|
||||
|
|
|
@ -44,8 +44,8 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
|||
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static void tdFreeRSmaSubmitItems(SArray *pItems);
|
||||
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
|
||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid, SArray **ppResList, int8_t *streamFlushed);
|
||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
||||
int32_t execType, SArray **ppResList, int8_t *streamFlushed);
|
||||
static void tdRSmaFetchTrigger(void *param, void *tmrId);
|
||||
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
|
||||
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
|
||||
|
@ -235,19 +235,16 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
||||
int64_t checkpointId = -1;
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
|
||||
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
|
||||
taosRLockLatch(&pMeta->lock);
|
||||
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask && *ppTask) {
|
||||
checkpointId = (*ppTask)->chkInfo.checkpointId;
|
||||
pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer;
|
||||
pItem->fetchResultVer = (*ppTask)->info.triggerParam;
|
||||
}
|
||||
taosRUnLockLatch(&pMeta->lock);
|
||||
return checkpointId;
|
||||
}
|
||||
#endif
|
||||
|
||||
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
||||
streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||
|
@ -295,12 +292,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
|||
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
|
||||
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
|
||||
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
|
||||
#if 0
|
||||
pStreamTask->chkInfo.checkpointId =
|
||||
tdRSmaTaskGetCheckpointId(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
|
||||
#else
|
||||
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
|
||||
#endif
|
||||
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
|
||||
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
|
||||
if (!pStreamState) {
|
||||
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
|
||||
|
@ -318,7 +311,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
|
||||
if (pItem->fetchResultVer < pItem->submitReqVer) {
|
||||
// fetch the data when reboot
|
||||
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
|
||||
}
|
||||
|
||||
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
|
||||
int64_t msInterval =
|
||||
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
|
||||
|
@ -337,10 +334,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
|||
|
||||
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
|
||||
smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 ", maxdelay:%" PRIi64
|
||||
" watermark:%" PRIi64 ", finally maxdelay:%" PRIi32,
|
||||
smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64
|
||||
", submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64
|
||||
", finally maxdelay:%" PRIi32,
|
||||
TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
|
||||
param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
|
||||
pItem->submitReqVer, pItem->fetchResultVer, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -624,12 +622,14 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||
int64_t suid, SArray **ppResList, int8_t *streamFlushed) {
|
||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
|
||||
int32_t execType, SArray **ppResList, int8_t *streamFlushed) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSDataBlock *output = NULL;
|
||||
SArray *pResList = NULL;
|
||||
STSchema *pTSchema = pInfo->pTSchema;
|
||||
int64_t suid = pInfo->suid;
|
||||
|
||||
if (!(*ppResList)) {
|
||||
pResList = taosArrayInit(1, POINTER_BYTES);
|
||||
|
@ -657,11 +657,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
|||
if (taosArrayGetSize(pResList) == 0) {
|
||||
break;
|
||||
}
|
||||
#if 0
|
||||
char flag[10] = {0};
|
||||
snprintf(flag, 10, "level %" PRIi8, pItem->level);
|
||||
blockDebugShowDataBlocks(pResList, flag);
|
||||
#endif
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
|
||||
output = taosArrayGetP(pResList, i);
|
||||
if (output->info.type == STREAM_CHECKPOINT) {
|
||||
|
@ -674,12 +670,17 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
|||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
||||
SSubmitReq2 *pReq = NULL;
|
||||
|
||||
// TODO: the schema update should be handled later(TD-17965)
|
||||
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
|
||||
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// reset the output version to handle reboot
|
||||
if (STREAM_GET_ALL == execType && output->info.version == 0) {
|
||||
// the submitReqVer keeps unchanged since tdExecuteRSmaImpl and tdRSmaFetchAllResult are executed synchronously
|
||||
output->info.version = pItem->submitReqVer;
|
||||
}
|
||||
|
||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
|
||||
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
|
||||
|
@ -691,6 +692,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (STREAM_GET_ALL == execType) {
|
||||
atomic_store_64(&pItem->fetchResultVer, output->info.version);
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
|
||||
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
|
||||
|
||||
|
@ -803,9 +808,10 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
|||
*/
|
||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||
ERsmaExecType type, int8_t level) {
|
||||
int32_t idx = level - 1;
|
||||
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
|
||||
SArray *pResList = NULL;
|
||||
int32_t idx = level - 1;
|
||||
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
|
||||
SArray *pResList = NULL;
|
||||
|
||||
if (!qTaskInfo) {
|
||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
|
||||
|
@ -833,8 +839,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
|
||||
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL);
|
||||
if (STREAM_INPUT__MERGED_SUBMIT == inputType) {
|
||||
SPackedData *packData = POINTER_SHIFT(pMsg, sizeof(SPackedData) * (msgSize - 1));
|
||||
atomic_store_64(&pItem->submitReqVer, packData->ver);
|
||||
}
|
||||
|
||||
tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, &pResList, NULL);
|
||||
|
||||
taosArrayDestroy(pResList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1161,8 +1171,8 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
|
||||
int8_t streamFlushed = 0;
|
||||
code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema,
|
||||
pRSmaInfo->suid, &pResList, &streamFlushed);
|
||||
code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
|
||||
STREAM_CHECKPOINT, &pResList, &streamFlushed);
|
||||
if (code) {
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -1190,7 +1200,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
_checkpoint:
|
||||
// stream state: build checkpoint in backend
|
||||
do {
|
||||
void *infoHash = NULL;
|
||||
SStreamMeta *pMeta = NULL;
|
||||
int64_t checkpointId = taosGetTimestampNs();
|
||||
bool checkpointBuilt = false;
|
||||
void *infoHash = NULL;
|
||||
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
|
||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||
|
@ -1202,32 +1215,48 @@ _checkpoint:
|
|||
if (pItem && pItem->pStreamTask) {
|
||||
SStreamTask *pTask = pItem->pStreamTask;
|
||||
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
|
||||
pTask->checkpointingId = taosGetTimestampNs();
|
||||
pTask->checkpointingId = checkpointId;
|
||||
pTask->chkInfo.checkpointId = pTask->checkpointingId;
|
||||
code = streamTaskBuildCheckpoint(pTask);
|
||||
if (code) {
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
|
||||
pTask->info.triggerParam = pItem->fetchResultVer;
|
||||
|
||||
if (!checkpointBuilt) {
|
||||
// the stream states share one checkpoint
|
||||
code = streamTaskBuildCheckpoint(pTask);
|
||||
if (code) {
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
pMeta = pTask->pMeta;
|
||||
checkpointBuilt = true;
|
||||
}
|
||||
|
||||
taosWLockLatch(&pTask->pMeta->lock);
|
||||
if (0 != streamMetaSaveTask(pTask->pMeta, pTask) || 0 != streamMetaCommit(pTask->pMeta)) {
|
||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
if (0 != streamMetaSaveTask(pMeta, pTask)) {
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
||||
|
||||
smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint/task:%" PRIi64 "/%p, table:%" PRIi64 ", level:%d",
|
||||
TD_VID(pVnode), pTask->checkpointingId, pTask, pRSmaInfo->suid, i + 1);
|
||||
|
||||
// the stream states share one checkpoint
|
||||
taosHashCancelIterate(pInfoHash, infoHash);
|
||||
goto _exit;
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
|
||||
", table:%" PRIi64 ", level:%d",
|
||||
TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pMeta) {
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
if (0 != streamMetaCommit(pMeta)) {
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
}
|
||||
if (checkpointBuilt) {
|
||||
smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);
|
||||
}
|
||||
} while (0);
|
||||
_exit:
|
||||
taosArrayDestroy(pResList);
|
||||
|
@ -1366,7 +1395,6 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
continue;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
|
||||
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi32 " maxDelay:%d, fetch executed",
|
||||
SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
|
||||
|
@ -1384,12 +1412,11 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
}
|
||||
|
||||
pItem->nScanned = 0;
|
||||
#endif
|
||||
|
||||
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, &pResList, NULL) < 0) {
|
||||
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, &pResList, NULL) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -30,13 +30,8 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
|
|||
offset = strlen(outputName);
|
||||
|
||||
// rsma
|
||||
#if 1
|
||||
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR,
|
||||
(endWithSep ? TD_DIRSEP : ""));
|
||||
#else
|
||||
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s%s%s%s%s", TD_DIRSEP, "tq", TD_DIRSEP, "stream",
|
||||
TD_DIRSEP, "state", (endWithSep ? TD_DIRSEP : ""));
|
||||
#endif
|
||||
}
|
||||
|
||||
// smaXXXUtil ================
|
||||
|
|
|
@ -661,9 +661,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
|
||||
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
|
||||
lRow = tRow;
|
||||
tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
|
||||
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version,
|
||||
pSubmitTbData->uid);
|
||||
|
||||
// remain row
|
||||
++tRow.iRow;
|
||||
|
@ -683,9 +680,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
lRow = tRow;
|
||||
|
||||
++tRow.iRow;
|
||||
tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
|
||||
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version,
|
||||
pSubmitTbData->uid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -727,9 +721,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
|
||||
if (code) goto _exit;
|
||||
lRow = tRow;
|
||||
tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
|
||||
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version,
|
||||
pSubmitTbData->uid);
|
||||
|
||||
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
|
||||
|
||||
|
@ -753,9 +744,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
|||
lRow = tRow;
|
||||
|
||||
iRow++;
|
||||
tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
|
||||
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version,
|
||||
pSubmitTbData->uid);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -120,8 +120,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
taskId = pTask->streamTaskId.taskId;
|
||||
if (tEncodeI32(pEncoder, taskId)) return -1;
|
||||
|
||||
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
|
||||
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->dataRange.range.minVer)) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
|
||||
|
||||
|
@ -193,8 +193,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
||||
pTask->streamTaskId.taskId = taskId;
|
||||
|
||||
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
|
||||
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
|
||||
|
||||
|
|
|
@ -1175,6 +1175,7 @@ e
|
|||
,,y,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
,,y,script,./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
|
||||
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim
|
||||
,,n,script,./test.sh -f tsim/valgrind/checkError2.sim
|
||||
,,n,script,./test.sh -f tsim/valgrind/checkError3.sim
|
||||
|
|
|
@ -114,7 +114,7 @@ endi
|
|||
|
||||
vg_ready:
|
||||
print ====> create stable/child table
|
||||
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s
|
||||
sql create table stb (ts timestamp, c1 float, c2 float, c3 double) tags (t1 int) rollup(sum) watermark 3s,3s max_delay 3s,3s
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
|
|
|
@ -320,6 +320,7 @@
|
|||
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
./test.sh -f tsim/sync/vnodesnapshot-rsma-test.sim
|
||||
./test.sh -f tsim/valgrind/checkError1.sim
|
||||
./test.sh -f tsim/valgrind/checkError2.sim
|
||||
./test.sh -f tsim/valgrind/checkError3.sim
|
||||
|
|
|
@ -0,0 +1,248 @@
|
|||
from datetime import datetime
|
||||
import time
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
|
||||
PRIMARY_COL = "ts"
|
||||
|
||||
INT_COL = "c_int"
|
||||
BINT_COL = "c_bint"
|
||||
SINT_COL = "c_sint"
|
||||
TINT_COL = "c_tint"
|
||||
FLOAT_COL = "c_float"
|
||||
DOUBLE_COL = "c_double"
|
||||
BOOL_COL = "c_bool"
|
||||
TINT_UN_COL = "c_utint"
|
||||
SINT_UN_COL = "c_usint"
|
||||
BINT_UN_COL = "c_ubint"
|
||||
INT_UN_COL = "c_uint"
|
||||
BINARY_COL = "c_binary"
|
||||
NCHAR_COL = "c_nchar"
|
||||
TS_COL = "c_ts"
|
||||
|
||||
INT_TAG = "t_int"
|
||||
|
||||
TAG_COL = [INT_TAG]
|
||||
|
||||
## insert data args:
|
||||
TIME_STEP = 10000
|
||||
NOW = int(datetime.timestamp(datetime.now()) * 1000)
|
||||
|
||||
# init db/table
|
||||
DBNAME = "db"
|
||||
DB1 = "db1"
|
||||
DB2 = "db2"
|
||||
DB3 = "db3"
|
||||
DB4 = "db4"
|
||||
STBNAME = "stb1"
|
||||
CTBNAME = "ct1"
|
||||
NTBNAME = "nt1"
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
@property
|
||||
def create_databases_sql_err(self):
|
||||
return [
|
||||
# check grammar
|
||||
"create database db1 retentions",
|
||||
"create database db1 retentions 1s:1d",
|
||||
"create database db1 retentions 1s:1d,2s:2d",
|
||||
"create database db1 retentions 1s:1d,2s:2d,3s:3d",
|
||||
"create database db1 retentions 1s:1d,2s:2d,3s:3d,4s:4d",
|
||||
"create database db1 retentions -:1d,2s:2d,3s:3d,4s:4d",
|
||||
"create database db1 retentions --:1d",
|
||||
"create database db1 retentions -:-:1d",
|
||||
"create database db1 retentions 1d:-",
|
||||
"create database db1 retentions -:-",
|
||||
"create database db1 retentions +:1d",
|
||||
"create database db1 retentions :1d",
|
||||
"create database db1 retentions -:1d,-:2d",
|
||||
"create database db1 retentions -:1d,-:2d,-:3d",
|
||||
"create database db1 retentions -:1d,1s:-",
|
||||
"create database db1 retentions -:1d,15s:2d,-:3d",
|
||||
|
||||
# check unit
|
||||
"create database db1 retentions -:1d,1b:1d",
|
||||
"create database db1 retentions -:1d,1u:1d",
|
||||
"create database db1 retentions -:1d,1a:1d",
|
||||
"create database db1 retentions -:1d,1n:1d",
|
||||
"create database db1 retentions -:1d,1y:1d",
|
||||
"create database db1 retentions -:1d,1s:86400s",
|
||||
"create database db1 retentions -:1d,1s:86400000a",
|
||||
"create database db1 retentions -:1d,1s:86400000000u",
|
||||
"create database db1 retentions -:1d,1s:86400000000000b",
|
||||
"create database db1 retentions -:1s,1s:2s",
|
||||
"create database db1 retentions -:1d,1s:1w",
|
||||
"create database db1 retentions -:1d,1s:1n",
|
||||
"create database db1 retentions -:1d,1s:1y",
|
||||
|
||||
# check value range
|
||||
"create database db3 retentions -:-1d",
|
||||
"create database db3 retentions -:0d",
|
||||
"create database db3 retentions -:1439m",
|
||||
"create database db3 retentions -:365001d",
|
||||
"create database db3 retentions -:8760001h",
|
||||
"create database db3 retentions -:525600001m",
|
||||
"create database db3 retentions -:106581d precision 'ns'",
|
||||
"create database db3 retentions -:2557921h precision 'ns'",
|
||||
"create database db3 retentions -:153475201m precision 'ns'",
|
||||
# check relationships
|
||||
"create database db5 retentions -:1440m,1441m:1440m,2d:3d",
|
||||
"create database db5 retentions -:1d,2m:1d,1s:2d",
|
||||
"create database db5 retentions -:1440m,1s:2880m,2s:2879m",
|
||||
"create database db5 retentions -:1d,2s:2d,2s:3d",
|
||||
"create database db5 retentions -:1d,3s:2d,2s:3d",
|
||||
"create database db1 retentions -:1d,2s:3d,3s:2d",
|
||||
"create database db1 retentions -:1d,2s:3d,1s:2d",
|
||||
|
||||
]
|
||||
|
||||
@property
|
||||
def create_databases_sql_current(self):
|
||||
return [
|
||||
f"create database {DB1} retentions -:1d",
|
||||
f"create database {DB2} retentions -:1d,2m:2d,3h:3d",
|
||||
]
|
||||
|
||||
@property
|
||||
def alter_database_sql(self):
|
||||
return [
|
||||
"alter database db1 retentions -:99d",
|
||||
"alter database db2 retentions -:97d,98h:98d,99h:99d,",
|
||||
]
|
||||
|
||||
@property
|
||||
def create_stable_sql_err(self, dbname=DB2):
|
||||
return [
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s max_delay 1m",
|
||||
f"create stable {dbname}.stb12 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min",
|
||||
f"create stable {dbname}.stb13 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s",
|
||||
f"create stable {dbname}.stb14 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m",
|
||||
f"create stable {dbname}.stb15 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
|
||||
f"create stable {dbname}.stb16 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
|
||||
f"create stable {dbname}.stb21 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s",
|
||||
f"create stable {dbname}.stb22 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
|
||||
f"create table {dbname}.ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
|
||||
f"create table {dbname}.ntb_2 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
|
||||
f"create stable {dbname}.stb23 ({PRIMARY_COL} timestamp, {INT_COL} int, {NCHAR_COL} nchar(16)) tags (tag1 int) " ,
|
||||
f"create stable {dbname}.stb24 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
|
||||
f"create stable {dbname}.stb25 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
|
||||
f"create stable {dbname}.stb26 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
|
||||
# only float/double allowd for avg/sum
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(avg)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINT_COL} bigint) tags (tag1 int) rollup(avg)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BOOL_COL} bool) tags (tag1 int) rollup(avg)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINARY_COL} binary(10)) tags (tag1 int) rollup(avg)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINT_COL} bigint) tags (tag1 int) rollup(sum)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BOOL_COL} bool) tags (tag1 int) rollup(sum)",
|
||||
f"create stable {dbname}.stb11 ({PRIMARY_COL} timestamp, {BINARY_COL} binary(10)) tags (tag1 int) rollup(sum)",
|
||||
|
||||
|
||||
# watermark, max_delay: [0, 900000], [ms, s, m, ?]
|
||||
f"create stable stb17 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
|
||||
f"create stable stb18 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 1b",
|
||||
f"create stable stb19 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 900001ms",
|
||||
f"create stable stb20 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 16m",
|
||||
f"create stable stb27 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 901s",
|
||||
f"create stable stb28 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1h",
|
||||
f"create stable stb29 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 0.2h",
|
||||
f"create stable stb30 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 0.002d",
|
||||
|
||||
]
|
||||
|
||||
@property
|
||||
def create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, rsma=False, dbname=DBNAME, rsma_type="sum"):
|
||||
tdLog.printNoPrefix("==========step: create table")
|
||||
if rsma:
|
||||
if rsma_type.lower().strip() in ("last", "first"):
|
||||
create_stb_sql = f'''create table {dbname}.{stb}(
|
||||
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
|
||||
{FLOAT_COL} float, {DOUBLE_COL} double, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
|
||||
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned, {BINARY_COL} binary(16)
|
||||
) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
|
||||
'''
|
||||
elif rsma_type.lower().strip() in ("sum", "avg"):
|
||||
create_stb_sql = f'''create table {dbname}.{stb}(
|
||||
ts timestamp, {DOUBLE_COL} double, {DOUBLE_COL}_1 double, {DOUBLE_COL}_2 double, {DOUBLE_COL}_3 double,
|
||||
{FLOAT_COL} float, {DOUBLE_COL}_4 double, {FLOAT_COL}_1 float, {FLOAT_COL}_2 float, {FLOAT_COL}_3 float,
|
||||
{DOUBLE_COL}_5 double) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
|
||||
'''
|
||||
else:
|
||||
create_stb_sql = f'''create table {dbname}.{stb}(
|
||||
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
|
||||
{FLOAT_COL} float, {DOUBLE_COL} double, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
|
||||
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
|
||||
) tags ({INT_TAG} int) rollup({rsma_type}) watermark 5s,5s max_delay 5s,5s
|
||||
'''
|
||||
tdSql.execute(create_stb_sql)
|
||||
else:
|
||||
create_stb_sql = f'''create table {dbname}.{stb}(
|
||||
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
|
||||
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
|
||||
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
|
||||
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
|
||||
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
|
||||
) tags ({INT_TAG} int)
|
||||
'''
|
||||
tdSql.execute(create_stb_sql)
|
||||
|
||||
for i in range(ntbnum):
|
||||
create_ntb_sql = f'''create table {dbname}.nt{i+1}(
|
||||
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
|
||||
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
|
||||
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
|
||||
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
|
||||
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
|
||||
)
|
||||
'''
|
||||
tdSql.execute(create_ntb_sql)
|
||||
|
||||
for i in range(ctb_num):
|
||||
tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')
|
||||
|
||||
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1):
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
for i in range(ctbNum):
|
||||
tagValue = 'beijing'
|
||||
if (i % 2 == 0):
|
||||
tagValue = 'shanghai'
|
||||
sql += " %s%d using %s tags(%d, '%s')"%(ctbPrefix,i,stbName,i+1, tagValue)
|
||||
if (i > 0) and (i%100 == 0):
|
||||
tsql.execute(sql)
|
||||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
|
||||
def run(self):
|
||||
self.rows = 10
|
||||
tdLog.printNoPrefix("==========step0:all check")
|
||||
dbname='d0'
|
||||
tdSql.execute(f"create database {dbname} retentions -:10d,1m:15d,1h:30d STT_TRIGGER 1 vgroups 6;")
|
||||
tdSql.execute(f"create stable if not exists {dbname}.st_min (ts timestamp, c1 int) tags (proid int,city binary(20)) rollup(min) watermark 0s,1s max_delay 1m,180s;;")
|
||||
tdSql.execute(f"create stable if not exists {dbname}.st_avg (ts timestamp, c1 double) tags (city binary(20),district binary(20)) rollup(min) watermark 0s,1s max_delay 1m,180s;;")
|
||||
self.create_ctable(tdSql, dbname, 'st_min', 'ct_min', 10000)
|
||||
tdLog.printNoPrefix("==========step4:after wal, all check again ")
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue