fix stream state transfer

This commit is contained in:
yihaoDeng 2023-07-20 09:51:18 +00:00
parent 6e5be0d30e
commit 519d3b74d1
3 changed files with 40 additions and 31 deletions

View File

@ -51,15 +51,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
SStreamSnapReader* pSnapReader = NULL;
// restore from checkpoint if checkpointid != 0
if (checkpointId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints",
TD_DIRSEP, checkpointId);
} else {
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "state");
}
if (streamSnapReaderOpen(pTq, sver, checkpointId, tdir, &pSnapReader) == 0) {
if (streamSnapReaderOpen(pTq, sver, checkpointId, pTq->path, &pSnapReader) == 0) {
pReader->complete = 1;
} else {
code = -1;

View File

@ -79,31 +79,42 @@ const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path);
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname)
#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \
do { \
sprintf(fullname, "%s/%s", path, file); \
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
} while (0)
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
// impl later
int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 128);
memcpy(tdir, path, len);
if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId);
} else {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
}
int32_t code = 0;
TdDirPtr pDir = taosOpenDir(path);
TdDirPtr pDir = taosOpenDir(tdir);
if (NULL == pDir) {
qError("stream-state failed to open %s", tdir);
goto _err;
}
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pHandle->pBackendFile = pFile;
pHandle->checkpointId = 0;
pHandle->checkpointId = chkpId;
pHandle->seraial = 0;
pFile->path = taosStrdup(path);
pFile->path = tdir;
pFile->pSst = taosArrayInit(16, sizeof(void*));
TdDirEntryPtr pDirEntry;
@ -135,7 +146,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) {
qError("stream-state failed to open %s, reason: no valid file", tdir);
code = -1;
tdir = NULL;
goto _err;
}
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
@ -176,11 +189,13 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
pHandle->pFileList = list;
char fullname[256] = {0};
char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
char* file = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
if (pHandle->fd == NULL) {
qError("stream-state failed to open %s, reason: %s", tdir, tstrerror(errno));
tdir = NULL;
goto _err;
}
pHandle->seraial = 0;
@ -188,6 +203,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
return 0;
_err:
streamSnapHandleDestroy(pHandle);
taosMemoryFreeClear(tdir);
code = -1;
return code;
@ -195,32 +211,33 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile;
taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest);
taosMemoryFree(pFile->pOptions);
taosMemoryFree(pFile->path);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
taosMemoryFree(sst);
if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest);
taosMemoryFree(pFile->pOptions);
taosMemoryFree(pFile->path);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
taosMemoryFree(sst);
}
taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile);
}
taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile);
taosArrayDestroy(handle->pFileList);
taosCloseFile(&handle->fd);
return;
}
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) {
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* path, SStreamSnapReader** ppReader) {
// impl later
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReader->handle.checkpointId = ever;
// const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) {
taosMemoryFree(pReader);
return -1;
}

View File

@ -804,7 +804,7 @@ TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) {
// sprintf(&key[count - 2], "%c", i);
key[count - 2] = '0' + i;
ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn);
ret = tdbTbInsert(pDb, key, count, NULL, 0, txn);
GTEST_ASSERT_EQ(ret, 0);
}
}