add checkpoint
This commit is contained in:
parent
14b9d920ba
commit
40eb50676d
|
@ -49,6 +49,10 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
||||||
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints");
|
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints");
|
||||||
if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) {
|
if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) {
|
||||||
pReader->complete = 1;
|
pReader->complete = 1;
|
||||||
|
} else {
|
||||||
|
code = -1;
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
pReader->pReaderImpl = pSnapReader;
|
pReader->pReaderImpl = pSnapReader;
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
// CONFIG ==============
|
// CONFIG ==============
|
||||||
// FIXME: if commit multiple times and the config changed?
|
// FIXME: if commit multiple times and the config changed?
|
||||||
if (!pReader->cfgDone) {
|
if (!pReader->cfgDone) {
|
||||||
char fName[TSDB_FILENAME_LEN];
|
char fName[TSDB_FILENAME_LEN];
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
|
||||||
vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN);
|
vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN);
|
||||||
|
@ -242,7 +242,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
if (pReader->pStreamStateReader == NULL) {
|
if (pReader->pStreamStateReader == NULL) {
|
||||||
code =
|
code =
|
||||||
streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
|
streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
|
||||||
if (code) goto _err;
|
if (code) {
|
||||||
|
pReader->streamStateDone = 1;
|
||||||
|
pReader->pStreamStateReader = NULL;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
|
code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -98,8 +98,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
|
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
|
||||||
|
pHandle->pBackendFile = pFile;
|
||||||
pHandle->checkpointId = 0;
|
pHandle->checkpointId = 0;
|
||||||
pHandle->seraial = 0;
|
pHandle->seraial = 0;
|
||||||
|
|
||||||
pFile->path = taosStrdup(path);
|
pFile->path = taosStrdup(path);
|
||||||
pFile->pSst = taosArrayInit(16, sizeof(void*));
|
pFile->pSst = taosArrayInit(16, sizeof(void*));
|
||||||
|
|
||||||
|
@ -133,7 +135,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
|
||||||
|
|
||||||
if (pFile->pCurrent == NULL) {
|
if (pFile->pCurrent == NULL) {
|
||||||
code = -1;
|
code = -1;
|
||||||
return code;
|
goto _err;
|
||||||
}
|
}
|
||||||
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
|
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
|
||||||
|
|
||||||
|
@ -197,7 +199,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
taosMemoryFree(pFile->pMainfest);
|
taosMemoryFree(pFile->pMainfest);
|
||||||
taosMemoryFree(pFile->pOptions);
|
taosMemoryFree(pFile->pOptions);
|
||||||
taosMemoryFree(pFile->path);
|
taosMemoryFree(pFile->path);
|
||||||
for (int i = 0; pFile->pSst != NULL && i < taosArrayGetSize(pFile->pSst); i++) {
|
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
|
||||||
char* sst = taosArrayGetP(pFile->pSst, i);
|
char* sst = taosArrayGetP(pFile->pSst, i);
|
||||||
taosMemoryFree(sst);
|
taosMemoryFree(sst);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue