diff --git a/include/libs/stream/streamSnapshot.h b/include/libs/stream/streamSnapshot.h index a47bf9e381..bc5fd63070 100644 --- a/include/libs/stream/streamSnapshot.h +++ b/include/libs/stream/streamSnapshot.h @@ -22,12 +22,12 @@ typedef struct SStreamSnapWriter SStreamSnapWriter; typedef struct SStreamSnapHandle SStreamSnapHandle; typedef struct SStreamSnapBlockHdr SStreamSnapBlockHdr; -int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader); +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader); int32_t streamSnapReaderClose(SStreamSnapReader* pReader); int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* size); // SMetaSnapWriter ======================================== -int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapWriter** ppWriter); +int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter); int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamSnapWriterClose(SStreamSnapWriter* ppWriter, int8_t rollback); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5b3ee1a588..42d2be04c9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -89,6 +89,7 @@ typedef struct SQueryNode SQueryNode; #define VNODE_RSMA0_DIR "tsdb" #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" +#define VNODE_TQ_STREAM "stream" #define VNODE_BUFPOOL_SEGMENTS 3 diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index e579231d14..9223a0c211 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -32,6 +32,8 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS int32_t code = 0; SStreamStateReader* pReader = NULL; + char tdir[TSDB_FILENAME_LEN * 2] = {0}; + // alloc pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader)); if (pReader == NULL) { @@ -43,7 +45,8 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pReader->ever = ever; SStreamSnapReader* pSnapReader = NULL; - streamSnapReaderOpen(pTq, sver, ever, &pSnapReader); + sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); + streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader); pReader->pReaderImpl = pSnapReader; @@ -104,6 +107,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS int32_t code = 0; SStreamStateWriter* pWriter; + char tdir[TSDB_FILENAME_LEN * 2] = {0}; // alloc pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { @@ -114,8 +118,9 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; + sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); SStreamSnapWriter* pSnapWriter = NULL; - streamSnapWriterOpen(pTq, sver, ever, &pSnapWriter); + streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter); pWriter->pWriterImpl = pSnapWriter; diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 3232a5152f..296369a80c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -102,6 +102,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos); NextTbl: + except = 0; for (;;) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { except = 1; @@ -123,7 +124,10 @@ NextTbl: goto NextTbl; } } - + if (pVal == NULL || vLen == 0) { + *ppData = NULL; + return code; + } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index c6638e2959..24e86a7c27 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -235,6 +235,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) pReader->streamTaskDone = 1; code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); if (code) goto _err; + pReader->pStreamTaskReader = NULL; } } } @@ -254,6 +255,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) pReader->streamStateDone = 1; code = streamStateSnapReaderClose(pReader->pStreamStateReader); if (code) goto _err; + pReader->pStreamStateReader = NULL; } } } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index ff551e6534..a250d7a59b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -216,7 +216,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); if (!pVnode->restored) { - vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType)); + vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, + TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_RESTORING; vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); rpcFreeCont(pMsg->pCont); @@ -279,7 +280,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle); if (!pVnode->restored) { - vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, TMSG_INFO(pMsg->msgType)); + vGError("vgId:%d, msg:%p failed to process since restore not finished, type:%s", vgId, pMsg, + TMSG_INFO(pMsg->msgType)); vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -526,7 +528,7 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p } static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { - SVnode *pVnode = pFsm->data; + SVnode *pVnode = pFsm->data; SyncIndex appliedIdx = -1; do { @@ -660,8 +662,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex); for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) { SNodeInfo *pNode = &pCfg->nodeInfo[i]; - vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort, - pNode->nodeId, pNode->clusterId); + vInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pVnode->config.vgId, i, pNode->nodeFqdn, + pNode->nodePort, pNode->nodeId, pNode->clusterId); } pVnode->sync = syncOpen(&syncInfo); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index bb976748ff..5fdc5daa20 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -205,13 +205,13 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { return; } -int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapReader** ppReader) { +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) { // impl later SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); if (pReader == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - const char* path = NULL; + // const char* path = NULL; if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { return -1; } @@ -243,7 +243,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return code; // handle later return -1; - } else if (nread <= kBlockSize) { + } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { @@ -254,6 +254,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si } else { if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { // finish + *ppData = NULL; + *size = 0; return 0; } item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); @@ -278,7 +280,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return 0; } // SMetaSnapWriter ======================================== -int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapWriter** ppWriter) { +int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) { // impl later SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter)); if (pWriter == NULL) { @@ -286,7 +288,6 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna } SStreamSnapHandle* pHandle = &pWriter->handle; - const char* path = NULL; SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); pFile->path = taosStrdup(path); SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));