From 1f85cb42e65f05aa6ace816adad423ead8e06b0e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 Apr 2024 17:22:40 +0800 Subject: [PATCH] fix:[TS-4728]transform strore data if data is too big --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 26 +++-- source/dnode/vnode/src/tq/tqMeta.c | 144 ++++++++++++++++++++++++++- source/dnode/vnode/src/tq/tqOffset.c | 4 + 4 files changed, 162 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index bd8b73ed33..e2ecdca59f 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -136,6 +136,7 @@ int32_t tqMetaGetHandle(STQ* pTq, const char* key); int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); +int32_t tqMetaTransform(STQ* pTq); void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0e6b85bd2b..2add128eff 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -86,15 +86,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } int32_t tqInitialize(STQ* pTq) { - if (tqMetaOpen(pTq) < 0) { - return -1; - } - - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } - int32_t vgId = TD_VID(pTq->pVnode); pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); if (pTq->pStreamMeta == NULL) { @@ -102,6 +93,23 @@ int32_t tqInitialize(STQ* pTq) { } /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + + if (tqMetaTransform(pTq) < 0) { + return -1; + } + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } + + if (tqMetaRestoreCheckInfo(pTq) < 0) { + return -1; + } + + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 82547af8d4..76322c527f 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } - if (tqMetaRestoreCheckInfo(pTq) < 0) { - return -1; - } - return 0; } @@ -378,6 +374,146 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } +static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){ + TBC* pCur = NULL; + if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) { + return -1; + } + + TXN* txn; + if (tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + return -1; + } + + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + + tdbTbcMoveToFirst(pCur); + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + if (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn) < 0) { + tqError("transform sub info error"); + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + return -1; + } + } + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + + if (tdbCommit(pMetaDB, txn) < 0) { + return -1; + } + + if (tdbPostCommit(pMetaDB, txn) < 0) { + return -1; + } + return 0; +} + +int32_t tqMetaTransform(STQ* pTq) { + int32_t len = strlen(pTq->path) + 64; + char* maindb = taosMemoryCalloc(1, len); + sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME); + + if(!taosCheckExistFile(maindb)){ + taosMemoryFree(maindb); + char* tpath = taosMemoryCalloc(1, len); + if(tpath == NULL){ + return -1; + } + sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); + taosMemoryFree(pTq->path); + pTq->path = tpath; + return tqMetaOpen(pTq); + } + + int32_t code = 0; + TDB* pMetaDB = NULL; + TTB* pExecStore = NULL; + TTB* pCheckStore = NULL; + char* offsetNew = NULL; + char* offset = tqOffsetBuildFName(pTq->path, 0); + if(offset == NULL){ + code = -1; + goto END; + } + + + if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) { + code = -1; + goto END; + } + + if (tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0) < 0) { + code = -1; + goto END; + } + + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0) < 0) { + code = -1; + goto END; + } + + char* tpath = taosMemoryCalloc(1, len); + if(tpath == NULL){ + code = -1; + goto END; + } + sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); + taosMemoryFree(pTq->path); + pTq->path = tpath; + if (tqMetaOpen(pTq) < 0) { + code = -1; + goto END; + } + + if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){ + code = -1; + goto END; + } + + if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){ + code = -1; + goto END; + } + + tdbTbClose(pExecStore); + pExecStore = NULL; + tdbTbClose(pCheckStore); + pCheckStore = NULL; + tdbClose(pMetaDB); + pMetaDB = NULL; + + offsetNew = tqOffsetBuildFName(pTq->path, 0); + if(offsetNew == NULL){ + code = -1; + goto END; + } + if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ + tqError("copy offset file error"); + code = -1; + goto END; + } + + taosRemoveFile(maindb); + taosRemoveFile(offset); + + END: + taosMemoryFree(maindb); + taosMemoryFree(offset); + taosMemoryFree(offsetNew); + + tdbTbClose(pExecStore); + tdbTbClose(pCheckStore); + tdbClose(pMetaDB); + + return code; +} + //int32_t tqMetaRestoreHandle(STQ* pTq) { // int code = 0; // TBC* pCur = NULL; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 7321e73d28..8b0e039ad5 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -25,6 +25,10 @@ struct STqOffsetStore { char* tqOffsetBuildFName(const char* path, int32_t fVer) { int32_t len = strlen(path); char* fname = taosMemoryCalloc(1, len + 40); + if(fname == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer); return fname; }