Merge branch 'enh/new3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-02 10:15:34 +08:00
parent b911492270
commit 7e7a39f077
1 changed files with 12 additions and 8 deletions

View File

@ -65,6 +65,7 @@ static void streamMetaEnvInit() {
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId);
@ -319,15 +320,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc;
pMeta->stage = stage;
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
pMeta->rid = taosAddRef(streamMetaId, pMeta);
*pRid = pMeta->rid;
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@ -347,6 +340,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->numOfStreamTasks = 0;
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
pMeta->rid = taosAddRef(streamMetaId, pMeta);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
return pMeta;
_err: