fix stream state transfer
This commit is contained in:
parent
6a27e81964
commit
0c2e86f416
|
@ -89,6 +89,17 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle);
|
|||
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
|
||||
} while (0)
|
||||
|
||||
int32_t getFileSize(char* path, char* name, int64_t* sz) {
|
||||
int ret = 0;
|
||||
|
||||
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
|
||||
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
|
||||
|
||||
ret = taosStatFile(fullname, sz, NULL);
|
||||
taosMemoryFree(fullname);
|
||||
|
||||
return ret;
|
||||
}
|
||||
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
|
||||
// impl later
|
||||
int len = strlen(path);
|
||||
|
@ -157,32 +168,32 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
|||
// current
|
||||
item.name = pFile->pCurrent;
|
||||
item.type = ROCKSDB_CURRENT_TYPE;
|
||||
if (taosStatFile(pFile->pCurrent, &item.size, NULL) != 0) {
|
||||
qError("stream-state failed to get file size: %s", pFile->pCurrent);
|
||||
}
|
||||
getFileSize(pFile->path, item.name, &item.size);
|
||||
taosArrayPush(list, &item);
|
||||
|
||||
// mainfest
|
||||
item.name = pFile->pMainfest;
|
||||
item.type = ROCKSDB_MAINFEST_TYPE;
|
||||
taosStatFile(pFile->pMainfest, &item.size, NULL);
|
||||
getFileSize(pFile->path, item.name, &item.size);
|
||||
taosArrayPush(list, &item);
|
||||
|
||||
// options
|
||||
item.name = pFile->pOptions;
|
||||
item.type = ROCKSDB_OPTIONS_TYPE;
|
||||
taosStatFile(pFile->pOptions, &item.size, NULL);
|
||||
getFileSize(pFile->path, item.name, &item.size);
|
||||
taosArrayPush(list, &item);
|
||||
// sst
|
||||
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
|
||||
char* sst = taosArrayGetP(pFile->pSst, i);
|
||||
item.name = sst;
|
||||
item.type = ROCKSDB_SST_TYPE;
|
||||
taosStatFile(sst, &item.size, NULL);
|
||||
getFileSize(pFile->path, item.name, &item.size);
|
||||
taosArrayPush(list, &item);
|
||||
}
|
||||
// meta
|
||||
item.name = pFile->pCheckpointMeta;
|
||||
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
|
||||
taosStatFile(pFile->pCheckpointMeta, &item.size, NULL);
|
||||
getFileSize(pFile->path, item.name, &item.size);
|
||||
taosArrayPush(list, &item);
|
||||
|
||||
pHandle->pBackendFile = pFile;
|
||||
|
@ -305,8 +316,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
|||
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
||||
char fullname[256] = {0};
|
||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname);
|
||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
|
||||
|
||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
|
||||
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||
pHandle->offset += nread;
|
||||
|
||||
|
@ -349,7 +360,12 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
|
|||
pHandle->pFileList = list;
|
||||
pHandle->currFileIdx = 0;
|
||||
pHandle->offset = 0;
|
||||
pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE);
|
||||
|
||||
char fullname[256] = {0};
|
||||
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
|
||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
|
||||
|
||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
|
||||
*ppWriter = pWriter;
|
||||
return 0;
|
||||
}
|
||||
|
@ -378,8 +394,9 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
|||
item.type = pHdr->type;
|
||||
taosArrayPush(pHandle->pFileList, &item);
|
||||
|
||||
char fullname[256] = {0};
|
||||
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name;
|
||||
char fullname[256] = {0};
|
||||
|
||||
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
|
||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
|
||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
|
||||
|
||||
|
@ -406,6 +423,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
|
|||
qDebug("stream snap get file list, %s", buf);
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
|
||||
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
|
||||
taosMemoryFree(item->name);
|
||||
|
|
Loading…
Reference in New Issue