From 167e58578ad176dc427c7b7cccbb597a5c868310 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 29 Aug 2022 14:53:13 +0800 Subject: [PATCH 1/7] enh: add check for tpagebuf --- source/libs/executor/src/tsimplehash.c | 6 +----- source/util/src/tpagedbuf.c | 1 + 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 8cd376e092..84b615af7a 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -295,11 +295,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke } if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) { - if (!pPrev) { - *pIter = NULL; - } else { - *pIter = GET_SHASH_NODE_DATA(pPrev); - } + *pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL; } FREE_HASH_NODE(pNode); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 4d5532b9a6..0c30cc1003 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -465,6 +465,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { // set the ptr to the new SPageInfo ((void**)((*pi)->pData))[0] = (*pi); + assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, *pi); (*pi)->used = true; From 12a01a88dd2cf189aa702ce40758e9f11dafcede Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 29 Aug 2022 16:15:12 +0800 Subject: [PATCH 2/7] fix: race condition between fetch all and optr serialize --- source/dnode/vnode/src/sma/smaRollup.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 426ab521fd..5b2595ad21 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1715,7 +1715,8 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - if (type == RSMA_EXEC_OVERFLOW) { + if (((type == RSMA_EXEC_OVERFLOW) && (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0)) || + (type == RSMA_EXEC_COMMIT)) { tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); } From 8f6e87594fc07228c3ca919cfbc23ee65e3cb829 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 29 Aug 2022 16:33:08 +0800 Subject: [PATCH 3/7] enh: not fetch all during commit --- source/dnode/vnode/src/sma/smaRollup.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 5b2595ad21..3ed8dc2a20 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1715,8 +1715,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - if (((type == RSMA_EXEC_OVERFLOW) && (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0)) || - (type == RSMA_EXEC_COMMIT)) { + if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) { tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); } From c8d6960291937accffc9179ff8ec5ec37a093b9e Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 29 Aug 2022 17:53:02 +0800 Subject: [PATCH 4/7] other: adjust sma log level --- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/util/src/tlog.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3ed8dc2a20..1b1e304c03 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1531,7 +1531,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) { tsem_post(&(pStat->notEmpty)); } - smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, + smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_PAUSED: { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a2d65d6a54..06ebbf27fb 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -97,7 +97,7 @@ int32_t tqDebugFlag = 135; int32_t fsDebugFlag = 135; int32_t metaDebugFlag = 135; int32_t udfDebugFlag = 135; -int32_t smaDebugFlag = 135; +int32_t smaDebugFlag = 131; int32_t idxDebugFlag = 135; int64_t dbgEmptyW = 0; From 894e71371928a2a5428901778f35e232bc281e12 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 29 Aug 2022 17:54:20 +0800 Subject: [PATCH 5/7] other: revert the assert for tpagebuf --- source/util/src/tpagedbuf.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 0c30cc1003..4d5532b9a6 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -465,7 +465,6 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { // set the ptr to the new SPageInfo ((void**)((*pi)->pData))[0] = (*pi); - assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, *pi); (*pi)->used = true; From 9600840aad86ad17bfdef206b2ff22d956e78de1 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 30 Aug 2022 08:14:32 +0800 Subject: [PATCH 6/7] fix: race condition between fetch all and commit --- source/dnode/vnode/src/sma/smaCommit.c | 11 +++++++++-- source/dnode/vnode/src/sma/smaRollup.c | 25 +++++++++++++++++-------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index ca5367f397..0e644be288 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -312,15 +312,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); + int32_t nLoops = 0; // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); + while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait for all triggered fetch tasks to finish - int32_t nLoops = 0; + while (1) { if (T_REF_VAL_GET(pStat) == 0) { smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1b1e304c03..52b08d131c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -21,6 +21,7 @@ #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_FETCH_DELAY_MAX (900000) // ms #define RSMA_FETCH_ACTIVE_MAX (1800) // ms +#define RSMA_FETCH_INTERVAL (5000) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -1501,13 +1502,13 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { - taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); return; } default: @@ -1518,7 +1519,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { - smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", + smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process pItem->fetchLevel = pItem->level; @@ -1531,8 +1532,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) { tsem_post(&(pStat->notEmpty)); } - smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, - pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", @@ -1715,15 +1714,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) { + if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) { tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); } if (qallItemSize > 0) { atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); continue; } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - continue; + if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) { + continue; + } + for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j); + if (pItem->fetchLevel) { + pItem->fetchLevel = 0; + taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + } + } } break; @@ -1775,7 +1784,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (pEnv->flag & SMA_ENV_FLG_CLOSE) { break; } - + tsem_wait(&pRSmaStat->notEmpty); if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { From 01aca9ffc7fb7bd5bf6769ad1a751c9bce13fc2f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 30 Aug 2022 10:07:33 +0800 Subject: [PATCH 7/7] fix: stat timeseries every minute to avoid dnode offline caused by massive tables --- source/dnode/vnode/inc/vnode.h | 28 +++++++++++++------------ source/dnode/vnode/src/meta/metaQuery.c | 20 +++++++++++------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3a3cbe72ba..5d4285b7c2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -157,17 +157,17 @@ void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); // tq -typedef struct SMetaTableInfo{ +typedef struct SMetaTableInfo { int64_t suid; int64_t uid; SSchemaWrapper *schema; char tbName[TSDB_TABLE_NAME_LEN]; -}SMetaTableInfo; +} SMetaTableInfo; -typedef struct SIdInfo{ - int64_t version; - int32_t index; -}SIdInfo; +typedef struct SIdInfo { + int64_t version; + int32_t index; +} SIdInfo; typedef struct SSnapContext { SMeta *pMeta; @@ -180,8 +180,8 @@ typedef struct SSnapContext { SArray *idList; int32_t index; bool withMeta; - bool queryMetaOrData; // true-get meta, false-get data -}SSnapContext; + bool queryMetaOrData; // true-get meta, false-get data +} SSnapContext; typedef struct STqReader { int64_t ver; @@ -232,11 +232,12 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); -int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet); -int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); -SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx); -int32_t setForSnapShot(SSnapContext* ctx, int64_t uid); -int32_t destroySnapContext(SSnapContext* ctx); +int32_t buildSnapContext(SMeta *pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, + SSnapContext **ctxRet); +int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); +SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx); +int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); +int32_t destroySnapContext(SSnapContext *ctx); // structs struct STsdbCfg { @@ -259,6 +260,7 @@ typedef struct { int64_t numOfNTables; int64_t numOfNTimeSeries; int64_t numOfTimeSeries; + int64_t itvTimeSeries; int64_t pointsWritten; int64_t totalStorage; int64_t compStorage; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 9d3b4d82eb..7df355a59b 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -615,9 +615,13 @@ int64_t metaGetTbNum(SMeta *pMeta) { // N.B. Called by statusReq per second int64_t metaGetTimeSeriesNum(SMeta *pMeta) { // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) - int64_t num = 0; - vnodeGetTimeSeriesNum(pMeta->pVnode, &num); - pMeta->pVnode->config.vndStats.numOfTimeSeries = num; + if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || ++pMeta->pVnode->config.vndStats.itvTimeSeries % 60 == 0) { + int64_t num = 0; + vnodeGetTimeSeriesNum(pMeta->pVnode, &num); + pMeta->pVnode->config.vndStats.numOfTimeSeries = num; + + pMeta->pVnode->config.vndStats.itvTimeSeries = 0; + } return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; } @@ -890,7 +894,7 @@ const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) { #ifdef TAG_FILTER_DEBUG if (IS_VAR_DATA_TYPE(val->type)) { - char* buf = taosMemoryCalloc(val->nData + 1, 1); + char *buf = taosMemoryCalloc(val->nData + 1, 1); memcpy(buf, val->pData, val->nData); metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf); taosMemoryFree(buf); @@ -900,13 +904,13 @@ const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) { metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval); } - SArray* pTagVals = NULL; - tTagToValArray((STag*)pTag, &pTagVals); + SArray *pTagVals = NULL; + tTagToValArray((STag *)pTag, &pTagVals); for (int i = 0; i < taosArrayGetSize(pTagVals); i++) { - STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); + STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i); if (IS_VAR_DATA_TYPE(pTagVal->type)) { - char* buf = taosMemoryCalloc(pTagVal->nData + 1, 1); + char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1); memcpy(buf, pTagVal->pData, pTagVal->nData); metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf); taosMemoryFree(buf);