diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 83fdd71c44..79b633e9d7 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -135,7 +135,9 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); + sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received"); + taosMkDir(tdir); + SStreamSnapWriter* pSnapWriter = NULL; if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) { goto _err; @@ -143,6 +145,8 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir); pWriter->pWriterImpl = pSnapWriter; + + *ppWriter = pWriter; return code; _err: tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 5ae0f6f30e..0bf029f574 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -361,8 +361,6 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path pHandle->currFileIdx = 0; pHandle->offset = 0; - SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE); *ppWriter = pWriter; return 0; } @@ -373,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; SBanckendFile* pFile = pHandle->pBackendFile; - SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx); - if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) { + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + + if (pHandle->fd == NULL) { + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); + qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + tstrerror(code)); + } + } + + if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { + int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); + if (bytes != pHdr->size) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); return code; } - pHandle->offset += pHdr->size; + pHandle->offset += bytes; } else { taosCloseFile(&pHandle->fd); pHandle->offset = 0; @@ -392,7 +401,12 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa taosArrayPush(pHandle->pFileList, &item); SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_WRITE); + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pHandle->fd == NULL) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + tstrerror(code)); + } taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); pHandle->offset += pHdr->size;