From 7d27ba20d665a04fe33cd436c6f24bde821298cd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 21 Mar 2024 03:58:24 +0000 Subject: [PATCH 01/21] fix queue log --- source/util/src/tworker.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 138d4bc1f4..0712010458 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -19,7 +19,7 @@ #include "tgeosctx.h" #include "tlog.h" -#define QUEUE_THRESHOLD 1000 * 1000 +#define QUEUE_THRESHOLD (1000 * 1000) typedef void *(*ThreadFp)(void *param); From f853272d77fb8388ac1e9535a3f30508aa407f25 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 21 Mar 2024 08:54:13 +0000 Subject: [PATCH 02/21] port main to 3.0 --- source/libs/transport/src/transCli.c | 71 +++++----------------------- 1 file changed, 13 insertions(+), 58 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ac45f1eef6..e27e9a4ac0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -585,11 +585,11 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; STrans* pTranInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key) + 1); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -624,11 +624,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key) + 1); + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -714,7 +714,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); if (conn->list == NULL) { - conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1); + conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); } SConnList* pList = conn->list; @@ -1279,7 +1279,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -1291,7 +1291,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); } } } else { @@ -1471,7 +1471,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { } static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { uint32_t addr = 0; - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { @@ -1480,7 +1480,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) return addr; } - taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } else { addr = *v; } @@ -1490,13 +1490,13 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later uint32_t addr = taosGetIpv4FromFqdn(fqdn); if (addr != 0xffffffff) { - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); if (addr != *v) { char old[64] = {0}, new[64] = {0}; tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } } return; @@ -1537,21 +1537,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } - if (rpcDebugFlag & DEBUG_TRACE) { - if (tmsgIsValid(pMsg->msg.msgType)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count + 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } - } - char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; @@ -1705,7 +1690,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key)); if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); @@ -1800,21 +1784,6 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - if (rpcDebugFlag & DEBUG_TRACE) { - void* pIter = taosHashIterate(pThrd->msgCount, NULL); - while (pIter != NULL) { - int* count = pIter; - size_t len = 0; - char* key = taosHashGetKey(pIter, &len); - if (*count != 0) { - tDebug("key: %s count: %d", key, *count); - } - - pIter = taosHashIterate(pThrd->msgCount, pIter); - } - tDebug("all conn count: %d", pThrd->newConnCount); - } - int8_t supportBatch = pTransInst->supportBatch; if (supportBatch == 0) { cliNoBatchDealReq(&wq, pThrd); @@ -2411,20 +2380,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } } - if (rpcDebugFlag & DEBUG_TRACE) { - if (tmsgIsValid(pResp->msgType - 1)) { - char buf[128] = {0}; - sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1)); - int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf)); - if (NULL == 0) { - int localCount = 0; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } else { - int localCount = *count - 1; - taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount)); - } - } - } if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pSem) { From 89b9f87b7d2486c349169d1dd81b707e449b9b0c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 21 Mar 2024 09:01:13 +0000 Subject: [PATCH 03/21] port main to 3.0 --- source/libs/transport/src/transCli.c | 30 ++++++++++++++++------------ 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e27e9a4ac0..b335979cd5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -585,11 +585,12 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; STrans* pTranInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + size_t klen = strlen(key); + SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key)); + taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -624,11 +625,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + size_t klen = strlen(key); + SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key)); + taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -1471,7 +1473,8 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { } static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { uint32_t addr = 0; - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); + size_t len = strlen(fqdn); + uint32_t* v = taosHashGet(cache, fqdn, len); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { @@ -1480,7 +1483,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) return addr; } - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); } else { addr = *v; } @@ -1490,13 +1493,14 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later uint32_t addr = taosGetIpv4FromFqdn(fqdn); if (addr != 0xffffffff) { - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); + size_t len = strlen(fqdn); + uint32_t* v = taosHashGet(cache, fqdn, len); if (addr != *v) { char old[64] = {0}, new[64] = {0}; tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); } } return; @@ -1689,8 +1693,8 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key)); + size_t klen = strlen(key); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); QUEUE_INIT(&pBatchList->wq); @@ -1714,7 +1718,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*)); + taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); From ea424e822ece6f9aa17ed9e074a1dfc91c594adf Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 25 Mar 2024 11:00:33 +0800 Subject: [PATCH 04/21] fix(tsdb/util): fix missing column value with row iter --- source/dnode/vnode/src/tsdb/tsdbUtil.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 44621bf4e6..076db8b3ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -642,8 +642,8 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { return &pIter->cv; } - if (pIter->iColData < pIter->pRow->pBlockData->nColData) { - tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv); + if (pIter->iColData <= pIter->pRow->pBlockData->nColData) { + tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv); ++pIter->iColData; return &pIter->cv; } else { From ced7c19943cc0dd5932cec4207a3fa752aca9621 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 25 Mar 2024 17:01:06 +0800 Subject: [PATCH 05/21] enh: not to start snap replication if too many open files already --- source/libs/sync/src/syncPipeline.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d40fff447f..3543ed574c 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -866,9 +866,14 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn SyncTerm term = -1; SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); + errno = 0; if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1); + if (term < 0 && (errno == ENFILE || errno == EMFILE)) { + sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1); + return -1; + } if ((index + 1 < firstVer) || (term < 0) || (term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) { ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST); From 6aaac7c0dcb656bf08d3248f15acd5ecf40bcd9c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 25 Mar 2024 17:57:13 +0800 Subject: [PATCH 06/21] fix:decode task error if task version is smaller than 2 --- source/libs/stream/src/streamTask.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4ca784a32f..13a4f5012d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -287,7 +287,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + } if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; tEndDecode(pDecoder); From 44ce5b951df0f4851163cae18cc283077fe70c12 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 25 Mar 2024 17:43:45 +0800 Subject: [PATCH 07/21] fix: arb group leak --- source/dnode/mnode/impl/inc/mndArbGroup.h | 1 + source/dnode/mnode/impl/src/mndArbGroup.c | 27 ++++++++++++++++++----- source/dnode/mnode/impl/src/mndDb.c | 19 ++++++++++++++++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndArbGroup.h b/source/dnode/mnode/impl/inc/mndArbGroup.h index ed852cf581..fcd11310e7 100644 --- a/source/dnode/mnode/impl/inc/mndArbGroup.h +++ b/source/dnode/mnode/impl/inc/mndArbGroup.h @@ -35,6 +35,7 @@ int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup); int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup); int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup); +int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup); int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup); bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId, diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index e056e698f3..92ab5274e4 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -260,6 +260,14 @@ int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) { return 0; } +int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) { + SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + return 0; +} + static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) { SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup); if (pRedoRaw == NULL) return -1; @@ -535,10 +543,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { int32_t vgId = arbGroupDup.vgId; int64_t nowMs = taosGetTimestampMs(); - bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); - bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); - SArbAssignedLeader* pAssignedLeader = &arbGroupDup.assignedLeader; - int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; + bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); + bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); + SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader; + int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; // 1. has assigned && is sync => send req if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) { @@ -667,9 +675,16 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE); newGroup.version = req.version; - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; + SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); + if (!pOldGroup) { + mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId); + return 0; + } + sdbRelease(pMnode->pSdb, pOldGroup); + if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) { - mError("vgId:%d, arb failed to update arbgroup, since %s", req.vgId, terrstr()); + mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr()); ret = -1; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a1f3a24661..527105a7b8 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1209,6 +1209,25 @@ static int32_t mndSetDropDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SArbGroup *pArbGroup = NULL; + pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); + if (pIter == NULL) break; + + if (pArbGroup->dbUid == pDb->uid) { + if (mndSetDropArbGroupPrepareLogs(pTrans,pArbGroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pArbGroup); + return -1; + } + } + + sdbRelease(pSdb, pArbGroup); + } + return 0; } From e5abe34c84ae04f9ee652feb3274d354d22ba348 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 25 Mar 2024 21:34:31 +0800 Subject: [PATCH 08/21] fix: replace strdup for release build --- source/libs/executor/src/sysscanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 589c23ed3a..628aacf3c3 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -879,7 +879,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in char* outputWKT = NULL; if (nGeom == 0) { - if (!(*output = strdup(""))) code = TSDB_CODE_OUT_OF_MEMORY; + if (!(*output = taosStrdup(""))) code = TSDB_CODE_OUT_OF_MEMORY; *nOutput = 0; return code; } From a11b2c614e3c83e58d19e4dad09f5b3dfd65d70c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Mar 2024 23:41:22 +0800 Subject: [PATCH 09/21] fix(stream): not return error when failing to load stream task meta. --- source/dnode/vnode/src/tq/tq.c | 5 +--- source/libs/stream/src/streamMeta.c | 40 ++++++++++++----------------- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ffebd783ac..ccfc8cc7c9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -101,10 +101,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - return -1; - } - + /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5f6440c06d..fc06a8975f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -824,13 +824,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { return chkpId; } -static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - taosArrayDestroy(pRecycleList); -} - int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; void* pKey = NULL; @@ -847,10 +840,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); + int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL); + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - return -1; + return TSDB_CODE_SUCCESS; } tdbTbcMoveToFirst(pCur); @@ -859,20 +853,18 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno)); - doClear(pKey, pVal, pCur, pRecycleList); - return -1; + break; } tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); - doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); stError( "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "stream manually", vgId, tsDataDir); - return -1; + break; } tDecoderClear(&decoder); @@ -892,10 +884,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { - doClear(pKey, pVal, pCur, pRecycleList); + code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); + if (code < 0) { + stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); - return -1; + continue; } taosArrayPush(pMeta->pTaskList, &pTask->id); @@ -907,9 +900,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { - doClear(pKey, pVal, pCur, pRecycleList); + stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); + taosArrayPop(pMeta->pTaskList); tFreeStreamTask(pTask); - return -1; + continue; } if (pTask->info.fillHistory == 0) { @@ -925,10 +919,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); + if (tdbTbcClose(pCur) < 0) { - stError("vgId:%d failed to close meta-file cursor", vgId); - taosArrayDestroy(pRecycleList); - return -1; + stError("vgId:%d failed to close meta-file cursor, code:%s, continue", vgId, tstrerror(terrno)); } if (taosArrayGetSize(pRecycleList) > 0) { @@ -942,8 +935,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); + taosArrayDestroy(pRecycleList); - return 0; + return TSDB_CODE_SUCCESS; } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { From 8e45bebd66770855794a1f6e83ec9cc512793b46 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 26 Mar 2024 15:16:09 +0800 Subject: [PATCH 10/21] fix:decode task error if task version is smaller than 2 --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 13a4f5012d..c7a1a00a46 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -216,7 +216,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; From 530e0133b9bba75f7ed33ccda676e3ba108ce1f3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 19:39:58 +0800 Subject: [PATCH 11/21] fix(stream): acquire the stream task in exec buffer if not in mnode store. --- source/dnode/mnode/impl/src/mndStream.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 60b522f6fa..ceb239ee96 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2153,11 +2153,20 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); - return -1; + // not in meta-store yet, try to acquire the task in exec buffer + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } } int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); @@ -2175,7 +2184,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); From 46280bfee95e634e4bf36f1197d77f046faa3526 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 19:40:47 +0800 Subject: [PATCH 12/21] fix(stream): add some comments. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ceb239ee96..93b4e70de7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2156,6 +2156,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { From 3c7fe5fd01b464693f5fe2ac429084ba5f2af906 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 23:27:38 +0800 Subject: [PATCH 13/21] fix(stream): check null ptr. --- source/dnode/mnode/impl/src/mndStream.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 93b4e70de7..bed946bc24 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2170,34 +2170,39 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { } } - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); - doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(pList, req.taskId, req.streamId, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks); } int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - // TODO:handle error - int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + if (pStream != NULL) { + // TODO:handle error + int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + mndReleaseStream(pMnode, pStream); + } else { + // todo: wait for the create stream trans completed, and launch the checkpoint trans + } // remove this entry taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); } - mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); { From f3c306d582b1e2d7005ae3d3340b93d04df085d5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 23:28:20 +0800 Subject: [PATCH 14/21] fix(stream): add some comments. --- source/dnode/mnode/impl/src/mndStream.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bed946bc24..9a79f6ff5b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2194,6 +2194,8 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans + // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + // sleep(500ms) } // remove this entry From e776cde461a11151549c1e9f746f676d843daef0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Mar 2024 09:02:21 +0800 Subject: [PATCH 15/21] fix(stream):fix unrelease stream obj. --- source/dnode/mnode/impl/src/mndStream.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9a79f6ff5b..6067af199e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2188,10 +2188,8 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int64_t checkpointId = mndStreamGenChkpId(pMnode); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - if (pStream != NULL) { - // TODO:handle error + if (pStream != NULL) { // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); - mndReleaseStream(pMnode, pStream); } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); @@ -2205,6 +2203,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); } + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + taosThreadMutexUnlock(&execInfo.lock); { From c0a394876d1a47780db7f90de915505204bbe95a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 27 Mar 2024 02:41:05 +0000 Subject: [PATCH 16/21] fix invalid free while taosd quit --- source/libs/transport/src/transCli.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b335979cd5..2d8f4ed3c2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1944,8 +1944,9 @@ static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { if (pMsg == NULL) { return; } - if (param != NULL) { - SCliThrd* pThrd = param; + + SCliThrd* pThrd = param; + if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); } destroyCmsg(pMsg); @@ -1957,12 +1958,9 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; - tDebug("destroy Ahandle A"); - if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - tDebug("destroy Ahandle B"); + if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { pThrd->destroyAhandleFp(pMsg->ctx->ahandle); } - tDebug("destroy Ahandle C"); transDestroyConnCtx(pMsg->ctx); transFreeMsg(pMsg->msg.pCont); From f4fed29739d97f04603f315025930a685913cceb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Mar 2024 15:04:37 +0800 Subject: [PATCH 17/21] fix:write schema error if schema ver is 1 more than origin version --- source/dnode/mnode/impl/src/mndStb.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7ee1b36916..a507631128 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1104,14 +1104,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } else if (createReq.tagVer > 0 || createReq.colVer > 0) { int32_t tagDelta = createReq.tagVer - pStb->tagVer; int32_t colDelta = createReq.colVer - pStb->colVer; - int32_t verDelta = tagDelta + colDelta; mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d", createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer); if (tagDelta <= 0 && colDelta <= 0) { mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name); code = 0; goto _OVER; - } else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) { + } else if ((tagDelta == 1 && colDelta == 0) || + (tagDelta == 0 && colDelta == 1) || + pStb->colVer == 1 || + pStb->tagVer == 1) { isAlter = true; mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); } else { From 4722f8be8787596d8c88939fc2f7166530eff5aa Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Mar 2024 15:17:49 +0800 Subject: [PATCH 18/21] fix:write schema error if schema ver is 1 more than origin version --- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a507631128..79c62df766 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1112,8 +1112,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { goto _OVER; } else if ((tagDelta == 1 && colDelta == 0) || (tagDelta == 0 && colDelta == 1) || - pStb->colVer == 1 || - pStb->tagVer == 1) { + (pStb->colVer == 1 && createReq.colVer > 1) || + (pStb->tagVer == 1 && createReq.tagVer > 1)) { isAlter = true; mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); } else { From 5157d5b97d44474edbc4612180a57e591e308ff1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Mar 2024 15:52:48 +0800 Subject: [PATCH 19/21] fix(stream): fix invalid read. --- source/libs/stream/src/streamMeta.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fc06a8975f..c24763c024 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -648,14 +648,17 @@ SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) { } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { + int32_t taskId = pTask->id.taskId; int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); + + // not safe to use the pTask->id.idStr, since pTask may be released by other threads when print logs. if (ref > 0) { - stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); + stTrace("s-task:0x%x release task, ref:%d", taskId, ref); } else if (ref == 0) { - stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); + stTrace("s-task:0x%x all refs are gone, free it", taskId); tFreeStreamTask(pTask); } else if (ref < 0) { - stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); + stError("task ref is invalid, ref:%d, 0x%x", ref, taskId); } } From c611974247abfdbc40ece8230bea04ab7f9b4595 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Wed, 27 Mar 2024 14:17:53 +0800 Subject: [PATCH 20/21] fix: tmsg encode error --- source/common/src/tmsg.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ff424ee558..8430c67321 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -51,8 +51,8 @@ #define ENCODESQL() \ do { \ + if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \ if (pReq->sqlLen > 0 && pReq->sql != NULL) { \ - if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \ if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \ } \ } while (0) @@ -3025,7 +3025,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { ENCODESQL(); - if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1; tEndEncode(&encoder); @@ -3140,7 +3140,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; ENCODESQL(); - if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; From 277a009677b958a68f898818c53c7443e5aae85d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Mar 2024 16:42:32 +0800 Subject: [PATCH 21/21] fix(test): fix unit test compiler error. --- source/util/test/tbaseCodecTest.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/util/test/tbaseCodecTest.cpp b/source/util/test/tbaseCodecTest.cpp index 4c56979885..63bbfcaa68 100644 --- a/source/util/test/tbaseCodecTest.cpp +++ b/source/util/test/tbaseCodecTest.cpp @@ -17,11 +17,6 @@ using namespace std; #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -int main(int argc, char **argv) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} - static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) { int64_t start = taosGetTimestampUs(); char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen);