fix stream state transfer
This commit is contained in:
parent
82df77eba8
commit
6a27e81964
|
@ -200,7 +200,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
||||||
tdir = NULL;
|
tdir = NULL;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
qDebug("open file %s, current offset %" PRId64 "", file, (int64_t)0);
|
qDebug("stream-state open file %s, current offset %" PRId64 "", file, (int64_t)0);
|
||||||
pHandle->seraial = 0;
|
pHandle->seraial = 0;
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -264,33 +264,34 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
|
||||||
|
|
||||||
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
|
||||||
|
|
||||||
qDebug("start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
qDebug("stream-state start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
|
|
||||||
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
|
||||||
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(terrno);
|
||||||
qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type,
|
qError("stream-state snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
qDebug("failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
qDebug("stream-state failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d",
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
return code;
|
return code;
|
||||||
// handle later
|
// handle later
|
||||||
return -1;
|
return -1;
|
||||||
} else if (nread > 0 && nread <= kBlockSize) {
|
} else if (nread > 0 && nread <= kBlockSize) {
|
||||||
// left bytes less than kBlockSize
|
// left bytes less than kBlockSize
|
||||||
qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
qDebug("stream-state read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
|
||||||
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
|
||||||
pHandle->offset += nread;
|
pHandle->offset += nread;
|
||||||
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
if (pHandle->offset >= item->size || nread < kBlockSize) {
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
qDebug("close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx, pHandle->currFileIdx + 1);
|
qDebug("stream-state close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx,
|
||||||
|
pHandle->currFileIdx + 1);
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
pHandle->currFileIdx += 1;
|
pHandle->currFileIdx += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx);
|
qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx);
|
||||||
taosCloseFile(&pHandle->fd);
|
taosCloseFile(&pHandle->fd);
|
||||||
pHandle->offset = 0;
|
pHandle->offset = 0;
|
||||||
pHandle->currFileIdx += 1;
|
pHandle->currFileIdx += 1;
|
||||||
|
|
Loading…
Reference in New Issue