Merge pull request #13663 from taosdata/feature/stream
feat(wal): support restore from snapshot
This commit is contained in:
commit
a2145321b5
|
@ -308,9 +308,11 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||||
|
taosFreeQitem(pBlock);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
||||||
|
taosFreeQitem(pBlock);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
||||||
taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
taosWriteQitem(pTask->outputQueue->queue, pBlock);
|
||||||
|
|
|
@ -141,6 +141,8 @@ typedef struct SWal {
|
||||||
// ctl
|
// ctl
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
|
// ref
|
||||||
|
SHashObj *pRefHash; // ref -> SWalRef
|
||||||
// path
|
// path
|
||||||
char path[WAL_PATH_LEN];
|
char path[WAL_PATH_LEN];
|
||||||
// reusable write head
|
// reusable write head
|
||||||
|
@ -184,7 +186,7 @@ int32_t walRollback(SWal *, int64_t ver);
|
||||||
// notify that previous logs can be pruned safely
|
// notify that previous logs can be pruned safely
|
||||||
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
||||||
int32_t walEndSnapshot(SWal *);
|
int32_t walEndSnapshot(SWal *);
|
||||||
void walRestoreFromSnapshot(SWal *, int64_t ver);
|
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
||||||
// int32_t walDataCorrupted(SWal*);
|
// int32_t walDataCorrupted(SWal*);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
|
@ -199,6 +201,16 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead);
|
||||||
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
|
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
|
||||||
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
|
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t refId;
|
||||||
|
int64_t ver;
|
||||||
|
} SWalRef;
|
||||||
|
|
||||||
|
SWalRef *walOpenRef(SWal *);
|
||||||
|
void walCloseRef(SWalRef *);
|
||||||
|
int32_t walRefVer(SWalRef *, int64_t ver);
|
||||||
|
int32_t walUnrefVer(SWal *);
|
||||||
|
|
||||||
// deprecated
|
// deprecated
|
||||||
#if 0
|
#if 0
|
||||||
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
||||||
|
|
|
@ -1773,6 +1773,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
}
|
}
|
||||||
|
|
||||||
// assign data
|
// assign data
|
||||||
|
// TODO
|
||||||
ret = taosMemoryCalloc(1, cap + 46);
|
ret = taosMemoryCalloc(1, cap + 46);
|
||||||
ret = POINTER_SHIFT(ret, 46);
|
ret = POINTER_SHIFT(ret, 46);
|
||||||
ret->header.vgId = vgId;
|
ret->header.vgId = vgId;
|
||||||
|
|
|
@ -132,6 +132,7 @@ static inline void walResetVer(SWalVer* pVer) {
|
||||||
|
|
||||||
int walLoadMeta(SWal* pWal);
|
int walLoadMeta(SWal* pWal);
|
||||||
int walSaveMeta(SWal* pWal);
|
int walSaveMeta(SWal* pWal);
|
||||||
|
int walRemoveMeta(SWal* pWal);
|
||||||
int walRollFileInfo(SWal* pWal);
|
int walRollFileInfo(SWal* pWal);
|
||||||
|
|
||||||
int walCheckAndRepairMeta(SWal* pWal);
|
int walCheckAndRepairMeta(SWal* pWal);
|
||||||
|
|
|
@ -419,3 +419,12 @@ int walLoadMeta(SWal* pWal) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int walRemoveMeta(SWal* pWal) {
|
||||||
|
int metaVer = walFindCurMetaVer(pWal);
|
||||||
|
if (metaVer == -1) return 0;
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
walBuildMetaName(pWal, metaVer, fnameStr);
|
||||||
|
taosRemoveFile(fnameStr);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -75,7 +75,7 @@ void walCleanUp() {
|
||||||
}
|
}
|
||||||
|
|
||||||
SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
SWal *pWal = taosMemoryMalloc(sizeof(SWal));
|
SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
|
||||||
if (pWal == NULL) {
|
if (pWal == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -92,6 +92,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// init ref
|
||||||
|
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||||
|
if (pWal->pRefHash == NULL) {
|
||||||
|
taosMemoryFree(pWal);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// open meta
|
// open meta
|
||||||
walResetVer(&pWal->vers);
|
walResetVer(&pWal->vers);
|
||||||
pWal->pWriteLogTFile = NULL;
|
pWal->pWriteLogTFile = NULL;
|
||||||
|
@ -100,6 +107,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
|
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||||
if (pWal->fileInfoSet == NULL) {
|
if (pWal->fileInfoSet == NULL) {
|
||||||
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
|
||||||
|
taosHashCleanup(pWal->pRefHash);
|
||||||
taosMemoryFree(pWal);
|
taosMemoryFree(pWal);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -115,12 +123,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
|
|
||||||
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
|
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
taosHashCleanup(pWal->pRefHash);
|
||||||
taosMemoryFree(pWal);
|
taosMemoryFree(pWal);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
|
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
|
||||||
if (pWal->refId < 0) {
|
if (pWal->refId < 0) {
|
||||||
|
taosHashCleanup(pWal->pRefHash);
|
||||||
taosThreadMutexDestroy(&pWal->mutex);
|
taosThreadMutexDestroy(&pWal->mutex);
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
taosMemoryFree(pWal);
|
taosMemoryFree(pWal);
|
||||||
|
@ -130,6 +140,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
walLoadMeta(pWal);
|
walLoadMeta(pWal);
|
||||||
|
|
||||||
if (walCheckAndRepairMeta(pWal) < 0) {
|
if (walCheckAndRepairMeta(pWal) < 0) {
|
||||||
|
taosHashCleanup(pWal->pRefHash);
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
taosThreadMutexDestroy(&pWal->mutex);
|
taosThreadMutexDestroy(&pWal->mutex);
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
|
@ -175,6 +186,7 @@ void walClose(SWal *pWal) {
|
||||||
walSaveMeta(pWal);
|
walSaveMeta(pWal);
|
||||||
taosArrayDestroy(pWal->fileInfoSet);
|
taosArrayDestroy(pWal->fileInfoSet);
|
||||||
pWal->fileInfoSet = NULL;
|
pWal->fileInfoSet = NULL;
|
||||||
|
taosHashCleanup(pWal->pRefHash);
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
|
|
|
@ -18,12 +18,47 @@
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
void walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
/*pWal->vers.firstVer = -1;*/
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
taosHashIterate(pWal->pRefHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SWalRef *pRef = (SWalRef *)pIter;
|
||||||
|
if (pRef->ver != -1) {
|
||||||
|
taosHashCancelIterate(pWal->pRefHash, pIter);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosCloseFile(&pWal->pWriteLogTFile);
|
||||||
|
taosCloseFile(&pWal->pWriteIdxTFile);
|
||||||
|
|
||||||
|
if (pWal->vers.firstVer != -1) {
|
||||||
|
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||||
|
for (int32_t i = 0; i < fileSetSize; i++) {
|
||||||
|
SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||||
|
taosRemoveFile(fnameStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
walRemoveMeta(pWal);
|
||||||
|
|
||||||
|
pWal->writeCur = -1;
|
||||||
|
pWal->totSize = 0;
|
||||||
|
pWal->lastRollSeq = -1;
|
||||||
|
|
||||||
|
taosArrayClear(pWal->fileInfoSet);
|
||||||
|
pWal->vers.firstVer = -1;
|
||||||
pWal->vers.lastVer = ver;
|
pWal->vers.lastVer = ver;
|
||||||
pWal->vers.commitVer = ver - 1;
|
pWal->vers.commitVer = ver - 1;
|
||||||
pWal->vers.snapshotVer = ver - 1;
|
pWal->vers.snapshotVer = ver - 1;
|
||||||
pWal->vers.verInSnapshotting = -1;
|
pWal->vers.verInSnapshotting = -1;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walCommit(SWal *pWal, int64_t ver) {
|
int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
|
|
Loading…
Reference in New Issue