enh: rsma checkpoint
This commit is contained in:
parent
d9ed63473c
commit
a48968e5e8
|
@ -445,11 +445,6 @@ typedef struct SRetention {
|
||||||
int8_t keepUnit;
|
int8_t keepUnit;
|
||||||
} SRetention;
|
} SRetention;
|
||||||
|
|
||||||
typedef struct SRetentionEx {
|
|
||||||
SRetention rtn;
|
|
||||||
int64_t checkpointId;
|
|
||||||
} SRetentionEx;
|
|
||||||
|
|
||||||
#define RETENTION_VALID(l, r) ((((l) == 0 && (r)->freq >= 0) || ((r)->freq > 0)) && ((r)->keep > 0))
|
#define RETENTION_VALID(l, r) ((((l) == 0 && (r)->freq >= 0) || ((r)->freq > 0)) && ((r)->keep > 0))
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
|
|
@ -134,11 +134,10 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pCfg->tsdbCfg.minRows = pCreate->minRows;
|
pCfg->tsdbCfg.minRows = pCreate->minRows;
|
||||||
pCfg->tsdbCfg.maxRows = pCreate->maxRows;
|
pCfg->tsdbCfg.maxRows = pCreate->maxRows;
|
||||||
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
|
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
|
||||||
SRetentionEx *pRetention = &pCfg->tsdbCfg.retentions[i];
|
SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
|
||||||
memcpy(&pRetention->rtn, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
|
memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
|
||||||
pRetention->checkpointId = -1;
|
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
if ((pRetention->rtn.freq >= 0 && pRetention->rtn.keep > 0)) pCfg->isRsma = 1;
|
if ((pRetention->freq >= 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -287,7 +287,7 @@ struct STsdbCfg {
|
||||||
int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
int32_t keep1; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
||||||
int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
int32_t keep2; // just for save config, don't use in tsdbRead/tsdbCommit/..., and use STsdbKeepCfg in STsdb instead
|
||||||
int32_t keepTimeOffset; // just for save config, use STsdbKeepCfg in STsdb instead
|
int32_t keepTimeOffset; // just for save config, use STsdbKeepCfg in STsdb instead
|
||||||
SRetentionEx retentions[TSDB_RETENTION_MAX];
|
SRetention retentions[TSDB_RETENTION_MAX];
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -155,7 +155,7 @@ struct SRSmaInfo {
|
||||||
int64_t lastRecv; // ms
|
int64_t lastRecv; // ms
|
||||||
int8_t assigned; // 0 idle, 1 assgined for exec
|
int8_t assigned; // 0 idle, 1 assgined for exec
|
||||||
int8_t delFlag;
|
int8_t delFlag;
|
||||||
int16_t padding;
|
int8_t padding;
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||||
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
|
||||||
|
@ -163,12 +163,10 @@ struct SRSmaInfo {
|
||||||
STaosQall *qall; // buffer qall of SubmitReq
|
STaosQall *qall; // buffer qall of SubmitReq
|
||||||
};
|
};
|
||||||
|
|
||||||
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
|
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
||||||
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
|
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
||||||
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
|
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
|
||||||
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
|
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
|
||||||
#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i])
|
|
||||||
#define RSMA_INFO_ITEM(r, i) (&(r)->items[i])
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_TRIGGER_STAT_INIT = 0,
|
TASK_TRIGGER_STAT_INIT = 0,
|
||||||
|
|
|
@ -156,10 +156,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) {
|
if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) {
|
||||||
smaDebug("vgId:%d, rsma commit:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit);
|
smaDebug("vgId:%d, rsma commit, type:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit);
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
|
smaDebug("vgId:%d, rsma commit, type:%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
|
||||||
}
|
}
|
||||||
TD_SMA_LOOPS_CHECK(nLoops, 1000);
|
TD_SMA_LOOPS_CHECK(nLoops, 1000);
|
||||||
}
|
}
|
||||||
|
@ -175,6 +175,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
||||||
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
|
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
|
||||||
TD_SMA_LOOPS_CHECK(nLoops, 1000);
|
TD_SMA_LOOPS_CHECK(nLoops, 1000);
|
||||||
}
|
}
|
||||||
|
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||||
|
|
||||||
if (!isCommit) goto _exit;
|
if (!isCommit) goto _exit;
|
||||||
|
|
||||||
|
@ -183,7 +184,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
|
||||||
|
|
||||||
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
||||||
|
|
||||||
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
|
|
||||||
|
|
||||||
// all rsma results are written completely
|
// all rsma results are written completely
|
||||||
STsdb *pTsdb = NULL;
|
STsdb *pTsdb = NULL;
|
||||||
|
|
|
@ -16,13 +16,13 @@
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration);
|
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration);
|
||||||
static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
|
static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
|
||||||
static int32_t rsmaRestore(SSma *pSma);
|
static int32_t rsmaRestore(SSma *pSma);
|
||||||
|
|
||||||
#define SMA_SET_KEEP_CFG(v, l) \
|
#define SMA_SET_KEEP_CFG(v, l) \
|
||||||
do { \
|
do { \
|
||||||
SRetention *r = &(pCfg->retentions[l].rtn); \
|
SRetention *r = &(pCfg->retentions[l]); \
|
||||||
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
|
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
|
||||||
pKeepCfg->keep0 = pKeepCfg->keep2; \
|
pKeepCfg->keep0 = pKeepCfg->keep2; \
|
||||||
pKeepCfg->keep1 = pKeepCfg->keep2; \
|
pKeepCfg->keep1 = pKeepCfg->keep2; \
|
||||||
|
@ -32,7 +32,7 @@ static int32_t rsmaRestore(SSma *pSma);
|
||||||
|
|
||||||
#define SMA_OPEN_RSMA_IMPL(v, l, force) \
|
#define SMA_OPEN_RSMA_IMPL(v, l, force) \
|
||||||
do { \
|
do { \
|
||||||
SRetention *r = &(((SRetentionEx *)VND_RETENTIONS(v) + l)->rtn); \
|
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
|
||||||
if (!RETENTION_VALID(l, r)) { \
|
if (!RETENTION_VALID(l, r)) { \
|
||||||
if (l == 0) { \
|
if (l == 0) { \
|
||||||
code = TSDB_CODE_INVALID_PARA; \
|
code = TSDB_CODE_INVALID_PARA; \
|
||||||
|
@ -59,9 +59,9 @@ static int32_t rsmaRestore(SSma *pSma);
|
||||||
* @param duration
|
* @param duration
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t precision, int32_t duration) {
|
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) {
|
||||||
int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.freq, precision, TIME_UNIT_MINUTE);
|
int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE);
|
||||||
int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->rtn.keep, precision, TIME_UNIT_MINUTE);
|
int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE);
|
||||||
int32_t days = duration; // min
|
int32_t days = duration; // min
|
||||||
|
|
||||||
if (days < freqDuration) {
|
if (days < freqDuration) {
|
||||||
|
@ -76,10 +76,10 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetentionEx *r, int8_t level, int8_t
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
freqDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.freq, precision, TIME_UNIT_MINUTE);
|
freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE);
|
||||||
keepDuration = convertTimeFromPrecisionToUnit((r + level)->rtn.keep, precision, TIME_UNIT_MINUTE);
|
keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE);
|
||||||
|
|
||||||
int32_t nFreqTimes = (r + level)->rtn.freq / (60 * 1000); // use 60s for freq of 1st level
|
int32_t nFreqTimes = (r + level)->freq / (60 * 1000); // use 60s for freq of 1st level
|
||||||
days *= (nFreqTimes > 1 ? nFreqTimes : 1);
|
days *= (nFreqTimes > 1 ? nFreqTimes : 1);
|
||||||
|
|
||||||
if (days < freqDuration) {
|
if (days < freqDuration) {
|
||||||
|
|
|
@ -232,6 +232,29 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int64_t tdRSmaTaskGetCheckpointId(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
||||||
|
int64_t checkpointId = -1;
|
||||||
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
|
taosRLockLatch(&pMeta->lock);
|
||||||
|
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
if (ppTask && *ppTask) {
|
||||||
|
checkpointId = (*ppTask)->chkInfo.checkpointId;
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
return checkpointId;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
|
||||||
|
streamMetaUnregisterTask(pMeta, streamId, taskId);
|
||||||
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
|
// persist to disk
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
smaDebug("vgId:%d rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||||
int8_t idx) {
|
int8_t idx) {
|
||||||
if ((param->qmsgLen > 0) && param->qmsg[idx]) {
|
if ((param->qmsgLen > 0) && param->qmsg[idx]) {
|
||||||
|
@ -267,7 +290,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
|
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
|
||||||
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
|
pStreamTask->exec.qmsg = taosMemoryMalloc(strlen(RSMA_TASK_FLAG) + 1);
|
||||||
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
|
sprintf(pStreamTask->exec.qmsg, "%s", RSMA_TASK_FLAG);
|
||||||
pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId;
|
pStreamTask->chkInfo.checkpointId =
|
||||||
|
tdRSmaTaskGetCheckpointId(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
|
||||||
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
|
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
|
||||||
if (!pStreamState) {
|
if (!pStreamState) {
|
||||||
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
|
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
|
||||||
|
@ -275,6 +299,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
}
|
}
|
||||||
pItem->pStreamState = pStreamState;
|
pItem->pStreamState = pStreamState;
|
||||||
|
|
||||||
|
tdRSmaTaskRemove(pStreamTask->pMeta, pStreamTask->id.streamId, pStreamTask->id.taskId);
|
||||||
|
|
||||||
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState};
|
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState};
|
||||||
initStorageAPI(&handle.api);
|
initStorageAPI(&handle.api);
|
||||||
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
|
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
|
||||||
|
@ -1129,19 +1155,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
|
if (streamFlushed) {
|
||||||
smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
|
pRSmaInfo->items[i].streamFlushed = 1;
|
||||||
nStreamFlushed, nTaskInfo);
|
if (++nStreamFlushed >= nTaskInfo) {
|
||||||
taosHashCancelIterate(pInfoHash, infoHash);
|
smaInfo("vgId:%d rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode),
|
||||||
goto _checkpoint;
|
nSleep * 10, nStreamFlushed, nTaskInfo);
|
||||||
|
taosHashCancelIterate(pInfoHash, infoHash);
|
||||||
|
goto _checkpoint;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosUsleep(10);
|
taosUsleep(10);
|
||||||
++nSleep;
|
++nSleep;
|
||||||
smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
|
smaDebug("vgId:%d, rsma commit, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode),
|
||||||
nStreamFlushed, nTaskInfo);
|
nSleep * 10, nStreamFlushed, nTaskInfo);
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -1149,7 +1178,6 @@ _checkpoint:
|
||||||
// stream state: build checkpoint in backend
|
// stream state: build checkpoint in backend
|
||||||
do {
|
do {
|
||||||
void *infoHash = NULL;
|
void *infoHash = NULL;
|
||||||
|
|
||||||
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
|
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
|
||||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
||||||
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
|
||||||
|
@ -1170,29 +1198,16 @@ _checkpoint:
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pTask->pMeta->lock);
|
taosWLockLatch(&pTask->pMeta->lock);
|
||||||
if (streamMetaSaveTask(pTask->pMeta, pTask) != 0) {
|
if (0 != streamMetaSaveTask(pTask->pMeta, pTask) || 0 != streamMetaCommit(pTask->pMeta)) {
|
||||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
taosWUnLockLatch(&pTask->pMeta->lock);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosHashCancelIterate(pInfoHash, infoHash);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamMetaCommit(pTask->pMeta) != 0) {
|
|
||||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosHashCancelIterate(pInfoHash, infoHash);
|
taosHashCancelIterate(pInfoHash, infoHash);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
taosWUnLockLatch(&pTask->pMeta->lock);
|
||||||
|
|
||||||
// save checkpointId to vnode.json
|
smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint/task:%" PRIi64 "/%p, table:%" PRIi64 ", level:%d",
|
||||||
(pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId;
|
TD_VID(pVnode), pTask->checkpointingId, pTask, pRSmaInfo->suid, i + 1);
|
||||||
|
|
||||||
smaInfo("vgId:%d, commit task:%p, build stream checkpoint success, table:%" PRIi64
|
|
||||||
", level:%d, checkpointId:%" PRIi64,
|
|
||||||
TD_VID(pVnode), pTask, pRSmaInfo->suid, i + 1, pTask->checkpointingId);
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
|
|
||||||
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
|
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
|
||||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idstr,
|
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
|
||||||
int8_t* pLevel);
|
int8_t* pLevel);
|
||||||
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
|
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
||||||
|
@ -3140,7 +3140,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* retentions, const char* idStr,
|
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr,
|
||||||
int8_t* pLevel) {
|
int8_t* pLevel) {
|
||||||
if (VND_IS_RSMA(pVnode)) {
|
if (VND_IS_RSMA(pVnode)) {
|
||||||
int8_t level = 0;
|
int8_t level = 0;
|
||||||
|
@ -3151,7 +3151,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetentionEx* r
|
||||||
: 1000000L);
|
: 1000000L);
|
||||||
|
|
||||||
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||||
SRetention* pRetention = &((retentions + level)->rtn);
|
SRetention* pRetention = retentions + level;
|
||||||
if (pRetention->keep <= 0) {
|
if (pRetention->keep <= 0) {
|
||||||
if (level > 0) {
|
if (level > 0) {
|
||||||
--level;
|
--level;
|
||||||
|
|
|
@ -106,24 +106,23 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "keepTimeOffset", pCfg->tsdbCfg.keepTimeOffset) < 0) return -1;
|
||||||
if (pCfg->tsdbCfg.retentions[0].rtn.keep > 0) {
|
if (pCfg->tsdbCfg.retentions[0].keep > 0) {
|
||||||
int32_t nRetention = 1;
|
int32_t nRetention = 1;
|
||||||
if (pCfg->tsdbCfg.retentions[1].rtn.freq > 0) {
|
if (pCfg->tsdbCfg.retentions[1].freq > 0) {
|
||||||
++nRetention;
|
++nRetention;
|
||||||
if (pCfg->tsdbCfg.retentions[2].rtn.freq > 0) {
|
if (pCfg->tsdbCfg.retentions[2].freq > 0) {
|
||||||
++nRetention;
|
++nRetention;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SJson *pNodeRetentions = tjsonCreateArray();
|
SJson *pNodeRetentions = tjsonCreateArray();
|
||||||
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
|
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
|
||||||
for (int32_t i = 0; i < nRetention; ++i) {
|
for (int32_t i = 0; i < nRetention; ++i) {
|
||||||
SJson *pNodeRetention = tjsonCreateObject();
|
SJson *pNodeRetention = tjsonCreateObject();
|
||||||
const SRetentionEx *pRetention = pCfg->tsdbCfg.retentions + i;
|
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->rtn.freq);
|
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit);
|
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->rtn.keep);
|
tjsonAddIntegerToObject(pNodeRetention, "keep", pRetention->keep);
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit);
|
tjsonAddIntegerToObject(pNodeRetention, "keepUnit", pRetention->keepUnit);
|
||||||
tjsonAddIntegerToObject(pNodeRetention, "checkpointId", pRetention->checkpointId);
|
|
||||||
tjsonAddItemToArray(pNodeRetentions, pNodeRetention);
|
tjsonAddItemToArray(pNodeRetentions, pNodeRetention);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,12 +231,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
for (int32_t i = 0; i < nRetention; ++i) {
|
for (int32_t i = 0; i < nRetention; ++i) {
|
||||||
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
|
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
|
||||||
ASSERT(pNodeRetention != NULL);
|
ASSERT(pNodeRetention != NULL);
|
||||||
SRetentionEx *pRetention = &(pCfg->tsdbCfg.retentions[i]);
|
tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code);
|
||||||
tjsonGetNumberValue(pNodeRetention, "freq", pRetention->rtn.freq, code);
|
tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code);
|
||||||
tjsonGetNumberValue(pNodeRetention, "freqUnit", pRetention->rtn.freqUnit, code);
|
tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep, code);
|
||||||
tjsonGetNumberValue(pNodeRetention, "keep", pRetention->rtn.keep, code);
|
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit, code);
|
||||||
tjsonGetNumberValue(pNodeRetention, "keepUnit", pRetention->rtn.keepUnit, code);
|
|
||||||
tjsonGetNumberValue(pNodeRetention, "checkpointId", pRetention->checkpointId, code);
|
|
||||||
}
|
}
|
||||||
tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code);
|
tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
|
|
|
@ -687,10 +687,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
if (0 == strcmp(pTask->exec.qmsg, "rsma_task")) {
|
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
|
||||||
tFreeStreamTask(pTask);
|
|
||||||
continue;
|
|
||||||
} else if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
|
|
||||||
int32_t taskId = pTask->id.taskId;
|
int32_t taskId = pTask->id.taskId;
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue