fix(wal): rollback

This commit is contained in:
Liu Jicong 2022-07-11 13:38:22 +08:00
parent 711f960827
commit 9fd5ec9020
7 changed files with 86 additions and 58 deletions

View File

@ -15,30 +15,37 @@
#include "vnd.h" #include "vnd.h"
const SVnodeCfg vnodeCfgDefault = { const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.vgId = -1, .dbname = "",
.dbname = "", .dbId = 0,
.dbId = 0, .szPage = 4096,
.szPage = 4096, .szCache = 256,
.szCache = 256, .szBuf = 96 * 1024 * 1024,
.szBuf = 96 * 1024 * 1024, .isHeap = false,
.isHeap = false, .isWeak = 0,
.isWeak = 0, .tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI, .update = 1,
.update = 1, .compression = 2,
.compression = 2, .slLevel = 5,
.slLevel = 5, .days = 14400,
.days = 14400, .minRows = 100,
.minRows = 100, .maxRows = 4096,
.maxRows = 4096, .keep2 = 5256000,
.keep2 = 5256000, .keep0 = 5256000,
.keep0 = 5256000, .keep1 = 5256000},
.keep1 = 5256000}, .walCfg =
.walCfg = {
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE}, .vgId = -1,
.hashBegin = 0, .fsyncPeriod = 0,
.hashEnd = 0, .retentionPeriod = -1,
.hashMethod = 0}; .rollPeriod = -1,
.segSize = -1,
.retentionSize = -1,
.level = TAOS_WAL_WRITE,
},
.hashBegin = 0,
.hashEnd = 0,
.hashMethod = 0};
int vnodeCheckCfg(const SVnodeCfg *pCfg) { int vnodeCheckCfg(const SVnodeCfg *pCfg) {
// TODO // TODO
@ -79,7 +86,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
SJson *pNodeRetentions = tjsonCreateArray(); SJson *pNodeRetentions = tjsonCreateArray();
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions); tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) { for (int32_t i = 0; i < nRetention; ++i) {
SJson * pNodeRetention = tjsonCreateObject(); SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i; const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq); tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit); tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
@ -156,7 +163,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code); tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
if (code < 0) return -1; if (code < 0) return -1;
SJson * pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int32_t nRetention = tjsonGetArraySize(pNodeRetentions); int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
if (nRetention > TSDB_RETENTION_MAX) { if (nRetention > TSDB_RETENTION_MAX) {
nRetention = TSDB_RETENTION_MAX; nRetention = TSDB_RETENTION_MAX;

View File

@ -230,6 +230,7 @@ int vnodeCommit(SVnode *pVnode) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit // preCommit
smaPreCommit(pVnode->pSma); smaPreCommit(pVnode->pSma);
@ -278,6 +279,7 @@ int vnodeCommit(SVnode *pVnode) {
smaPostCommit(pVnode->pSma); smaPostCommit(pVnode->pSma);
// apply the commit (TODO) // apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
vnodeBufPoolReset(pVnode->onCommit); vnodeBufPoolReset(pVnode->onCommit);
pVnode->onCommit->next = pVnode->pPool; pVnode->onCommit->next = pVnode->pPool;
pVnode->pPool = pVnode->onCommit; pVnode->pPool = pVnode->onCommit;

View File

@ -117,6 +117,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open wal // open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
taosRealPath(tdir, NULL, sizeof(tdir)); taosRealPath(tdir, NULL, sizeof(tdir));
/*pVnode->config.walCfg.retentionSize = 2000;*/
/*pVnode->config.walCfg.segSize = 200;*/
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));

View File

@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
qInfo("task %d receive dispatch rsp", pTask->taskId); qDebug("task %d receive dispatch rsp", pTask->taskId);
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
ASSERT(old == TASK_OUTPUT_STATUS__WAIT); ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
@ -242,7 +242,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
} }
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId); qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp); streamTaskEnqueueRetrieve(pTask, pReq, pRsp);

View File

@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
} }
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qInfo("stream continue dispatching: task %d", pTask->taskId); qDebug("stream continue dispatching: task %d", pTask->taskId);
SRpcMsg dispatchMsg = {0}; SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL; SEpSet* pEpSet = NULL;

View File

