fix transfer crash

This commit is contained in:
yihaoDeng 2023-10-17 21:08:31 +08:00
parent 2004c1a346
commit d67fdb5e46
2 changed files with 17 additions and 7 deletions

View File

@ -139,7 +139,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver;
pWriter->ever = ever;
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received");
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
taosMkDir(tdir);
SStreamSnapWriter* pSnapWriter = NULL;

View File

@ -393,8 +393,10 @@ _NEXT:
}
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx);
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
@ -422,10 +424,17 @@ _NEXT:
if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) {
// finish
if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pDbSnapSet)) {
// skip to next snap set
pHandle->currIdx += 1;
pSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
goto _NEXT;
} else {
*ppData = NULL;
*size = 0;
return 0;
}
}
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, item->name, TD_FILE_READ);
@ -541,10 +550,11 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", path, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", snapInfo.chkpId);
if (!taosIsDir(path)) {
code = taosMulMkDir(path);
qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
ASSERT(code == 0);
}