From 7947b94b039d877759434dcf6ce37704b05b9323 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 20 Nov 2024 18:40:49 +0800 Subject: [PATCH] tsdb/commit: update lru from imem --- include/common/tglobal.h | 4 +- source/common/src/tglobal.c | 30 ++- source/dnode/vnode/src/tsdb/tsdbCache.c | 245 ++++++++++++--------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +- 4 files changed, 163 insertions(+), 120 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e6c471eaf1..488a513b03 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -154,7 +154,7 @@ extern bool tsEnableCrashReport; extern char *tsTelemUri; extern char *tsClientCrashReportUri; extern char *tsSvrCrashReportUri; -extern int8_t tsSafetyCheckLevel; +extern int8_t tsSafetyCheckLevel; enum { TSDB_SAFETY_CHECK_LEVELL_NEVER = 0, TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1, @@ -258,7 +258,7 @@ extern int32_t tsS3MigrateIntervalSec; extern bool tsS3MigrateEnabled; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; - +extern bool tsUpdateCacheBatch; extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern int tsStreamAggCnt; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0133428c53..cf588b4c06 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -298,6 +298,8 @@ bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; +bool tsUpdateCacheBatch = true; + int8_t tsS3EpNum = 0; char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; @@ -536,7 +538,8 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_SERVER)); + TAOS_CHECK_RETURN( + cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_SERVER)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -589,8 +592,10 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { TAOS_CHECK_RETURN( cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); + TAOS_CHECK_RETURN( + cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); + TAOS_CHECK_RETURN( + cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT)); @@ -1994,14 +1999,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { - {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag}, - {"wDebugFlag", &wDebugFlag}, {"azDebugFlag", &azDebugFlag}, {"sDebugFlag", &sDebugFlag}, - {"tsdbDebugFlag", &tsdbDebugFlag}, {"tqDebugFlag", &tqDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, - {"udfDebugFlag", &udfDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, - {"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, - {"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, - {"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, - {"tqClientDebugFlag", &tqClientDebugFlag}, + {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, + {"mDebugFlag", &mDebugFlag}, {"wDebugFlag", &wDebugFlag}, + {"azDebugFlag", &azDebugFlag}, {"sDebugFlag", &sDebugFlag}, + {"tsdbDebugFlag", &tsdbDebugFlag}, {"tqDebugFlag", &tqDebugFlag}, + {"fsDebugFlag", &fsDebugFlag}, {"udfDebugFlag", &udfDebugFlag}, + {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, + {"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, + {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, + {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, + {"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag}, + {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebugFlag", &tqClientDebugFlag}, }; static OptionNameAndVar options[] = {{"audit", &tsEnableAudit}, diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 04a657e7ef..7fb7e64b49 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -493,26 +493,6 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { return 0; } -static int32_t tsdbCacheQueryReseek(void *pQHandle) { - int32_t code = 0; - SCacheRowsReader *pReader = pQHandle; - - code = taosThreadMutexTryLock(&pReader->readerMutex); - if (code == 0) { - // pause current reader's state if not paused, save ts & version for resuming - // just wait for the big all tables' snapshot untaking for now - - code = TSDB_CODE_VND_QUERY_BUSY; - (void)taosThreadMutexUnlock(&pReader->readerMutex); - - return code; - } else if (code == EBUSY) { - return TSDB_CODE_VND_QUERY_BUSY; - } else { - return -1; - } -} - static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { bool deleted = false; while (*iSkyline > 0) { @@ -545,71 +525,73 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { } // Get next non-deleted row from imem -static int32_t tsdbImemGetNextRow(SMemTable *imem, TABLEID tid, STbDataIter *pTbIter, TSDBROW **ppRow, SArray *pSkyline, - int64_t iSkyline) { +static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) { int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = imem->pTsdb; - // tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter); - TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); - if (pMemRow) { - // if non deleted, return the found row. - TSDBKEY rowKey = TSDBROW_KEY(pMemRow); - bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline); + if (tsdbTbDataIterNext(pTbIter)) { + TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); + TSDBKEY rowKey = TSDBROW_KEY(pMemRow); + bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline); if (!deleted) { - *ppRow = pMemRow; - TAOS_RETURN(code); + return pMemRow; } - } else { - TAOS_RETURN(code); } - // continue to find the non-deleted first row from imem - TAOS_CHECK_GOTO(tsdbImemGetNextRow(imem, tid, pTbIter, ppRow, pSkyline, iSkyline), &lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - - TAOS_RETURN(code); + return NULL; } // Get first non-deleted row from imem -static int32_t tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, TABLEID tid, STbDataIter *pTbIter, TSDBROW **ppRow, - SArray *pSkyline, int64_t iSkyline) { +static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, TABLEID tid, STbDataIter *pTbIter, + SArray *pSkyline, int64_t *piSkyline) { int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = imem->pTsdb; tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter); TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); if (pMemRow) { // if non deleted, return the found row. TSDBKEY rowKey = TSDBROW_KEY(pMemRow); - bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline); + bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline); if (!deleted) { - *ppRow = pMemRow; - TAOS_RETURN(code); + return pMemRow; } } else { - TAOS_RETURN(code); + return NULL; } // continue to find the non-deleted first row from imem, using get next row - TAOS_CHECK_GOTO(tsdbImemGetNextRow(imem, tid, pTbIter, ppRow, pSkyline, iSkyline), &lino, _exit); + return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline); +} -_exit: - if (code) { - tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); +void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; + + if (suid > 0 && suid == pRCache->suid) { + pRCache->sver = -1; + pRCache->suid = -1; + } + if (suid == 0 && uid == pRCache->uid) { + pRCache->sver = -1; + pRCache->uid = -1; + } +} + +static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (pRCache->pTSchema && sver == pRCache->sver) { + if (suid > 0 && suid == pRCache->suid) { + return 0; + } + if (suid == 0 && uid == pRCache->uid) { + return 0; + } } - TAOS_RETURN(code); + pRCache->suid = suid; + pRCache->uid = uid; + pRCache->sver = sver; + tDestroyTSchema(pRCache->pTSchema); + return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema); } static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) { @@ -620,8 +602,12 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) SArray *pTombData = NULL; SArray *pSkyline = NULL; int64_t iSkyline = 0; - STbDataIter iter = {0}; + STbDataIter tbIter = {0}; TSDBROW *pMemRow = NULL; + STSchema *pTSchema = NULL; + SSHashObj *iColHash = NULL; + int32_t sver; + int32_t nCol; STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid); @@ -635,16 +621,99 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) iSkyline = taosArrayGetSize(pSkyline) - 1; } - TAOS_CHECK_GOTO(tsdbImemGetFirstRow(imem, pIMem, tid, &iter, &pMemRow, pSkyline, iSkyline), &lino, _exit); + pMemRow = tsdbImemGetFirstRow(imem, pIMem, tid, &tbIter, pSkyline, &iSkyline); + if (!pMemRow) { + goto _exit; + } // iter first row to last_row/last col values to ctxArray, and mark last null col ids + sver = TSDBROW_SVERSION(pMemRow); + TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, tid.suid, tid.uid, sver), &lino, _exit); + pTSchema = pTsdb->rCache.pTSchema; + nCol = pTSchema->numOfCols; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pMemRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, pMemRow, pTSchema), &lino, _exit); + + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + if (!taosArrayPush(ctxArray, &updateCtx)) { + tsdbRowClose(&iter); + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + + if (COL_VAL_IS_VALUE(pColVal)) { + updateCtx.lflag = LFLAG_LAST; + if (!taosArrayPush(ctxArray, &updateCtx)) { + tsdbRowClose(&iter); + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (iColHash == NULL) { + tsdbRowClose(&iter); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + } + } + + if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { + tsdbRowClose(&iter); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); + } + } + } + tsdbRowClose(&iter); + // continue to get next row to fill null last col values + pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); + while (pMemRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pMemRow, &tsdbRowKey); + + void *pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) { + int32_t iCol = ((int32_t *)pIte)[0]; + SColVal colVal = COL_VAL_NONE(0, 0); + tsdbRowGetColVal(pMemRow, pTSchema, iCol, &colVal); + + if (COL_VAL_IS_VALUE(&colVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); + if (code != TSDB_CODE_SUCCESS) { + tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, + __LINE__, tstrerror(code)); + } + } + } + + pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); + } _exit: if (code) { tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + } + + // destroy any allocated resource + tSimpleHashCleanup(iColHash); + if (pMemDelData) { + taosArrayDestroy(pMemDelData); + } + if (pSkyline) { + taosArrayDestroy(pSkyline); } TAOS_RETURN(code); @@ -685,13 +754,9 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i]; TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid, ctxArray), &lino, _exit); - } - - // 3, update cols into lru - for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) { - TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i]; TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit); + taosArrayClear(ctxArray); } _exit: @@ -714,11 +779,13 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { // flush dirty data of lru into rocks with // 4, and update when writing if !updateCacheBatch - code = tsdbCacheUpdateFromIMem(pTsdb); - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + if (tsUpdateCacheBatch) { + code = tsdbCacheUpdateFromIMem(pTsdb); + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); - TAOS_RETURN(code); + TAOS_RETURN(code); + } } char *err = NULL; @@ -1527,38 +1594,6 @@ _exit: TAOS_RETURN(code); } -void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - SRocksCache *pRCache = &pTsdb->rCache; - if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; - - if (suid > 0 && suid == pRCache->suid) { - pRCache->sver = -1; - pRCache->suid = -1; - } - if (suid == 0 && uid == pRCache->uid) { - pRCache->sver = -1; - pRCache->uid = -1; - } -} - -static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - SRocksCache *pRCache = &pTsdb->rCache; - if (pRCache->pTSchema && sver == pRCache->sver) { - if (suid > 0 && suid == pRCache->suid) { - return 0; - } - if (suid == 0 && uid == pRCache->uid) { - return 0; - } - } - - pRCache->suid = suid; - pRCache->uid = uid; - pRCache->sver = sver; - tDestroyTSchema(pRCache->pTSchema); - return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema); -} - int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow) { int32_t code = 0, lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 617bfc1e17..eed18779a2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -674,7 +674,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } - if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) { if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) { tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64, TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version); @@ -736,7 +736,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.key.ts >= pTbData->maxKey) { pTbData->maxKey = key.key.ts; } - if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) { TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow)); }