From 0704dcf45d5d4efbd94856386f510c96e257fc57 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 16 Jul 2024 18:56:20 +0800 Subject: [PATCH] fix:[TS-4593] check info error --- source/dnode/mnode/impl/src/mndTopic.c | 6 +- source/dnode/vnode/src/inc/tq.h | 1 - source/dnode/vnode/src/tq/tq.c | 59 +++++++--------- source/dnode/vnode/src/tq/tqMeta.c | 93 +++++++++----------------- source/dnode/vnode/src/tq/tqSnapshot.c | 2 +- 5 files changed, 60 insertions(+), 101 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b8cb1d00d0..bcb38a3902 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -725,19 +725,19 @@ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicO continue; } - buf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(pTopic->ntbUid)); + buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); if (buf == NULL){ code = TSDB_CODE_OUT_OF_MEMORY; goto end; } void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); - *(int64_t*)abuf = pTopic->ntbUid; + memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; - action.contLen = sizeof(SMsgHead) + sizeof(pTopic->ntbUid); + action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; code = mndTransAppendRedoAction(pTrans, &action); if (code != 0) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index d97a953d94..f946dc76f8 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -132,7 +132,6 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen); int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen); int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle); -void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid); void* tqMetaGetOffset(STQ* pTq, const char* subkey); int32_t tqMetaTransform(STQ* pTq); // tqSink diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ca1527d5e7..6611aa4caa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -76,7 +76,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); - pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK); @@ -282,16 +282,25 @@ end: } int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { - STqCheckInfo* pCheck = tqMetaGetCheckInfo(pTq, tbUid); - if(pCheck == NULL) { - return 0; - } + void* pIter = NULL; - int32_t sz = taosArrayGetSize(pCheck->colIdList); - for (int32_t i = 0; i < sz; i++) { - int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); - if (forbidColId == colId) { - return -1; + while (1) { + pIter = taosHashIterate(pTq->pCheckInfo, pIter); + if (pIter == NULL) { + break; + } + + STqCheckInfo* pCheck = (STqCheckInfo*)pIter; + + if (pCheck->ntbUid == tbUid) { + int32_t sz = taosArrayGetSize(pCheck->colIdList); + for (int32_t i = 0; i < sz; i++) { + int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); + if (forbidColId == colId) { + taosHashCancelIterate(pTq->pCheckInfo, pIter); + return -1; + } + } } } @@ -585,39 +594,18 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return 0; } -void mergeTwoArray(SArray* a, SArray* b){ - size_t bLen = taosArrayGetSize(b); - for(int i = 0; i < bLen; i++){ - void* dataB = taosArrayGet(b, i); - size_t aLen = taosArrayGetSize(a); - int j = 0; - for(; j < aLen; j++){ - void* dataA = taosArrayGet(a, i); - if(*(int64_t*)dataA == *(int64_t*)dataB){ - break; - } - } - if(j == aLen){ - taosArrayPush(a, dataB); - } - } -} - int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqCheckInfo info = {0}; if(tqMetaDecodeCheckInfo(&info, msg, msgLen) != 0){ return -1; } - STqCheckInfo *old = tqMetaGetCheckInfo(pTq, info.ntbUid); - if (old != NULL){ - mergeTwoArray(info.colIdList, old->colIdList); - } - if (taosHashPut(pTq->pCheckInfo, &info.ntbUid, sizeof(info.ntbUid), &info, sizeof(STqCheckInfo)) < 0) { + + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tDeleteSTqCheckInfo(&info); return -1; } - if (tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), msg, msgLen) < 0) { + if (tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, strlen(info.topic), msg, msgLen) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -629,7 +617,8 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, msgLen) < 0) { + if (tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, strlen(msg)) < 0) { + tqError("cannot process tq delete check info req %s, since no such check info", msg); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 3ddd7d953f..622ba786ab 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -87,31 +87,6 @@ int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){ return code; } -void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid){ - void* data = taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid)); - if (data == NULL) { - int vLen = 0; - if (tdbTbGet(pTq->pCheckStore, &tbUid, sizeof(tbUid), &data, &vLen) < 0) { - tdbFree(data); - return NULL; - } - STqCheckInfo info= {0}; - if(tqMetaDecodeCheckInfo(&info, data, vLen) != 0) { - tdbFree(data); - return NULL; - } - tdbFree(data); - - if(taosHashPut(pTq->pCheckInfo, &tbUid, sizeof(tbUid), &info, sizeof(STqCheckInfo)) != 0){ - tDeleteSTqCheckInfo(&info); - return NULL; - } - return taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid)); - } else { - return data; - } -} - int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) { int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; @@ -326,7 +301,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } -static int32_t tqMetaTransformStoreInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){ +static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew){ TBC* pCur = NULL; void* pKey = NULL; int kLen = 0; @@ -336,12 +311,12 @@ static int32_t tqMetaTransformStoreInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* p int32_t code = TDB_CODE_SUCCESS; - TQ_ERR_GO_TO_END(tdbTbcOpen(pExecStoreOld, &pCur, NULL)); + TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL)); TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - TQ_ERR_GO_TO_END (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn)); + TQ_ERR_GO_TO_END (tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn)); } TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn)); @@ -354,37 +329,6 @@ END: return code; } -static int32_t tqMetaTransformCheckInfo(TDB* pMetaDB, TTB* pCheckOld, TTB* pCheckNew){ - TBC* pCur = NULL; - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; - TXN* txn = NULL; - - int32_t code = TDB_CODE_SUCCESS; - - TQ_ERR_GO_TO_END(tdbTbcOpen(pCheckOld, &pCur, NULL)); - TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); - - TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); - while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - STqCheckInfo info= {0}; - TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen)); - int64_t uid = info.ntbUid; - tDeleteSTqCheckInfo(&info); - TQ_ERR_GO_TO_END (tdbTbUpsert(pCheckNew, &uid, sizeof(uid), pVal, vLen, txn)); - } - - TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn)); - TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn)); -END: - tdbFree(pKey); - tdbFree(pVal); - (void)tdbTbcClose(pCur); - return code; -} - int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) { void* data = taosHashGet(pTq->pHandle, key, strlen(key)); if(data == NULL){ @@ -434,6 +378,32 @@ static int32_t replaceTqPath(char** path){ return TDB_CODE_SUCCESS; } +static int32_t tqMetaRestoreCheckInfo(STQ* pTq) { + TBC* pCur = NULL; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + int32_t code = 0; + STqCheckInfo info = {0}; + + TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL)); + TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen)); + TQ_ERR_GO_TO_END(taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo))); + } + info.colIdList = NULL; + +END: + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + tDeleteSTqCheckInfo(&info); + return code; +} + int32_t tqMetaOpen(STQ* pTq) { char* maindb = NULL; int32_t code = TDB_CODE_SUCCESS; @@ -444,6 +414,7 @@ int32_t tqMetaOpen(STQ* pTq) { }else{ TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); } + TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq)); END: taosMemoryFree(maindb); @@ -466,8 +437,8 @@ int32_t tqMetaTransform(STQ* pTq) { TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path)); TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); - TQ_ERR_GO_TO_END(tqMetaTransformStoreInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore)); - TQ_ERR_GO_TO_END(tqMetaTransformCheckInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); + TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore)); + TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index 755274ff5e..6cb5734aa2 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -211,7 +211,7 @@ int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nD goto _err; } - code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); if (code) goto _err; return code;