fix stream state transfer
This commit is contained in:
parent
0c2e86f416
commit
22c85fcbad
|
@ -89,7 +89,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle);
|
||||||
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
|
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
int32_t getFileSize(char* path, char* name, int64_t* sz) {
|
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
|
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
|
||||||
|
@ -100,6 +100,13 @@ int32_t getFileSize(char* path, char* name, int64_t* sz) {
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
|
||||||
|
char fullname[256] = {0};
|
||||||
|
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
|
||||||
|
return taosOpenFile(fullname, opt);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
|
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
|
||||||
// impl later
|
// impl later
|
||||||
int len = strlen(path);
|
int len = strlen(path);
|
||||||
|
@ -140,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
|
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
|
||||||
pFile->pMainfest = taosStrdup(name);
|
pFile->pOptions = taosStrdup(name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
|
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
|
||||||
|
@ -168,50 +175,39 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
||||||
// current
|
// current
|
||||||
item.name = pFile->pCurrent;
|
item.name = pFile->pCurrent;
|
||||||
item.type = ROCKSDB_CURRENT_TYPE;
|
item.type = ROCKSDB_CURRENT_TYPE;
|
||||||
getFileSize(pFile->path, item.name, &item.size);
|
streamGetFileSize(pFile->path, item.name, &item.size);
|
||||||
taosArrayPush(list, &item);
|
taosArrayPush(list, &item);
|
||||||
|
|
||||||
// mainfest
|
// mainfest
|
||||||
item.name = pFile->pMainfest;
|
item.name = pFile->pMainfest;
|
||||||
item.type = ROCKSDB_MAINFEST_TYPE;
|
item.type = ROCKSDB_MAINFEST_TYPE;
|
||||||
getFileSize(pFile->path, item.name, &item.size);
|
streamGetFileSize(pFile->path, item.name, &item.size);
|
||||||
taosArrayPush(list, &item);
|
taosArrayPush(list, &item);
|
||||||
|
|
||||||
// options
|
// options
|
||||||
item.name = pFile->pOptions;
|
item.name = pFile->pOptions;
|
||||||
item.type = ROCKSDB_OPTIONS_TYPE;
|
item.type = ROCKSDB_OPTIONS_TYPE;
|
||||||
getFileSize(pFile->path, item.name, &item.size);
|
streamGetFileSize(pFile->path, item.name, &item.size);
|
||||||
taosArrayPush(list, &item);
|
taosArrayPush(list, &item);
|
||||||
// sst
|
// sst
|
||||||
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
|
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
|
||||||
char* sst = taosArrayGetP(pFile->pSst, i);
|
char* sst = taosArrayGetP(pFile->pSst, i);
|
||||||
item.name = sst;
|
item.name = sst;
|
||||||
item.type = ROCKSDB_SST_TYPE;
|
item.type = ROCKSDB_SST_TYPE;
|
||||||
getFileSize(pFile->path, item.name, &item.size);
|
streamGetFileSize(pFile->path, item.name, &item.size);
|
||||||
taosArrayPush(list, &item);
|
taosArrayPush(list, &item);
|
||||||
}
|
}
|
||||||
// meta
|
// meta
|
||||||
item.name = pFile->pCheckpointMeta;
|
item.name = pFile->pCheckpointMeta;
|
||||||
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
|
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
|
||||||
getFileSize(pFile->path, item.name, &item.size);
|
if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) {
|
||||||
taosArrayPush(list, &item);
|
taosArrayPush(list, &item);
|
||||||
|
}
|
||||||
|
|
||||||
pHandle->pBackendFile = pFile;
|
pHandle->pBackendFile = pFile;
|
||||||
|
|
||||||
pHandle->currFileIdx = 0;
|
pHandle->currFileIdx = 0;
|
||||||
pHandle->pFileList = list;
|
pHandle->pFileList = list;
|
||||||
|
|
||||||
char fullname[256] = {0};
|
|
||||||
char* file = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
|
|
||||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname);
|
|
||||||
|
|
||||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
|
|
||||||
if (pHandle->fd == NULL) {
|
|
||||||
qError("stream-state failed to open %s, reason: %s", tdir, tstrerror(errno));
|
|
||||||
tdir = NULL;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
qDebug("stream-state open file %s, current offset %" PRId64 "", file, (int64_t)0);
|
|
||||||
pHandle->seraial = 0;
|
pHandle->seraial = 0;
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -275,29 +271,35 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
|
|
||||||
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
||||||
|
|
||||||
qDebug("stream-state start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
if (pHandle->fd == NULL) {
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
|
||||||
|
// finish
|
||||||
|
*ppData = NULL;
|
||||||
|
*size = 0;
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
|
||||||
|
qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name,
|
||||||
|
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name,
|
||||||
|
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
||||||
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
qError("stream-state snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type,
|
qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
qDebug("stream-state failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d",
|
|
||||||
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
|
||||||
return code;
|
|
||||||
// handle later
|
|
||||||
return -1;
|
return -1;
|
||||||
} else if (nread > 0 && nread <= kBlockSize) {
|
} else if (nread > 0 && nread <= kBlockSize) {
|
||||||
// left bytes less than kBlockSize
|
// left bytes less than kBlockSize
|
||||||
qDebug("stream-state read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name,
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
qDebug("stream-state close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx,
|
|
||||||
pHandle->currFileIdx + 1);
|
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
pHandle->currFileIdx += 1;
|
pHandle->currFileIdx += 1;
|
||||||
}
|
}
|
||||||
|
@ -314,15 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
||||||
char fullname[256] = {0};
|
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
|
||||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname);
|
|
||||||
|
|
||||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
|
|
||||||
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
|
|
||||||
qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
|
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
|
||||||
|
@ -361,11 +361,8 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
|
||||||
pHandle->currFileIdx = 0;
|
pHandle->currFileIdx = 0;
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
|
|
||||||
char fullname[256] = {0};
|
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
||||||
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
|
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE);
|
||||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
|
|
||||||
|
|
||||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -394,11 +391,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
item.type = pHdr->type;
|
item.type = pHdr->type;
|
||||||
taosArrayPush(pHandle->pFileList, &item);
|
taosArrayPush(pHandle->pFileList, &item);
|
||||||
|
|
||||||
char fullname[256] = {0};
|
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
||||||
|
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE);
|
||||||
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
|
|
||||||
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
|
|
||||||
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
|
|
||||||
|
|
||||||
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
|
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
|
||||||
pHandle->offset += pHdr->size;
|
pHandle->offset += pHdr->size;
|
||||||
|
|
Loading…
Reference in New Issue