Merge pull request #15427 from taosdata/fix/wal
enh: add wal options to db
This commit is contained in:
commit
ad9c3cbd8c
|
@ -1154,6 +1154,10 @@ typedef struct {
|
||||||
int32_t numOfRetensions;
|
int32_t numOfRetensions;
|
||||||
SArray* pRetensions; // SRetention
|
SArray* pRetensions; // SRetention
|
||||||
void* pTsma;
|
void* pTsma;
|
||||||
|
int32_t walRetentionPeriod;
|
||||||
|
int64_t walRetentionSize;
|
||||||
|
int32_t walRollPeriod;
|
||||||
|
int64_t walSegmentSize;
|
||||||
} SCreateVnodeReq;
|
} SCreateVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
|
|
|
@ -103,8 +103,8 @@ typedef struct SWal {
|
||||||
int32_t fsyncSeq;
|
int32_t fsyncSeq;
|
||||||
// meta
|
// meta
|
||||||
SWalVer vers;
|
SWalVer vers;
|
||||||
TdFilePtr pWriteLogTFile;
|
TdFilePtr pLogFile;
|
||||||
TdFilePtr pWriteIdxTFile;
|
TdFilePtr pIdxFile;
|
||||||
int32_t writeCur;
|
int32_t writeCur;
|
||||||
SArray *fileInfoSet; // SArray<SWalFileInfo>
|
SArray *fileInfoSet; // SArray<SWalFileInfo>
|
||||||
// status
|
// status
|
||||||
|
|
|
@ -3750,6 +3750,10 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
||||||
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
|
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
|
||||||
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
|
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->walRollPeriod) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->walSegmentSize) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -3818,6 +3822,11 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
||||||
if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1;
|
if (tDecodeBinary(&decoder, (uint8_t **)&pReq->pTsma, NULL) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->walRetentionSize) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->walRollPeriod) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->walSegmentSize) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -160,6 +160,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pCfg->walCfg.vgId = pCreate->vgId;
|
pCfg->walCfg.vgId = pCreate->vgId;
|
||||||
|
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
|
||||||
|
pCfg->walCfg.retentionPeriod = pCreate->walRetentionPeriod;
|
||||||
|
pCfg->walCfg.rollPeriod = pCreate->walRollPeriod;
|
||||||
|
pCfg->walCfg.retentionSize = pCreate->walRetentionSize;
|
||||||
|
pCfg->walCfg.segSize = pCreate->walSegmentSize;
|
||||||
|
pCfg->walCfg.level = pCreate->walLevel;
|
||||||
|
|
||||||
pCfg->hashBegin = pCreate->hashBegin;
|
pCfg->hashBegin = pCreate->hashBegin;
|
||||||
pCfg->hashEnd = pCreate->hashEnd;
|
pCfg->hashEnd = pCreate->hashEnd;
|
||||||
pCfg->hashMethod = pCreate->hashMethod;
|
pCfg->hashMethod = pCreate->hashMethod;
|
||||||
|
|
|
@ -302,9 +302,13 @@ typedef struct {
|
||||||
int8_t strict;
|
int8_t strict;
|
||||||
int8_t hashMethod; // default is 1
|
int8_t hashMethod; // default is 1
|
||||||
int8_t cacheLast;
|
int8_t cacheLast;
|
||||||
|
int8_t schemaless;
|
||||||
int32_t numOfRetensions;
|
int32_t numOfRetensions;
|
||||||
SArray* pRetensions;
|
SArray* pRetensions;
|
||||||
int8_t schemaless;
|
int32_t walRetentionPeriod;
|
||||||
|
int64_t walRetentionSize;
|
||||||
|
int32_t walRollPeriod;
|
||||||
|
int64_t walSegmentSize;
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -120,6 +120,10 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
||||||
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, _OVER)
|
||||||
}
|
}
|
||||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER)
|
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.schemaless, _OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRetentionPeriod, _OVER)
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walRetentionSize, _OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.walRollPeriod, _OVER)
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pDb->cfg.walSegmentSize, _OVER)
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||||
|
@ -199,6 +203,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.schemaless, _OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRetentionPeriod, _OVER)
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walRetentionSize, _OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.walRollPeriod, _OVER)
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pDb->cfg.walSegmentSize, _OVER)
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||||
taosInitRWLatch(&pDb->lock);
|
taosInitRWLatch(&pDb->lock);
|
||||||
|
@ -318,6 +326,10 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return -1;
|
||||||
|
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return -1;
|
||||||
|
if (pCfg->walRollPeriod < TSDB_DB_MIN_WAL_ROLL_PERIOD) return -1;
|
||||||
|
if (pCfg->walSegmentSize < TSDB_DB_MIN_WAL_SEGMENT_SIZE) return -1;
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -345,6 +357,12 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE;
|
if (pCfg->cacheLastSize <= 0) pCfg->cacheLastSize = TSDB_DEFAULT_CACHE_SIZE;
|
||||||
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
||||||
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
|
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
|
||||||
|
if (pCfg->walRetentionPeriod < 0 && pCfg->walRetentionPeriod != -1)
|
||||||
|
pCfg->walRetentionPeriod = TSDB_DEFAULT_DB_WAL_RETENTION_PERIOD;
|
||||||
|
if (pCfg->walRetentionSize < 0 && pCfg->walRetentionSize != -1)
|
||||||
|
pCfg->walRetentionSize = TSDB_DEFAULT_DB_WAL_RETENTION_SIZE;
|
||||||
|
if (pCfg->walRollPeriod < 0) pCfg->walRollPeriod = TSDB_DEFAULT_DB_WAL_ROLL_PERIOD;
|
||||||
|
if (pCfg->walSegmentSize < 0) pCfg->walSegmentSize = TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||||
|
@ -457,6 +475,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
||||||
.cacheLast = pCreate->cacheLast,
|
.cacheLast = pCreate->cacheLast,
|
||||||
.hashMethod = 1,
|
.hashMethod = 1,
|
||||||
.schemaless = pCreate->schemaless,
|
.schemaless = pCreate->schemaless,
|
||||||
|
.walRetentionPeriod = pCreate->walRetentionPeriod,
|
||||||
|
.walRetentionSize = pCreate->walRetentionSize,
|
||||||
|
.walRollPeriod = pCreate->walRollPeriod,
|
||||||
|
.walSegmentSize = pCreate->walSegmentSize,
|
||||||
};
|
};
|
||||||
|
|
||||||
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||||
|
|
|
@ -230,6 +230,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
createReq.standby = standby;
|
createReq.standby = standby;
|
||||||
createReq.isTsma = pVgroup->isTsma;
|
createReq.isTsma = pVgroup->isTsma;
|
||||||
createReq.pTsma = pVgroup->pTsma;
|
createReq.pTsma = pVgroup->pTsma;
|
||||||
|
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
|
||||||
|
createReq.walRetentionSize = pDb->cfg.walRetentionSize;
|
||||||
|
createReq.walRollPeriod = pDb->cfg.walRollPeriod;
|
||||||
|
createReq.walSegmentSize = pDb->cfg.walSegmentSize;
|
||||||
|
|
||||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||||
SReplica *pReplica = &createReq.replicas[v];
|
SReplica *pReplica = &createReq.replicas[v];
|
||||||
|
|
|
@ -583,7 +583,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
||||||
tqDebug("vgId:%d, tq try get suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
|
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
|
||||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
||||||
|
|
|
@ -40,8 +40,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
||||||
.vgId = -1,
|
.vgId = -1,
|
||||||
.fsyncPeriod = 0,
|
.fsyncPeriod = 0,
|
||||||
.retentionPeriod = -1,
|
.retentionPeriod = -1,
|
||||||
.rollPeriod = -1,
|
.rollPeriod = 0,
|
||||||
.segSize = -1,
|
.segSize = 0,
|
||||||
.retentionSize = -1,
|
.retentionSize = -1,
|
||||||
.level = TAOS_WAL_WRITE,
|
.level = TAOS_WAL_WRITE,
|
||||||
},
|
},
|
||||||
|
|
|
@ -101,8 +101,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
|
|
||||||
// open meta
|
// open meta
|
||||||
walResetVer(&pWal->vers);
|
walResetVer(&pWal->vers);
|
||||||
pWal->pWriteLogTFile = NULL;
|
pWal->pLogFile = NULL;
|
||||||
pWal->pWriteIdxTFile = NULL;
|
pWal->pIdxFile = NULL;
|
||||||
pWal->writeCur = -1;
|
pWal->writeCur = -1;
|
||||||
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
|
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||||
if (pWal->fileInfoSet == NULL) {
|
if (pWal->fileInfoSet == NULL) {
|
||||||
|
@ -179,10 +179,10 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
|
||||||
|
|
||||||
void walClose(SWal *pWal) {
|
void walClose(SWal *pWal) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
taosCloseFile(&pWal->pWriteLogTFile);
|
taosCloseFile(&pWal->pLogFile);
|
||||||
pWal->pWriteLogTFile = NULL;
|
pWal->pLogFile = NULL;
|
||||||
taosCloseFile(&pWal->pWriteIdxTFile);
|
taosCloseFile(&pWal->pIdxFile);
|
||||||
pWal->pWriteIdxTFile = NULL;
|
pWal->pIdxFile = NULL;
|
||||||
walSaveMeta(pWal);
|
walSaveMeta(pWal);
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
pWal->fileInfoSet = NULL;
|
pWal->fileInfoSet = NULL;
|
||||||
|
@ -223,7 +223,7 @@ static void walFsyncAll() {
|
||||||
if (walNeedFsync(pWal)) {
|
if (walNeedFsync(pWal)) {
|
||||||
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
|
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
|
||||||
atomic_load_32(&tsWal.seq));
|
atomic_load_32(&tsWal.seq));
|
||||||
int32_t code = taosFsyncFile(pWal->pWriteLogTFile);
|
int32_t code = taosFsyncFile(pWal->pLogFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||||
strerror(code));
|
strerror(code));
|
||||||
|
|
|
@ -22,8 +22,8 @@
|
||||||
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
|
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
|
||||||
int64_t code = 0;
|
int64_t code = 0;
|
||||||
|
|
||||||
TdFilePtr pIdxTFile = pWal->pWriteIdxTFile;
|
TdFilePtr pIdxTFile = pWal->pIdxFile;
|
||||||
TdFilePtr pLogTFile = pWal->pWriteLogTFile;
|
TdFilePtr pLogTFile = pWal->pLogFile;
|
||||||
|
|
||||||
// seek position
|
// seek position
|
||||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||||
|
@ -68,8 +68,8 @@ int walInitWriteFile(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// switch file
|
// switch file
|
||||||
pWal->pWriteIdxTFile = pIdxTFile;
|
pWal->pIdxFile = pIdxTFile;
|
||||||
pWal->pWriteLogTFile = pLogTFile;
|
pWal->pLogFile = pLogTFile;
|
||||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -78,15 +78,15 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
int code;
|
int code;
|
||||||
TdFilePtr pIdxTFile, pLogTFile;
|
TdFilePtr pIdxTFile, pLogTFile;
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
if (pWal->pWriteLogTFile != NULL) {
|
if (pWal->pLogFile != NULL) {
|
||||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
code = taosCloseFile(&pWal->pLogFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pWal->pWriteIdxTFile != NULL) {
|
if (pWal->pIdxFile != NULL) {
|
||||||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
code = taosCloseFile(&pWal->pIdxFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -106,7 +106,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pIdxTFile == NULL) {
|
if (pIdxTFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
pWal->pWriteIdxTFile = NULL;
|
pWal->pIdxFile = NULL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||||
|
@ -114,12 +114,12 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
||||||
if (pLogTFile == NULL) {
|
if (pLogTFile == NULL) {
|
||||||
taosCloseFile(&pIdxTFile);
|
taosCloseFile(&pIdxTFile);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
pWal->pWriteLogTFile = NULL;
|
pWal->pLogFile = NULL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->pWriteLogTFile = pLogTFile;
|
pWal->pLogFile = pLogTFile;
|
||||||
pWal->pWriteIdxTFile = pIdxTFile;
|
pWal->pIdxFile = pIdxTFile;
|
||||||
pWal->writeCur = idx;
|
pWal->writeCur = idx;
|
||||||
return fileFirstVer;
|
return fileFirstVer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,8 +32,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pWal->pWriteLogTFile);
|
taosCloseFile(&pWal->pLogFile);
|
||||||
taosCloseFile(&pWal->pWriteIdxTFile);
|
taosCloseFile(&pWal->pIdxFile);
|
||||||
|
|
||||||
if (pWal->vers.firstVer != -1) {
|
if (pWal->vers.firstVer != -1) {
|
||||||
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
@ -261,14 +261,13 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
pWal->vers.snapshotVer = ver;
|
pWal->vers.snapshotVer = ver;
|
||||||
int ts = taosGetTimestampSec();
|
int ts = taosGetTimestampSec();
|
||||||
|
|
||||||
int64_t minVerToDelete = ver;
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SWalRef *pRef = *(SWalRef **)pIter;
|
SWalRef *pRef = *(SWalRef **)pIter;
|
||||||
if (pRef->refVer == -1) continue;
|
if (pRef->refVer == -1) continue;
|
||||||
minVerToDelete = TMIN(minVerToDelete, pRef->refVer);
|
ver = TMIN(ver, pRef->refVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
int deleteCnt = 0;
|
int deleteCnt = 0;
|
||||||
|
@ -277,6 +276,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
tmp.firstVer = ver;
|
tmp.firstVer = ver;
|
||||||
// find files safe to delete
|
// find files safe to delete
|
||||||
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
|
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
|
||||||
|
if (pInfo) {
|
||||||
if (ver >= pInfo->lastVer) {
|
if (ver >= pInfo->lastVer) {
|
||||||
pInfo++;
|
pInfo++;
|
||||||
}
|
}
|
||||||
|
@ -307,6 +307,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
} else {
|
} else {
|
||||||
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||||
pWal->totSize = newTotSize;
|
pWal->totSize = newTotSize;
|
||||||
pWal->vers.verInSnapshotting = -1;
|
pWal->vers.verInSnapshotting = -1;
|
||||||
|
@ -324,34 +325,34 @@ END:
|
||||||
|
|
||||||
int32_t walRollImpl(SWal *pWal) {
|
int32_t walRollImpl(SWal *pWal) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (pWal->pWriteIdxTFile != NULL) {
|
if (pWal->pIdxFile != NULL) {
|
||||||
code = taosCloseFile(&pWal->pWriteIdxTFile);
|
code = taosCloseFile(&pWal->pIdxFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pWal->pWriteLogTFile != NULL) {
|
if (pWal->pLogFile != NULL) {
|
||||||
code = taosCloseFile(&pWal->pWriteLogTFile);
|
code = taosCloseFile(&pWal->pLogFile);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TdFilePtr pIdxTFile, pLogTFile;
|
TdFilePtr pIdxFile, pLogFile;
|
||||||
// create new file
|
// create new file
|
||||||
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
|
int64_t newFileFirstVer = pWal->vers.lastVer + 1;
|
||||||
char fnameStr[WAL_FILE_LEN];
|
char fnameStr[WAL_FILE_LEN];
|
||||||
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
walBuildIdxName(pWal, newFileFirstVer, fnameStr);
|
||||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pIdxFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pIdxTFile == NULL) {
|
if (pIdxFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
code = -1;
|
code = -1;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
walBuildLogName(pWal, newFileFirstVer, fnameStr);
|
||||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pLogTFile == NULL) {
|
if (pLogFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
code = -1;
|
code = -1;
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -363,8 +364,8 @@ int32_t walRollImpl(SWal *pWal) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// switch file
|
// switch file
|
||||||
pWal->pWriteIdxTFile = pIdxTFile;
|
pWal->pIdxFile = pIdxFile;
|
||||||
pWal->pWriteLogTFile = pLogTFile;
|
pWal->pLogFile = pLogFile;
|
||||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||||
ASSERT(pWal->writeCur >= 0);
|
ASSERT(pWal->writeCur >= 0);
|
||||||
|
|
||||||
|
@ -378,10 +379,10 @@ END:
|
||||||
|
|
||||||
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
SWalIdxEntry entry = {.ver = ver, .offset = offset};
|
||||||
int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_END);
|
int64_t idxOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
|
||||||
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
wDebug("vgId:%d, write index, index:%" PRId64 ", offset:%" PRId64 ", at %" PRId64, pWal->cfg.vgId, ver, offset,
|
||||||
idxOffset);
|
idxOffset);
|
||||||
int64_t size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
|
int64_t size = taosWriteFile(pWal->pIdxFile, &entry, sizeof(SWalIdxEntry));
|
||||||
if (size != sizeof(SWalIdxEntry)) {
|
if (size != sizeof(SWalIdxEntry)) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
// TODO truncate
|
// TODO truncate
|
||||||
|
@ -407,7 +408,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
||||||
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
||||||
|
|
||||||
if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) {
|
||||||
// TODO ftruncate
|
// TODO ftruncate
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||||
|
@ -416,7 +417,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) {
|
if (taosWriteFile(pWal->pLogFile, (char *)body, bodyLen) != bodyLen) {
|
||||||
// TODO ftruncate
|
// TODO ftruncate
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
|
||||||
|
@ -456,14 +457,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
|
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
|
||||||
if (walInitWriteFile(pWal) < 0) {
|
if (walInitWriteFile(pWal) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
|
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
||||||
|
|
||||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
@ -494,14 +495,14 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWal->pWriteIdxTFile == NULL || pWal->pWriteIdxTFile == NULL || pWal->writeCur < 0) {
|
if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) {
|
||||||
if (walInitWriteFile(pWal) < 0) {
|
if (walInitWriteFile(pWal) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pWal->pWriteIdxTFile != NULL && pWal->pWriteLogTFile != NULL && pWal->writeCur >= 0);
|
ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0);
|
||||||
|
|
||||||
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
@ -524,7 +525,7 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
||||||
void walFsync(SWal *pWal, bool forceFsync) {
|
void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
||||||
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
||||||
if (taosFsyncFile(pWal->pWriteLogTFile) < 0) {
|
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ class TDTestCase:
|
||||||
if dropFlag == 1:
|
if dropFlag == 1:
|
||||||
tsql.execute("drop database if exists %s"%(dbName))
|
tsql.execute("drop database if exists %s"%(dbName))
|
||||||
|
|
||||||
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
|
tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period -1 wal_retention_size -1"%(dbName, vgroups, replica))
|
||||||
tdLog.debug("complete to create database %s"%(dbName))
|
tdLog.debug("complete to create database %s"%(dbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ class TDTestCase:
|
||||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
|
||||||
tdLog.info("create stb")
|
tdLog.info("create stb")
|
||||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
tdLog.info("create ctb")
|
tdLog.info("create ctb")
|
||||||
|
|
|
@ -52,7 +52,7 @@ class TDTestCase:
|
||||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
|
||||||
tdLog.info("create stb")
|
tdLog.info("create stb")
|
||||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
tdLog.info("create ctb")
|
tdLog.info("create ctb")
|
||||||
|
|
|
@ -53,7 +53,7 @@ class TDTestCase:
|
||||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1,wal_retention_size=-1, wal_retention_period=-1)
|
||||||
tdLog.info("create stb")
|
tdLog.info("create stb")
|
||||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
tdLog.info("create ctb")
|
tdLog.info("create ctb")
|
||||||
|
|
|
@ -52,7 +52,7 @@ class TDTestCase:
|
||||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1, wal_retention_period=-1)
|
||||||
tdLog.info("create stb")
|
tdLog.info("create stb")
|
||||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
tdLog.info("create ctb")
|
tdLog.info("create ctb")
|
||||||
|
|
Loading…
Reference in New Issue