From 1d8791a106c2f6d6499f9f9af70aec1fff820d18 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 18 Jul 2024 14:32:04 +0800 Subject: [PATCH] opti:[TS-4593] transform offset from file to tdb in tmq --- include/common/tmsg.h | 1 + source/client/src/clientMonitor.c | 2 +- source/common/src/tmsg.c | 6 +++++ source/dnode/vnode/src/inc/tq.h | 3 ++- source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqMeta.c | 35 +++++++++++++++++----------- source/dnode/vnode/src/tq/tqOffset.c | 20 ++++++---------- 7 files changed, 40 insertions(+), 30 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 40fce1d67b..a5dea8a44e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3612,6 +3612,7 @@ typedef struct { int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset); int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset); +void tDeleteSTqOffset(void* val); typedef struct SMqVgOffset { int64_t consumerId; diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 304f18cd68..b8b57160fe 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -691,7 +691,7 @@ static void* monitorThreadFunc(void* param) { tscDebug("monitorThreadFunc start"); int64_t quitTime = 0; while (1) { - if (atomic_load_32(&slowLogFlag) > 0 > 0) { + if (atomic_load_32(&slowLogFlag) > 0) { if (quitCnt == 0) { monitorSendAllSlowLogAtQuit(); if (quitCnt == 0) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e93de486e0..98e6530be0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9258,6 +9258,12 @@ void tOffsetDestroy(void *param) { taosMemoryFreeClear(pVal->primaryKey.pData); } } + +void tDeleteSTqOffset(void *param) { + STqOffset *pVal = (STqOffset *)param; + tOffsetDestroy(&pVal->val); +} + int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1; if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f946dc76f8..d3582ab8f3 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -131,6 +131,7 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen); int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen); +int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen); int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle); void* tqMetaGetOffset(STQ* pTq, const char* subkey); int32_t tqMetaTransform(STQ* pTq); @@ -141,7 +142,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); // tqOffset int32_t tqBuildFName(char** data, const char* path, char* name); -int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name); +int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name); // tq util int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0d167ccfa6..dd33d4ed65 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -80,6 +80,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK); + taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset); int32_t code = tqInitialize(pTq); if (code != TSDB_CODE_SUCCESS) { @@ -228,7 +229,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)) < 0) { + if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 2ab459416d..a985b804be 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -87,6 +87,18 @@ int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){ return code; } +int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen){ + SDecoder decoder = {0}; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + int32_t code = tDecodeSTqOffset(&decoder, info); + if (code != 0) { + tDeleteSTqOffset(info); + return TSDB_CODE_OUT_OF_MEMORY; + } + tDecoderClear(&decoder); + return code; +} + int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) { int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; @@ -113,21 +125,9 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) { int32_t code = TDB_CODE_SUCCESS; - void* pIter = NULL; - - TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq->pOffset, path)); - while (1) { - pIter = taosHashIterate(pTq->pOffset, pIter); - if (pIter == NULL) { - break; - } - - STqOffset* offset = (STqOffset*)pIter; - TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset->subKey, strlen(offset->subKey), offset, sizeof(STqOffset))); - } + TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path)); END: - taosHashCancelIterate(pTq->pOffset, pIter); return code; } @@ -140,7 +140,14 @@ void* tqMetaGetOffset(STQ* pTq, const char* subkey){ return NULL; } - if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), data, sizeof(STqOffset)) != 0){ + STqOffset offset = {0}; + if (tqMetaDecodeOffsetInfo(&offset, data, vLen) != TDB_CODE_SUCCESS) { + tdbFree(data); + return NULL; + } + + if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0){ + tDeleteSTqOffset(&offset); tdbFree(data); return NULL; } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e37cdfc697..f9faf611e1 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -32,10 +32,9 @@ int32_t tqBuildFName(char** data, const char* path, char* name) { return TDB_CODE_SUCCESS; } -int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) { +int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { int32_t code = TDB_CODE_SUCCESS; void* pMemBuf = NULL; - SDecoder decoder = {0}; TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ); if (pFile == NULL) { @@ -65,19 +64,15 @@ int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) { goto END; } - STqOffset offset; - tDecoderInit(&decoder, pMemBuf, size); - if (tDecodeSTqOffset(&decoder, &offset) < 0) { - code = TSDB_CODE_INVALID_MSG; + STqOffset offset = {0}; + TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size)); + code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)); + if (code != TDB_CODE_SUCCESS) { + tDeleteSTqOffset(&offset); goto END; } + TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size)); - if (taosHashPut(pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto END; - } - - tDecoderClear(&decoder); taosMemoryFree(pMemBuf); pMemBuf = NULL; } @@ -85,7 +80,6 @@ int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) { END: taosCloseFile(&pFile); taosMemoryFree(pMemBuf); - tDecoderClear(&decoder); return code; }