@ -141,34 +141,32 @@ int walCheckAndRepairMeta(SWal* pWal) {
regfree(&idxRegPattern); regfree(&idxRegPattern);
taosArraySort(pLogInfoArray, compareWalFileInfo); taosArraySort(pLogInfoArray, compareWalFileInfo);
int oldSz = 0;
if (pWal->fileInfoSet) {
oldSz = taosArrayGetSize(pWal->fileInfoSet);
}
int newSz = taosArrayGetSize(pLogInfoArray);
if (oldSz > newSz) { int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz); int actualFileNum = taosArrayGetSize(pLogInfoArray);
} else if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) { if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) {
for (int i = metaFileNum; i < actualFileNum; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i); SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
} }
taosArrayDestroy(pLogInfoArray); taosArrayDestroy(pLogInfoArray);
pWal->writeCur = newSz - 1; pWal->writeCur = actualFileNum - 1;
if (newSz > 0) { if (actualFileNum > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1); SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &file_size, NULL); taosStatFile(fnameStr, &fileSize, NULL);
if (oldSz != newSz || pLastFileInfo->fileSize != file_size) { if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = file_size; pLastFileInfo->fileSize = fileSize;
pWal->vers.lastVer = walScanLogGetLastVer(pWal); pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer; ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1); ASSERT(pWal->vers.lastVer != -1);

View File

@ -99,7 +99,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// delete files // delete files
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int i = pWal->writeCur; i < fileSetSize; i++) { for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
taosRemoveFile(fnameStr); taosRemoveFile(fnameStr);
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
@ -113,18 +113,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxTFile == NULL) { if (pIdxTFile == NULL) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
int64_t idxOff = walGetVerIdxOffset(pWal, ver); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET); code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
if (code < 0) { if (code < 0) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
// read idx file and get log file pos // read idx file and get log file pos
SWalIdxEntry entry; SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
@ -133,12 +136,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogTFile == NULL) { if (pLogTFile == NULL) {
ASSERT(0);
// TODO // TODO
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (code < 0) { if (code < 0) {
ASSERT(0);
// TODO // TODO
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
@ -148,6 +153,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(taosValidFile(pLogTFile)); ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead)); int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) { if (size != sizeof(SWalCkHead)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
@ -205,15 +211,22 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex);
walRoll(pWal); walRoll(pWal);
taosThreadMutexUnlock(&pWal->mutex);
} }
return 0; return 0;
} }
int32_t walEndSnapshot(SWal *pWal) { int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0;
taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting; int64_t ver = pWal->vers.verInSnapshotting;
if (ver == -1) return 0; if (ver == -1) {
code = -1;
goto END;
};
pWal->vers.snapshotVer = ver; pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec(); int ts = taosGetTimestampSec();
@ -229,7 +242,7 @@ int32_t walEndSnapshot(SWal *pWal) {
} }
// iterate files, until the searched result // iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) || if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) ||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
// delete according to file size or close time // delete according to file size or close time
deleteCnt++; deleteCnt++;
@ -259,12 +272,14 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.verInSnapshotting = -1; pWal->vers.verInSnapshotting = -1;
// save snapshot ver, commit ver // save snapshot ver, commit ver
int code = walSaveMeta(pWal); code = walSaveMeta(pWal);
if (code < 0) { if (code < 0) {
return -1; goto END;
} }
return 0; END:
taosThreadMutexUnlock(&pWal->mutex);
return code;
} }
int walRoll(SWal *pWal) { int walRoll(SWal *pWal) {
@ -273,14 +288,14 @@ int walRoll(SWal *pWal) {
code = taosCloseFile(&pWal->pWriteIdxTFile); code = taosCloseFile(&pWal->pWriteIdxTFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; goto END;
} }
} }
if (pWal->pWriteLogTFile != NULL) { if (pWal->pWriteLogTFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile); code = taosCloseFile(&pWal->pWriteLogTFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; goto END;
} }
} }
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
@ -291,18 +306,20 @@ int walRoll(SWal *pWal) {
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxTFile == NULL) { if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; code = -1;
goto END;
} }
walBuildLogName(pWal, newFileFirstVersion, fnameStr); walBuildLogName(pWal, newFileFirstVersion, fnameStr);
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogTFile == NULL) { if (pLogTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; code = -1;
goto END;
} }
// terrno set inner // error code was set inner
code = walRollFileInfo(pWal); code = walRollFileInfo(pWal);
if (code != 0) { if (code != 0) {
return -1; goto END;
} }
// switch file // switch file
@ -312,7 +329,9 @@ int walRoll(SWal *pWal) {
ASSERT(pWal->writeCur >= 0); ASSERT(pWal->writeCur >= 0);
pWal->lastRollSeq = walGetSeq(); pWal->lastRollSeq = walGetSeq();
return 0;
END:
return code;
} }
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {