fix stream state transfer

This commit is contained in:
yihaoDeng 2023-07-26 10:16:06 +00:00
parent c61393a7a1
commit 287088ae3f
2 changed files with 27 additions and 9 deletions

View File

@ -135,7 +135,9 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver;
pWriter->ever = ever;
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received");
taosMkDir(tdir);
SStreamSnapWriter* pSnapWriter = NULL;
if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) {
goto _err;
@ -143,6 +145,8 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir);
pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));

View File

@ -361,8 +361,6 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->currFileIdx = 0;
pHandle->offset = 0;
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE);
*ppWriter = pWriter;
return 0;
}
@ -373,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx);
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) {
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (pHandle->fd == NULL) {
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
return code;
}
pHandle->offset += pHdr->size;
pHandle->offset += bytes;
} else {
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
@ -392,7 +401,12 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
taosArrayPush(pHandle->pFileList, &item);
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
pHandle->offset += pHdr->size;