From ee8c09667e955cbb67e12e2cfa3f6ad7570d2360 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Sep 2023 12:53:58 +0000 Subject: [PATCH] refactor backend --- source/libs/stream/src/streamMeta.c | 135 ++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 35 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 652ef7cde7..5c64619ca9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -103,6 +103,76 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { return 0; } +typedef struct { + int64_t chkpId; + char* path; + char* taskId; + + SArray* pChkpSave; + SArray* pChkpInUse; + int8_t chkpCap; + void* backend; + +} StreamMetaTaskState; + +int32_t streamMetaOpenTdb(SStreamMeta* pMeta) { + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { + return -1; + // goto _err; + } + + if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { + return -1; + } + + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { + return -1; + } + return 0; +} + +// +// impl later +// +enum STREAM_STATE_VER { + STREAM_STATA_NO_COMPATIBLE, + STREAM_STATA_COMPATIBLE, + STREAM_STATA_NEED_CONVERT, +}; + +int32_t streamMetaCheckStateVer(SStreamMeta* pMeta) { + TBC* pCur = NULL; + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + // no task info, no stream + return STREAM_STATA_COMPATIBLE; + } + + void* pKey = NULL; + int32_t kLen = 0; + void* pVal = NULL; + int32_t vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + if (pVal == NULL || vLen == 0) { + break; + } + SCheckpointInfo info; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { + continue; + } + tDecoderClear(&decoder); + } + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + + return STREAM_STATA_NEED_CONVERT; +} + +int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { return 0; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -118,15 +188,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); pMeta->path = tpath; - if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { + if (streamMetaOpenTdb(pMeta) < 0) { goto _err; } - if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { - goto _err; - } - - if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { + if (streamMetaDoStateDataConvert(pMeta) < 0) { goto _err; } @@ -134,8 +200,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); - pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); + pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); if (pMeta->pTasks == NULL) { goto _err; } @@ -153,41 +218,43 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; - // send heartbeat every 5sec. - pMeta->rid = taosAddRef(streamMetaId, pMeta); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); + pMeta->rid = taosAddRef(streamMetaId, pMeta); *pRid = pMeta->rid; metaRefMgtAdd(pMeta->vgId, pRid); + // send heartbeat every 5sec. + // TODO: start hb later pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.stopFlag = 0; - pMeta->pTaskBackendUnique = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); - pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); - pMeta->chkpCap = 8; - taosInitRWLatch(&pMeta->chkpDirLock); + // start backend + // taosInitRWLatch(&pMeta->chkpDirLock); - pMeta->chkpId = streamGetLatestCheckpointId(pMeta); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - while (pMeta->streamBackend == NULL) { - taosMsleep(2 * 1000); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - if (pMeta->streamBackend == NULL) { - qError("vgId:%d failed to init stream backend", pMeta->vgId); - qInfo("vgId:%d retry to init stream backend", pMeta->vgId); - } - } - pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + // pMeta->pTaskBackendUnique = + // taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + // pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + // pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t)); + // pMeta->chkpCap = 8; - code = streamBackendLoadCheckpointInfo(pMeta); + // pMeta->chkpId = streamGetLatestCheckpointId(pMeta); + // pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + // while (pMeta->streamBackend == NULL) { + // qError("vgId:%d failed to init stream backend", pMeta->vgId); + // taosMsleep(2 * 1000); + // qInfo("vgId:%d retry to init stream backend", pMeta->vgId); + // pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + // if (pMeta->streamBackend == NULL) { + // } + // } + // pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + + // code = streamBackendLoadCheckpointInfo(pMeta); + // taosThreadMutexInit(&pMeta->backendMutex, NULL); taosInitRWLatch(&pMeta->lock); - taosThreadMutexInit(&pMeta->backendMutex, NULL); - pMeta->pauseTaskNum = 0; qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, @@ -241,13 +308,11 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { + qError("vgId:%d failed to init stream backend", pMeta->vgId); taosMsleep(2 * 1000); + + qInfo("vgId:%d retry to init stream backend", pMeta->vgId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - if (pMeta->streamBackend == NULL) { - qError("vgId:%d failed to init stream backend", pMeta->vgId); - qInfo("vgId:%d retry to init stream backend", pMeta->vgId); - // return -1; - } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); streamBackendLoadCheckpointInfo(pMeta);