diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b08832ed46..2ee1b5c807 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -14,13 +14,13 @@ */ #include "os.h" -#include "ttimer.h" #include "streamState.h" #include "tdatablock.h" #include "tdbInt.h" #include "tmsg.h" #include "tmsgcb.h" #include "tqueue.h" +#include "ttimer.h" #ifdef __cplusplus extern "C" { @@ -49,7 +49,7 @@ enum { TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause - TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore + TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore TASK_STATUS__CK_READY, }; @@ -322,8 +322,8 @@ struct SStreamTask { int32_t nextCheckId; SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; - SArray* pReadyMsgList; // SArray - TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ + SArray* pReadyMsgList; // SArray + TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ SArray* pUpstreamInfoList; // output @@ -388,11 +388,13 @@ typedef struct SStreamMeta { tmr_h hbTmr; SMgmtInfo mgmtInfo; - int32_t chkptNotReadyTasks; - SArray* checkpointSaved; - SArray* checkpointInUse; - int32_t checkpointCap; - SRWLatch checkpointDirLock; + int32_t chkptNotReadyTasks; + + int64_t chkpId; + SArray* chkpSaved; + SArray* chkpInUse; + int32_t chkpCap; + SRWLatch chkpDirLock; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -418,7 +420,7 @@ typedef struct { typedef struct { int32_t type; - int64_t stage; //nodeId from upstream task + int64_t stage; // nodeId from upstream task int64_t streamId; int32_t taskId; int32_t srcVgId; @@ -566,7 +568,7 @@ typedef struct SNodeUpdateInfo { typedef struct SStreamTaskNodeUpdateMsg { int64_t streamId; int32_t taskId; - SArray* pNodeList; // SArray + SArray* pNodeList; // SArray } SStreamTaskNodeUpdateMsg; int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg); @@ -613,7 +615,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); @@ -694,6 +696,9 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); +// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); + int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta); diff --git a/include/util/tarray.h b/include/util/tarray.h index f56c9e3a17..4d9c930521 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -200,8 +200,11 @@ void taosArrayClear(SArray* pArray); * @param pArray * @param fp */ + void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); +void taosArrayClearP(SArray* pArray, void (*fp)(void*)); + void* taosArrayDestroy(SArray* pArray); void taosArrayDestroyP(SArray* pArray, FDelete fp); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cb60725110..01c17d8e04 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -48,6 +48,7 @@ typedef struct SStreamVnodeRevertIndex { static int32_t mndNodeCheckSentinel = 0; static SStreamVnodeRevertIndex execNodeList; +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); @@ -1060,7 +1061,6 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } } - pStream->checkpointFreq = checkpointId; pStream->checkpointId = checkpointId; pStream->checkpointFreq = taosGetTimestampMs(); atomic_store_64(&pStream->currentTick, 0); @@ -1097,7 +1097,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME); if (pTrans == NULL) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d84cc26d9f..3112e59985 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -93,7 +93,7 @@ typedef struct SQueryNode SQueryNode; #define VNODE_BUFPOOL_SEGMENTS 3 -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" // vnd.h @@ -331,6 +331,10 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData); int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter); int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback); int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId); + +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter); + // SStreamTaskReader ====================================== // SStreamStateWriter ===================================== // SStreamStateReader ===================================== diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index f7bae25043..2c3a12b50b 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -41,13 +41,17 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + + SStreamMeta* meta = pTq->pStreamMeta; pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; + int64_t chkpId = meta ? meta->chkpId : 0; + SStreamSnapReader* pSnapReader = NULL; - sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints"); - if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) { + + if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; @@ -131,14 +135,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); + sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received"); + taosMkDir(tdir); + SStreamSnapWriter* pSnapWriter = NULL; if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) { goto _err; } - tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir); pWriter->pWriterImpl = pSnapWriter; + + *ppWriter = pWriter; return code; _err: tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); @@ -154,10 +162,17 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) taosMemoryFree(pWriter); return code; } - -int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode)); - code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); +int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { + int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); + if (code == 0) { + code = streamStateLoadTasks(pWriter); + } return code; } + +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); } + +int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { + tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode)); + return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); +} diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index a10b38eb64..abd6ea00a7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -248,31 +248,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } } } - // if (!pReader->streamStateDone) { - // if (pReader->pStreamStateReader == NULL) { - // code = - // streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, - // &pReader->pStreamStateReader); - // if (code) { - // pReader->streamStateDone = 1; - // pReader->pStreamStateReader = NULL; - // goto _err; - // } - // } - // code = streamStateSnapRead(pReader->pStreamStateReader, ppData); - // if (code) { - // goto _err; - // } else { - // if (*ppData) { - // goto _exit; - // } else { - // pReader->streamStateDone = 1; - // code = streamStateSnapReaderClose(pReader->pStreamStateReader); - // if (code) goto _err; - // pReader->pStreamStateReader = NULL; - // } - // } - // } + if (!pReader->streamStateDone) { + if (pReader->pStreamStateReader == NULL) { + code = + streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader); + if (code) { + pReader->streamStateDone = 1; + pReader->pStreamStateReader = NULL; + goto _err; + } + } + code = streamStateSnapRead(pReader->pStreamStateReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->streamStateDone = 1; + code = streamStateSnapReaderClose(pReader->pStreamStateReader); + if (code) goto _err; + pReader->pStreamStateReader = NULL; + } + } + } // RSMA ============== if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { @@ -419,6 +418,9 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (pWriter->pStreamStateWriter) { code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); if (code) goto _exit; + + code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0); + if (code) goto _exit; } if (pWriter->pRsmaSnapWriter) { @@ -527,7 +529,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData); if (code) goto _err; } break; - case SNAP_DATA_STREAM_STATE: { + case SNAP_DATA_STREAM_STATE_BACKEND: { if (pWriter->pStreamStateWriter == NULL) { code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter); if (code) goto _err; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c3c2847a3f..920ec8a254 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,6 +20,27 @@ #include "tcommon.h" #include "tref.h" +typedef struct { + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + + char* buf; + int32_t len; + + // ping-pong buf + SHashObj* pSstTbl[2]; + int8_t idx; + + SArray* pAdd; + SArray* pDel; + int8_t update; +} SBackendManager; + typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -127,6 +148,218 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); +SBackendManager* bkdMgtCreate(char* path) { + SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); + p->curChkpId = 0; + p->preCkptId = 0; + p->pSST = taosArrayInit(64, sizeof(void*)); + p->path = taosStrdup(path); + p->len = strlen(path) + 128; + p->buf = taosMemoryCalloc(1, p->len); + + p->idx = 0; + p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + + p->pAdd = taosArrayInit(64, sizeof(void*)); + p->pDel = taosArrayInit(64, sizeof(void*)); + p->update = 0; + return p; +} +void bkdMgtDestroy(SBackendManager* bm) { + if (bm == NULL) return; + + taosMemoryFree(bm->buf); + taosMemoryFree(bm->path); + + taosArrayDestroyP(bm->pSST, taosMemoryFree); + taosArrayDestroyP(bm->pAdd, taosMemoryFree); + taosArrayDestroyP(bm->pDel, taosMemoryFree); + + taosHashCleanup(bm->pSstTbl[0]); + taosHashCleanup(bm->pSstTbl[1]); + taosMemoryFree(bm); +} + +int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { + int32_t code = 0; + size_t len = 0; + void* pIter = taosHashIterate(p2, NULL); + while (pIter) { + char* name = taosHashGetKey(pIter, &len); + if (!taosHashGet(p1, name, len)) { + char* p = taosStrdup(name); + taosArrayPush(diff, &p); + } + pIter = taosHashIterate(p2, pIter); + } + return code; +} +int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { + int32_t code = 0; + + code = compareHashTableImpl(p1, p2, add); + code = compareHashTableImpl(p2, p1, del); + + return code; +} +int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { + const char* pCurrent = "CURRENT"; + int32_t currLen = strlen(pCurrent); + + const char* pManifest = "MANIFEST-"; + int32_t maniLen = strlen(pManifest); + + const char* pSST = ".sst"; + int32_t sstLen = strlen(pSST); + + memset(bm->buf, 0, bm->len); + sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); + + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + + TdDirPtr pDir = taosOpenDir(bm->buf); + TdDirEntryPtr de = NULL; + int8_t dummy = 0; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { + taosMemoryFreeClear(bm->pCurrent); + bm->pCurrent = taosStrdup(name); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + + if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { + taosMemoryFreeClear(bm->pManifest); + bm->pManifest = taosStrdup(name); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { + char* p = taosStrdup(name); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + } + if (bm->init == 0) { + bm->preCkptId = -1; + bm->curChkpId = chkpId; + bm->init = 1; + + void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL); + while (pIter) { + size_t len; + char* name = taosHashGetKey(pIter, &len); + if (name != NULL && len != 0) { + taosArrayPush(bm->pAdd, &name); + } + pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); + } + if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1; + } else { + int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); + if (code != 0) { + // dead code + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + taosHashClear(bm->pSstTbl[1 - bm->idx]); + bm->update = 0; + + return code; + } + + bm->preCkptId = bm->curChkpId; + bm->curChkpId = chkpId; + if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) { + bm->update = 0; + } + } + taosHashClear(bm->pSstTbl[bm->idx]); + bm->idx = 1 - bm->idx; + + return 0; +} + +int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { + int32_t code = 0; + int32_t len = bm->len + 128; + + char* dstBuf = taosMemoryCalloc(1, len); + char* srcBuf = taosMemoryCalloc(1, len); + + char* srcDir = taosMemoryCalloc(1, len); + char* dstDir = taosMemoryCalloc(1, len); + + sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId); + sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname); + + if (!taosDirExist(srcDir)) { + return 0; + } + + code = taosMkDir(dstDir); + if (code != 0) { + return code; + } + + // clear current file + memset(dstBuf, 0, len); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent); + taosRemoveFile(dstBuf); + + memset(dstBuf, 0, len); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); + taosRemoveFile(dstBuf); + + // add file to $name dir + for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) { + memset(dstBuf, 0, len); + memset(srcBuf, 0, len); + + char* filename = taosArrayGetP(bm->pAdd, i); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + + taosCopyFile(srcBuf, dstBuf); + } + // del file in $name + for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { + memset(dstBuf, 0, len); + memset(srcBuf, 0, len); + + char* filename = taosArrayGetP(bm->pDel, i); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + taosRemoveFile(dstBuf); + } + + // copy current file to dst dir + memset(srcBuf, 0, len); + memset(dstBuf, 0, len); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pCurrent); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent); + taosCopyFile(srcBuf, dstBuf); + + // copy manifest file to dst dir + memset(srcBuf, 0, len); + memset(dstBuf, 0, len); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pManifest); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); + taosCopyFile(srcBuf, dstBuf); + + // clear delta data buf + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + + taosMemoryFree(srcBuf); + taosMemoryFree(dstBuf); + taosMemoryFree(srcDir); + taosMemoryFree(dstDir); + return code; +} + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc, encodeValueFunc, decodeValueFunc}, @@ -151,34 +384,33 @@ int32_t copyFiles(const char* src, const char* dst) { // opt later, just hard link int32_t sLen = strlen(src); int32_t dLen = strlen(dst); - char* absSrcPath = taosMemoryCalloc(1, sLen + 64); - char* absDstPath = taosMemoryCalloc(1, dLen + 64); + char* srcName = taosMemoryCalloc(1, sLen + 64); + char* dstName = taosMemoryCalloc(1, dLen + 64); TdDirPtr pDir = taosOpenDir(src); if (pDir == NULL) return 0; TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - sprintf(absSrcPath, "%s/%s", src, name); - sprintf(absDstPath, "%s/%s", dst, name); + sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); if (!taosDirEntryIsDir(de)) { - code = taosCopyFile(absSrcPath, absDstPath); + code = taosCopyFile(srcName, dstName); if (code == -1) { goto _err; } } - memset(absSrcPath, 0, sLen + 64); - memset(absDstPath, 0, dLen + 64); + memset(srcName, 0, sLen + 64); + memset(dstName, 0, dLen + 64); } _err: - taosMemoryFreeClear(absSrcPath); - taosMemoryFreeClear(absDstPath); + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); taosCloseDir(&pDir); return code >= 0 ? 0 : -1; } @@ -186,12 +418,16 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; - // chkpId = 0; + /*param@1: checkpointId dir + param@2: state + copy checkpointdir's file to state dir + opt to set hard link to previous file + */ char* state = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(state, "%s/%s", path, "state"); + sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); if (chkpId != 0) { char* chkp = taosMemoryCalloc(1, strlen(path) + 64); - sprintf(chkp, "%s/%s/checkpoint%" PRId64 "", path, "checkpoints", chkpId); + sprintf(chkp, "%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); if (taosIsDir(chkp) && isValidCheckpoint(chkp)) { if (taosIsDir(state)) { // remove dir if exists @@ -216,6 +452,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } + void* streamBackendInit(const char* streamPath, int64_t chkpId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); @@ -295,12 +532,14 @@ _EXIT: } void streamBackendCleanup(void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)arg; - void* pIter = taosHashIterate(pHandle->cfInst, NULL); + + void* pIter = taosHashIterate(pHandle->cfInst, NULL); while (pIter != NULL) { RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; destroyRocksdbCfInst(inst); pIter = taosHashIterate(pHandle->cfInst, pIter); } + taosHashCleanup(pHandle->cfInst); if (pHandle->db) { @@ -399,74 +638,75 @@ void streamBackendHandleCleanup(void* arg) { int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); int64_t tc = 0; - int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); + int32_t sz = taosArrayGetSize(pMeta->chkpSaved); if (sz <= 0) { + taosWUnLockLatch(&pMeta->chkpDirLock); return -1; } else { - tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved); + tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved); } - taosArrayPush(pMeta->checkpointInUse, &tc); + taosArrayPush(pMeta->chkpInUse, &tc); *checkpoint = tc; - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); return 0; } /* * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| - * checkpointInUse: |--cp2--|--cp4--| - * checkpointInUse is doing translation, cannot del until + * chkpInUse: |--cp2--|--cp4--| + * chkpInUse is doing translation, cannot del until * replication is finished */ int32_t delObsoleteCheckpoint(void* arg, const char* path) { SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); - SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); - SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); + SArray* chkpDel = taosArrayInit(10, sizeof(int64_t)); + SArray* chkpDup = taosArrayInit(10, sizeof(int64_t)); int64_t minId = 0; - if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) { - minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0); + if (taosArrayGetSize(pMeta->chkpInUse) >= 1) { + minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); - for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); + for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); if (id >= minId) { - taosArrayPush(checkpointDup, &id); + taosArrayPush(chkpDup, &id); } else { - taosArrayPush(checkpointDel, &id); + taosArrayPush(chkpDel, &id); } } } else { - int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); - int32_t dsz = sz - pMeta->checkpointCap; // del size + int32_t sz = taosArrayGetSize(pMeta->chkpSaved); + int32_t dsz = sz - pMeta->chkpCap; // del size for (int i = 0; i < dsz; i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - taosArrayPush(checkpointDel, &id); + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); + taosArrayPush(chkpDel, &id); } for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - taosArrayPush(checkpointDup, &id); + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); + taosArrayPush(chkpDup, &id); } } - taosArrayDestroy(pMeta->checkpointSaved); - pMeta->checkpointSaved = checkpointDup; + taosArrayDestroy(pMeta->chkpSaved); + pMeta->chkpSaved = chkpDup; - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWUnLockLatch(&pMeta->chkpDirLock); - for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) { - int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i); + for (int i = 0; i < taosArrayGetSize(chkpDel); i++) { + int64_t id = *(int64_t*)taosArrayGet(chkpDel, i); char tbuf[256] = {0}; - sprintf(tbuf, "%s/checkpoint%" PRId64 "", path, id); + sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id); if (taosIsDir(tbuf)) { taosRemoveDir(tbuf); } } - taosArrayDestroy(checkpointDel); + taosArrayDestroy(chkpDel); return 0; } @@ -481,16 +721,21 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { int32_t code = 0; int32_t len = strlen(pMeta->path) + 30; - char* checkpointPath = taosMemoryCalloc(1, len); - sprintf(checkpointPath, "%s/%s", pMeta->path, "checkpoints"); + char* chkpPath = taosMemoryCalloc(1, len); + sprintf(chkpPath, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); - if (!taosDirExist(checkpointPath)) { + if (!taosDirExist(chkpPath)) { // no checkpoint, nothing to load + taosMemoryFree(chkpPath); return 0; } - TdDirPtr pDir = taosOpenDir(checkpointPath); - if (pDir == NULL) return 0; + TdDirPtr pDir = taosOpenDir(chkpPath); + + if (pDir == NULL) { + taosMemoryFree(chkpPath); + return 0; + } TdDirEntryPtr de = NULL; SArray* suffix = taosArrayInit(4, sizeof(int64_t)); @@ -514,12 +759,12 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { for (int i = 0; i < taosArrayGetSize(suffix); i++) { int64_t id = *(int64_t*)taosArrayGet(suffix, i); - taosArrayPush(pMeta->checkpointSaved, &id); + taosArrayPush(pMeta->chkpSaved, &id); } taosArrayDestroy(suffix); taosCloseDir(&pDir); - taosMemoryFree(checkpointPath); + taosMemoryFree(chkpPath); return 0; } int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { @@ -529,7 +774,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int32_t code = -1; char path[256] = {0}; - sprintf(path, "%s/%s", pMeta->path, "checkpoints"); + sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); code = taosMulModeMkDir(path, 0755); if (code != 0) { qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); @@ -566,9 +811,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } rocksdb_checkpoint_object_destroy(cp); } - taosWLockLatch(&pMeta->checkpointDirLock); - taosArrayPush(pMeta->checkpointSaved, &checkpointId); - taosWUnLockLatch(&pMeta->checkpointDirLock); + taosWLockLatch(&pMeta->chkpDirLock); + taosArrayPush(pMeta->chkpSaved, &checkpointId); + taosWUnLockLatch(&pMeta->chkpDirLock); delObsoleteCheckpoint(arg, path); @@ -618,9 +863,9 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, return ret; } } -int streamStateValueIsStale(char* vv) { +int streamStateValueIsStale(char* v) { int64_t ts = 0; - taosDecodeFixedI64(vv, &ts); + taosDecodeFixedI64(v, &ts); return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0; } int iterValueIsStale(rocksdb_iterator_t* iter) { @@ -726,8 +971,8 @@ int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); } -int stateSessionKeyEncode(void* ses, char* buf) { - SStateSessionKey* sess = ses; +int stateSessionKeyEncode(void* k, char* buf) { + SStateSessionKey* sess = k; int len = 0; len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); @@ -735,8 +980,8 @@ int stateSessionKeyEncode(void* ses, char* buf) { len += taosEncodeFixedI64((void**)&buf, sess->opNum); return len; } -int stateSessionKeyDecode(void* ses, char* buf) { - SStateSessionKey* sess = ses; +int stateSessionKeyDecode(void* k, char* buf) { + SStateSessionKey* sess = k; int len = 0; char* p = buf; @@ -951,33 +1196,23 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { SStreamValue key = {0}; char* p = value; if (streamStateValueIsStale(p)) { - if (dest != NULL) *dest = NULL; - return -1; + goto _EXCEPT; } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { - if (dest != NULL) *dest = NULL; qError("vlen: %d, read len: %d", vlen, key.len); - return -1; + goto _EXCEPT; } + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); - if (key.len == 0) { - key.data = NULL; - } else { - p = taosDecodeBinary(p, (void**)&(key.data), key.len); - } - - if (ttl != NULL) { - int64_t now = taosGetTimestampMs(); - *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; - } - if (dest != NULL) { - *dest = key.data; - } else { - taosMemoryFree(key.data); - } + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len; + +_EXCEPT: + if (dest != NULL) *dest = NULL; + if (ttl != NULL) *ttl = 0; + return -1; } const char* compareDefaultName(void* arg) { @@ -1097,12 +1332,14 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t cfHandle[0] = NULL; } rocksdb_options_destroy(cfOpts[0]); + handle->db = db; static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < nCf; i++) { char* cf = cfs[i]; - if (i == 0) continue; + if (i == 0) continue; // skip default column family, not set opt + char funcname[64] = {0}; if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { char idstr[128] = {0}; @@ -1135,9 +1372,9 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pHandle[idx] = cfHandle[i]; } } - void** pIter = taosHashIterate(handle->cfInst, NULL); + void* pIter = taosHashIterate(handle->cfInst, NULL); while (pIter) { - RocksdbCfInst* inst = *pIter; + RocksdbCfInst* inst = *(RocksdbCfInst**)pIter; for (int i = 0; i < cfLen; i++) { if (inst->cfOpt[i] == NULL) { @@ -1178,8 +1415,8 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { // taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); - taosThreadMutexLock(&handle->cfMutex); + taosThreadMutexLock(&handle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; @@ -1470,10 +1707,11 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); @@ -1508,9 +1746,9 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { qDebug("streamStateGetFirst_rocksdb"); SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut_rocksdb(pState, &tmp, NULL, 0); - SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); streamStateFreeCur(pCur); streamStateDel_rocksdb(pState, &tmp); return code; @@ -1631,6 +1869,9 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + if (code != 0) { + return NULL; + } char buf[128] = {0}; int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); @@ -1667,9 +1908,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1727,23 +1969,21 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo SSessionKey resKey = *key; void* tmp = NULL; int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); - if (code == 0) { - if (pVLen != NULL) *pVLen = vLen; - if (key->win.skey != resKey.win.skey) { - code = -1; - } else { - *key = resKey; - if (pVal != NULL && pVLen != NULL) { - *pVal = taosMemoryCalloc(1, *pVLen); - memcpy(*pVal, tmp, *pVLen); - } - } + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + if (code == 0 && key->win.skey != resKey.win.skey) { + *key = resKey; + + if (pVal) { + *pVal = tmp; + tmp = NULL; + }; + if (pVLen) *pVLen = vLen; + } else { + code = -1; } + taosMemoryFree(tmp); - streamStateFreeCur(pCur); - // impl later return code; } @@ -1789,8 +2029,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { - // qWarn("streamState failed to seek key prev - // %s", toString); streamStateFreeCur(pCur); return NULL; } @@ -1808,10 +2046,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; - char buf[128] = {0}; - + char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; @@ -1859,6 +2097,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con streamStateFreeCur(pCur); return NULL; } + size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -2355,9 +2594,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { } int32_t streamDefaultIterValid_rocksdb(void* iter) { SStreamStateCur* pCur = iter; - bool val = rocksdb_iter_valid(pCur->iter); - - return val ? 1 : 0; + return rocksdb_iter_valid(pCur->iter) ? 1 : 0; } void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { SStreamStateCur* pCur = iter; @@ -2373,13 +2610,16 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { } char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { SStreamStateCur* pCur = iter; - int32_t vlen = 0; - char* dst = NULL; - const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); - if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) { + char* ret = NULL; + + int32_t vlen = 0; + const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); + *len = decodeValueFunc((void*)val, vlen, NULL, &ret); + if (*len < 0) { return NULL; } - return dst; + + return ret; } // batch func void* streamStateCreateBatch() { @@ -2396,8 +2636,8 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - int i = streamStateGetCfIdx(pState, cfKeyName); + int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfKeyName); return -1; @@ -2434,6 +2674,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb if (tmpBuf == NULL) { taosMemoryFree(ttlV); } + { char tbuf[256] = {0}; ginitDict[cfIdx].toStrFunc((void*)key, tbuf); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 799dc96ec7..e2e95230c6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -48,22 +48,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF return NULL; } - int32_t len = strlen(path) + 20; - char* streamPath = taosMemoryCalloc(1, len); - sprintf(streamPath, "%s/%s", path, "stream"); - pMeta->path = taosStrdup(streamPath); + char* tpath = taosMemoryCalloc(1, strlen(path) + 64); + sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); + pMeta->path = tpath; + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { goto _err; } - - memset(streamPath, 0, len); - sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); - code = taosMulModeMkDir(streamPath, 0755); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } - if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } @@ -100,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t)); - pMeta->checkpointCap = 8; - taosInitRWLatch(&pMeta->checkpointDirLock); + pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); + pMeta->chkpCap = 8; + taosInitRWLatch(&pMeta->chkpDirLock); int64_t chkpId = streamGetLatestCheckpointId(pMeta); + pMeta->chkpId = chkpId; pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); if (pMeta->streamBackend == NULL) { @@ -118,9 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TAOS_SYSTEM_ERROR(code); goto _err; } - - taosMemoryFree(streamPath); - taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); @@ -128,7 +117,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF return pMeta; _err: - taosMemoryFree(streamPath); taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -141,6 +129,66 @@ _err: return NULL; } +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { + // stop all running tasking and reopen later + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->schedTimer) { + taosTmrStop(pTask->schedTimer); + pTask->schedTimer = NULL; + } + + if (pTask->launchTaskTimer) { + taosTmrStop(pTask->launchTaskTimer); + pTask->launchTaskTimer = NULL; + } + + tFreeStreamTask(pTask); + } + + // close stream backend + streamBackendCleanup(pMeta->streamBackend); + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); + pMeta->streamBackendRid = -1; + pMeta->streamBackend = NULL; + + char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + taosRemoveDir(defaultPath); + + char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); + + if (taosRenameFile(newPath, defaultPath) < 0) { + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); + return -1; + } + + pMeta->streamBackend = streamBackendInit(pMeta->path, 0); + if (pMeta->streamBackend == NULL) { + return -1; + } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + + taosHashClear(pMeta->pTasks); + + taosArrayClear(pMeta->pTaskList); + + taosHashClear(pMeta->pTaskBackendUnique); + + taosArrayClear(pMeta->chkpSaved); + + taosArrayClear(pMeta->chkpInUse); + + return 0; +} void streamMetaClose(SStreamMeta* pMeta) { qDebug("start to close stream meta"); if (pMeta == NULL) { @@ -168,8 +216,8 @@ void streamMetaClose(SStreamMeta* pMeta) { taosThreadMutexDestroy(&pMeta->backendMutex); taosHashCleanup(pMeta->pTaskBackendUnique); - taosArrayDestroy(pMeta->checkpointSaved); - taosArrayDestroy(pMeta->checkpointInUse); + taosArrayDestroy(pMeta->chkpSaved); + taosArrayDestroy(pMeta->chkpInUse); taosMemoryFree(pMeta); qDebug("end to close stream meta"); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 1a66c00389..0bf029f574 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -64,6 +64,7 @@ struct SStreamSnapReader { int64_t sver; int64_t ever; SStreamSnapHandle handle; + int64_t checkpointId; }; struct SStreamSnapWriter { void* pMeta; @@ -78,31 +79,60 @@ const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; static int64_t kBlockSize = 64 * 1024; -int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path); +int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId); void streamSnapHandleDestroy(SStreamSnapHandle* handle); // static void streamBuildFname(char* path, char* file, char* fullname) #define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \ do { \ - sprintf(fullname, "%s/%s", path, file); \ + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \ } while (0) -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { +int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { + int ret = 0; + + char* fullname = taosMemoryCalloc(1, strlen(path) + 32); + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); + + ret = taosStatFile(fullname, sz, NULL); + taosMemoryFree(fullname); + + return ret; +} + +TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { + char fullname[256] = {0}; + STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname); + return taosOpenFile(fullname, opt); +} + +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { // impl later + int len = strlen(path); + char* tdir = taosMemoryCalloc(1, len + 128); + memcpy(tdir, path, len); + + if (chkpId != 0) { + sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, + chkpId); + } else { + sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); + } int32_t code = 0; - TdDirPtr pDir = taosOpenDir(path); + TdDirPtr pDir = taosOpenDir(tdir); if (NULL == pDir) { + qError("stream-state failed to open %s", tdir); goto _err; } SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); pHandle->pBackendFile = pFile; - pHandle->checkpointId = 0; + pHandle->checkpointId = chkpId; pHandle->seraial = 0; - pFile->path = taosStrdup(path); + pFile->path = tdir; pFile->pSst = taosArrayInit(16, sizeof(void*)); TdDirEntryPtr pDirEntry; @@ -117,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { continue; } if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { - pFile->pMainfest = taosStrdup(name); + pFile->pOptions = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) && @@ -134,7 +164,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { taosCloseDir(&pDir); if (pFile->pCurrent == NULL) { + qError("stream-state failed to open %s, reason: no valid file", tdir); code = -1; + tdir = NULL; goto _err; } SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); @@ -143,50 +175,45 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { // current item.name = pFile->pCurrent; item.type = ROCKSDB_CURRENT_TYPE; - taosStatFile(pFile->pCurrent, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); + // mainfest item.name = pFile->pMainfest; item.type = ROCKSDB_MAINFEST_TYPE; - taosStatFile(pFile->pMainfest, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); + // options item.name = pFile->pOptions; item.type = ROCKSDB_OPTIONS_TYPE; - taosStatFile(pFile->pOptions, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); // sst for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { char* sst = taosArrayGetP(pFile->pSst, i); item.name = sst; item.type = ROCKSDB_SST_TYPE; - taosStatFile(sst, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); } // meta item.name = pFile->pCheckpointMeta; item.type = ROCKSDB_CHECKPOINT_META_TYPE; - taosStatFile(pFile->pCheckpointMeta, &item.size, NULL); - taosArrayPush(list, &item); + if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) { + taosArrayPush(list, &item); + } pHandle->pBackendFile = pFile; pHandle->currFileIdx = 0; pHandle->pFileList = list; - - char fullname[256] = {0}; - char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname); - - pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); - if (pHandle->fd == NULL) { - goto _err; - } pHandle->seraial = 0; pHandle->offset = 0; return 0; _err: streamSnapHandleDestroy(pHandle); + taosMemoryFreeClear(tdir); code = -1; return code; @@ -194,31 +221,33 @@ _err: void streamSnapHandleDestroy(SStreamSnapHandle* handle) { SBanckendFile* pFile = handle->pBackendFile; - taosMemoryFree(pFile->pCheckpointMeta); - taosMemoryFree(pFile->pCurrent); - taosMemoryFree(pFile->pMainfest); - taosMemoryFree(pFile->pOptions); - taosMemoryFree(pFile->path); - for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - char* sst = taosArrayGetP(pFile->pSst, i); - taosMemoryFree(sst); + if (pFile) { + taosMemoryFree(pFile->pCheckpointMeta); + taosMemoryFree(pFile->pCurrent); + taosMemoryFree(pFile->pMainfest); + taosMemoryFree(pFile->pOptions); + taosMemoryFree(pFile->path); + for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + char* sst = taosArrayGetP(pFile->pSst, i); + taosMemoryFree(sst); + } + taosArrayDestroy(pFile->pSst); + taosMemoryFree(pFile); } - taosArrayDestroy(pFile->pSst); - taosMemoryFree(pFile); taosArrayDestroy(handle->pFileList); taosCloseFile(&handle->fd); return; } -int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) { +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* path, SStreamSnapReader** ppReader) { // impl later SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); if (pReader == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - // const char* path = NULL; - if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { + + if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) { taosMemoryFree(pReader); return -1; } @@ -242,17 +271,32 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + if (pHandle->fd == NULL) { + if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { + // finish + *ppData = NULL; + *size = 0; + return 0; + } else { + pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); + qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + } + } + + qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, + qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type, tstrerror(code)); - return code; - // handle later return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize + qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); @@ -260,6 +304,11 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si pHandle->currFileIdx += 1; } } else { + qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx); + taosCloseFile(&pHandle->fd); + pHandle->offset = 0; + pHandle->currFileIdx += 1; + if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { // finish *ppData = NULL; @@ -267,12 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return 0; } item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - char fullname[256] = {0}; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname); - pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); + pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); pHandle->offset += nread; + + qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -310,7 +360,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path pHandle->pFileList = list; pHandle->currFileIdx = 0; pHandle->offset = 0; - pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE); + *ppWriter = pWriter; return 0; } @@ -321,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; SBanckendFile* pFile = pHandle->pBackendFile; - SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx); - if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) { + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + + if (pHandle->fd == NULL) { + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); + qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + tstrerror(code)); + } + } + + if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { + int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); + if (bytes != pHdr->size) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); return code; } - pHandle->offset += pHdr->size; + pHandle->offset += bytes; } else { taosCloseFile(&pHandle->fd); pHandle->offset = 0; @@ -339,10 +400,13 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa item.type = pHdr->type; taosArrayPush(pHandle->pFileList, &item); - char fullname[256] = {0}; - char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname); - pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE); + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pHandle->fd == NULL) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + tstrerror(code)); + } taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); pHandle->offset += pHdr->size; @@ -367,6 +431,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { qDebug("stream snap get file list, %s", buf); taosMemoryFree(buf); } + for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { SBackendFileItem* item = taosArrayGet(handle->pFileList, i); taosMemoryFree(item->name); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index c0dd40bb4d..b1ed36de18 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -108,7 +108,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz SStreamTask* pStreamTask = pTask; char statePath[1024]; if (!specPath) { - sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); + sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId); } else { memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); diff --git a/source/libs/tdb/test/tdbPageRecycleTest.cpp b/source/libs/tdb/test/tdbPageRecycleTest.cpp index 4d7b314917..d740bd0f94 100644 --- a/source/libs/tdb/test/tdbPageRecycleTest.cpp +++ b/source/libs/tdb/test/tdbPageRecycleTest.cpp @@ -804,7 +804,7 @@ TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) { // sprintf(&key[count - 2], "%c", i); key[count - 2] = '0' + i; - ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn); + ret = tdbTbInsert(pDb, key, count, NULL, 0, txn); GTEST_ASSERT_EQ(ret, 0); } } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index f5e15e7436..8e7c0f9584 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -319,7 +319,7 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { if (NULL == pSrc) { return NULL; } - + if (pSrc->size == 0) { // empty array list return taosArrayInit(8, pSrc->elemSize); } @@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } +void taosArrayClearP(SArray* pArray, void (*fp)(void*)) { + // if (pArray == NULL) return; + // if (fp == NULL) { + // pArray->size = 0; + // return; + // } + + // for (int32_t i = 0; i < pArray->size; ++i) { + // fp(TARRAY_GET_ELEM(pArray, i)); + // } + if (pArray) { + for (int32_t i = 0; i < pArray->size; i++) { + fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + } + } + taosArrayClear(pArray); +} void* taosArrayDestroy(SArray* pArray) { if (pArray) {