From f9d0874e4525a2adb1cc8dd17ea3a168746da85b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 3 Aug 2023 08:24:50 +0000 Subject: [PATCH] support reopen stream state --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/inc/vnodeInt.h | 8 +- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 3 + source/libs/stream/src/streamBackendRocksdb.c | 67 ++++++++-------- source/libs/stream/src/streamMeta.c | 79 +++++++++++++------ 5 files changed, 102 insertions(+), 60 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index efc7901d4a..51a31f72ed 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -369,7 +369,7 @@ typedef struct SStreamMeta { int32_t chkptNotReadyTasks; - int64_t checkpointId; + int64_t checkpointId; SArray* checkpointSaved; SArray* checkpointInUse; int32_t checkpointCap; @@ -601,6 +601,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId); void streamMetaInit(); void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); + void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); @@ -610,6 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); + int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2f37048d5c..cb7af681ee 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -93,7 +93,7 @@ typedef struct SQueryNode SQueryNode; #define VNODE_BUFPOOL_SEGMENTS 3 -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" // vnd.h @@ -181,8 +181,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); -int64_t metaGetTbNum(SMeta *pMeta); -void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); +int64_t metaGetTbNum(SMeta* pMeta); +void metaReaderDoInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); @@ -329,6 +329,8 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData); int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter); int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback); int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId); + // SStreamTaskReader ====================================== // SStreamStateWriter ===================================== // SStreamStateReader ===================================== diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 79b633e9d7..3478928c4f 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -162,6 +162,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) taosMemoryFree(pWriter); return code; } +int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { + return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId); +} int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f643e5b1e8..2e9032a47e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -158,7 +158,6 @@ int32_t copyFiles(const char* src, const char* dst) { if (pDir == NULL) return 0; TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; @@ -186,7 +185,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; - // chkpId = 0; + /*param@1: checkpointId dir + param@2: state + copy checkpointdir's file to state dir + opt to set hard link to previous file + */ char* state = taosMemoryCalloc(1, strlen(path) + 32); sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); if (chkpId != 0) { @@ -1469,10 +1472,11 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); @@ -1507,9 +1511,9 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { qDebug("streamStateGetFirst_rocksdb"); SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut_rocksdb(pState, &tmp, NULL, 0); - SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); streamStateFreeCur(pCur); streamStateDel_rocksdb(pState, &tmp); return code; @@ -1523,10 +1527,8 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, uint64_t groupId = pKey->groupId; int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); - if (code == 0) { - if (pKey->groupId == groupId) { - return 0; - } + if (code == 0 && pKey->groupId == groupId) { + return 0; } return -1; } @@ -1628,6 +1630,9 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + if (code != 0) { + return NULL; + } char buf[128] = {0}; int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); @@ -1664,9 +1669,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1715,8 +1721,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); - if (code == -1) { - } return code; } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -1726,21 +1730,21 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo SSessionKey resKey = *key; void* tmp = NULL; int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); - if (code == 0) { - if (pVLen != NULL) *pVLen = vLen; - if (key->win.skey != resKey.win.skey) { - code = -1; - } else { - *key = resKey; - *pVal = taosMemoryCalloc(1, *pVLen); - memcpy(*pVal, tmp, *pVLen); - } + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + if (code == 0 && key->win.skey != resKey.win.skey) { + *key = resKey; + + if (pVal) { + *pVal = tmp; + tmp = NULL; + }; + if (pVLen) *pVLen = vLen; + } else { + code = -1; } + taosMemoryFree(tmp); - streamStateFreeCur(pCur); - // impl later return code; } @@ -1786,8 +1790,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { - // qWarn("streamState failed to seek key prev - // %s", toString); streamStateFreeCur(pCur); return NULL; } @@ -1805,10 +1807,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; - char buf[128] = {0}; - + char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; @@ -1856,6 +1858,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con streamStateFreeCur(pCur); return NULL; } + size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -2391,8 +2394,8 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - int i = streamStateGetCfIdx(pState, cfKeyName); + int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfKeyName); return -1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4711f4af19..3cf967a219 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,6 +36,14 @@ void streamMetaCleanup() { taosCloseRef(streamBackendCfWrapperId); } +int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { + int32_t code = 0; + + int32_t nTask = taosHashGetSize(pMeta->pTasks); + assert(nTask == 0); + + return code; +} SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -44,22 +52,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF return NULL; } - int32_t len = strlen(path) + 20; - char* streamPath = taosMemoryCalloc(1, len); - sprintf(streamPath, "%s/%s", path, "stream"); - pMeta->path = taosStrdup(streamPath); + char* tpath = taosMemoryCalloc(1, strlen(path) + 64); + sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); + pMeta->path = tpath; + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { goto _err; } - - memset(streamPath, 0, len); - sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); - code = taosMulModeMkDir(streamPath, 0755); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } - if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } @@ -90,14 +89,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - // memset(streamPath, 0, len); - // sprintf(streamPath, "%s/%s", pMeta->path, "state"); - // code = taosMulModeMkDir(streamPath, 0755); - // if (code != 0) { - // terrno = TAOS_SYSTEM_ERROR(code); - // goto _err; - // } - pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); @@ -119,15 +110,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - taosMemoryFree(streamPath); - taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); return pMeta; _err: - taosMemoryFree(streamPath); taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -140,6 +128,49 @@ _err: return NULL; } +void streamMetaReopen(SStreamMeta** ppMeta) { + SStreamMeta* pMeta = *ppMeta; + + SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); + pNewMeta->path = taosStrdup(pMeta->path); + pNewMeta->vgId = pMeta->vgId; + pNewMeta->walScanCounter = 0; + pNewMeta->ahandle = pMeta->ahandle; + pNewMeta->expandFunc = pMeta->expandFunc; + + *ppMeta = pNewMeta; + + streamMetaClose(pMeta); + + // tdbAbort(pMeta->db, pMeta->txn); + // tdbTbClose(pMeta->pTaskDb); + // tdbTbClose(pMeta->pCheckpointDb); + // tdbClose(pMeta->db); + + // void* pIter = NULL; + // while (1) { + // pIter = taosHashIterate(pMeta->pTasks, pIter); + // if (pIter == NULL) { + // break; + // } + + // SStreamTask* pTask = *(SStreamTask**)pIter; + // if (pTask->schedTimer) { + // taosTmrStop(pTask->schedTimer); + // pTask->schedTimer = NULL; + // } + + // if (pTask->launchTaskTimer) { + // taosTmrStop(pTask->launchTaskTimer); + // pTask->launchTaskTimer = NULL; + // } + + // tFreeStreamTask(pTask); + // } + + // taosHashClear(pMeta->pTasks); + // taosRemoveRef(streamBackendId, pMeta->streamBackendRid); +} void streamMetaClose(SStreamMeta* pMeta) { tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb);