tsdb/commit: update lru from imem

This commit is contained in:
Minglei Jin 2024-11-20 18:40:49 +08:00
parent ffab4e8886
commit 7947b94b03
4 changed files with 163 additions and 120 deletions

View File

@ -154,7 +154,7 @@ extern bool tsEnableCrashReport;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
extern int8_t tsSafetyCheckLevel;
extern int8_t tsSafetyCheckLevel;
enum {
TSDB_SAFETY_CHECK_LEVELL_NEVER = 0,
TSDB_SAFETY_CHECK_LEVELL_NORMAL = 1,
@ -258,7 +258,7 @@ extern int32_t tsS3MigrateIntervalSec;
extern bool tsS3MigrateEnabled;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;
extern bool tsUpdateCacheBatch;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
extern int tsStreamAggCnt;

View File

@ -298,6 +298,8 @@ bool tsFilterScalarMode = false;
int tsResolveFQDNRetryTime = 100; // seconds
int tsStreamAggCnt = 100000;
bool tsUpdateCacheBatch = true;
int8_t tsS3EpNum = 0;
char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<endpoint>"};
char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<accesskey>"};
@ -536,7 +538,8 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "tqClientDebugFlag", tqClientDebugFlag, 0, 255, CFG_SCOPE_CLIENT, CFG_DYN_SERVER));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -589,8 +592,10 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlChildTableName", tsSmlChildTableName, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddString(pCfg, "smlAutoChildTableNameDelimiter", tsSmlAutoChildTableNameDelimiter,
CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
@ -1994,14 +1999,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{ // 'bool/int32_t/int64_t/float/double' variables with general modification function
static OptionNameAndVar debugOptions[] = {
{"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, {"mDebugFlag", &mDebugFlag},
{"wDebugFlag", &wDebugFlag}, {"azDebugFlag", &azDebugFlag}, {"sDebugFlag", &sDebugFlag},
{"tsdbDebugFlag", &tsdbDebugFlag}, {"tqDebugFlag", &tqDebugFlag}, {"fsDebugFlag", &fsDebugFlag},
{"udfDebugFlag", &udfDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag},
{"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag},
{"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag},
{"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
{"tqClientDebugFlag", &tqClientDebugFlag},
{"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag},
{"mDebugFlag", &mDebugFlag}, {"wDebugFlag", &wDebugFlag},
{"azDebugFlag", &azDebugFlag}, {"sDebugFlag", &sDebugFlag},
{"tsdbDebugFlag", &tsdbDebugFlag}, {"tqDebugFlag", &tqDebugFlag},
{"fsDebugFlag", &fsDebugFlag}, {"udfDebugFlag", &udfDebugFlag},
{"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag},
{"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag},
{"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag},
{"metaDebugFlag", &metaDebugFlag}, {"stDebugFlag", &stDebugFlag},
{"sndDebugFlag", &sndDebugFlag}, {"tqClientDebugFlag", &tqClientDebugFlag},
};
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},

View File

@ -493,26 +493,6 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
return 0;
}
static int32_t tsdbCacheQueryReseek(void *pQHandle) {
int32_t code = 0;
SCacheRowsReader *pReader = pQHandle;
code = taosThreadMutexTryLock(&pReader->readerMutex);
if (code == 0) {
// pause current reader's state if not paused, save ts & version for resuming
// just wait for the big all tables' snapshot untaking for now
code = TSDB_CODE_VND_QUERY_BUSY;
(void)taosThreadMutexUnlock(&pReader->readerMutex);
return code;
} else if (code == EBUSY) {
return TSDB_CODE_VND_QUERY_BUSY;
} else {
return -1;
}
}
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
bool deleted = false;
while (*iSkyline > 0) {
@ -545,71 +525,73 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
}
// Get next non-deleted row from imem
static int32_t tsdbImemGetNextRow(SMemTable *imem, TABLEID tid, STbDataIter *pTbIter, TSDBROW **ppRow, SArray *pSkyline,
int64_t iSkyline) {
static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = imem->pTsdb;
// tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
if (pMemRow) {
// if non deleted, return the found row.
TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline);
if (tsdbTbDataIterNext(pTbIter)) {
TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
if (!deleted) {
*ppRow = pMemRow;
TAOS_RETURN(code);
return pMemRow;
}
} else {
TAOS_RETURN(code);
}
// continue to find the non-deleted first row from imem
TAOS_CHECK_GOTO(tsdbImemGetNextRow(imem, tid, pTbIter, ppRow, pSkyline, iSkyline), &lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
TAOS_RETURN(code);
return NULL;
}
// Get first non-deleted row from imem
static int32_t tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, TABLEID tid, STbDataIter *pTbIter, TSDBROW **ppRow,
SArray *pSkyline, int64_t iSkyline) {
static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, TABLEID tid, STbDataIter *pTbIter,
SArray *pSkyline, int64_t *piSkyline) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = imem->pTsdb;
tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter);
TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter);
if (pMemRow) {
// if non deleted, return the found row.
TSDBKEY rowKey = TSDBROW_KEY(pMemRow);
bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, &iSkyline);
bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline);
if (!deleted) {
*ppRow = pMemRow;
TAOS_RETURN(code);
return pMemRow;
}
} else {
TAOS_RETURN(code);
return NULL;
}
// continue to find the non-deleted first row from imem, using get next row
TAOS_CHECK_GOTO(tsdbImemGetNextRow(imem, tid, pTbIter, ppRow, pSkyline, iSkyline), &lino, _exit);
return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
SRocksCache *pRCache = &pTsdb->rCache;
if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
if (suid > 0 && suid == pRCache->suid) {
pRCache->sver = -1;
pRCache->suid = -1;
}
if (suid == 0 && uid == pRCache->uid) {
pRCache->sver = -1;
pRCache->uid = -1;
}
}
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
SRocksCache *pRCache = &pTsdb->rCache;
if (pRCache->pTSchema && sver == pRCache->sver) {
if (suid > 0 && suid == pRCache->suid) {
return 0;
}
if (suid == 0 && uid == pRCache->uid) {
return 0;
}
}
TAOS_RETURN(code);
pRCache->suid = suid;
pRCache->uid = uid;
pRCache->sver = sver;
tDestroyTSchema(pRCache->pTSchema);
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema);
}
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) {
@ -620,8 +602,12 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray)
SArray *pTombData = NULL;
SArray *pSkyline = NULL;
int64_t iSkyline = 0;
STbDataIter iter = {0};
STbDataIter tbIter = {0};
TSDBROW *pMemRow = NULL;
STSchema *pTSchema = NULL;
SSHashObj *iColHash = NULL;
int32_t sver;
int32_t nCol;
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
@ -635,16 +621,99 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray)
iSkyline = taosArrayGetSize(pSkyline) - 1;
}
TAOS_CHECK_GOTO(tsdbImemGetFirstRow(imem, pIMem, tid, &iter, &pMemRow, pSkyline, iSkyline), &lino, _exit);
pMemRow = tsdbImemGetFirstRow(imem, pIMem, tid, &tbIter, pSkyline, &iSkyline);
if (!pMemRow) {
goto _exit;
}
// iter first row to last_row/last col values to ctxArray, and mark last null col ids
sver = TSDBROW_SVERSION(pMemRow);
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, tid.suid, tid.uid, sver), &lino, _exit);
pTSchema = pTsdb->rCache.pTSchema;
nCol = pTSchema->numOfCols;
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(pMemRow, &tsdbRowKey);
STSDBRowIter iter = {0};
TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, pMemRow, pTSchema), &lino, _exit);
int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
if (!taosArrayPush(ctxArray, &updateCtx)) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
if (COL_VAL_IS_VALUE(pColVal)) {
updateCtx.lflag = LFLAG_LAST;
if (!taosArrayPush(ctxArray, &updateCtx)) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
} else {
if (!iColHash) {
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (iColHash == NULL) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
}
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
tsdbRowClose(&iter);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
}
}
tsdbRowClose(&iter);
// continue to get next row to fill null last col values
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
while (pMemRow) {
if (tSimpleHashGetSize(iColHash) == 0) {
break;
}
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(pMemRow, &tsdbRowKey);
void *pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) {
int32_t iCol = ((int32_t *)pIte)[0];
SColVal colVal = COL_VAL_NONE(0, 0);
tsdbRowGetColVal(pMemRow, pTSchema, iCol, &colVal);
if (COL_VAL_IS_VALUE(&colVal)) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal};
if (!taosArrayPush(ctxArray, &updateCtx)) {
TAOS_CHECK_GOTO(terrno, &lino, _exit);
}
code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
if (code != TSDB_CODE_SUCCESS) {
tsdbTrace("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
__LINE__, tstrerror(code));
}
}
}
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
// destroy any allocated resource
tSimpleHashCleanup(iColHash);
if (pMemDelData) {
taosArrayDestroy(pMemDelData);
}
if (pSkyline) {
taosArrayDestroy(pSkyline);
}
TAOS_RETURN(code);
@ -685,13 +754,9 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid, ctxArray), &lino, _exit);
}
// 3, update cols into lru
for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) {
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit);
taosArrayClear(ctxArray);
}
_exit:
@ -714,11 +779,13 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
// flush dirty data of lru into rocks with
// 4, and update when writing if !updateCacheBatch
code = tsdbCacheUpdateFromIMem(pTsdb);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
if (tsUpdateCacheBatch) {
code = tsdbCacheUpdateFromIMem(pTsdb);
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
TAOS_RETURN(code);
TAOS_RETURN(code);
}
}
char *err = NULL;
@ -1527,38 +1594,6 @@ _exit:
TAOS_RETURN(code);
}
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
SRocksCache *pRCache = &pTsdb->rCache;
if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return;
if (suid > 0 && suid == pRCache->suid) {
pRCache->sver = -1;
pRCache->suid = -1;
}
if (suid == 0 && uid == pRCache->uid) {
pRCache->sver = -1;
pRCache->uid = -1;
}
}
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
SRocksCache *pRCache = &pTsdb->rCache;
if (pRCache->pTSchema && sver == pRCache->sver) {
if (suid > 0 && suid == pRCache->suid) {
return 0;
}
if (suid == 0 && uid == pRCache->uid) {
return 0;
}
}
pRCache->suid = suid;
pRCache->uid = uid;
pRCache->sver = sver;
tDestroyTSchema(pRCache->pTSchema);
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema);
}
int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow,
SRow **aRow) {
int32_t code = 0, lino = 0;

View File

@ -674,7 +674,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
pTbData->maxKey = key.key.ts;
}
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
@ -736,7 +736,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
if (key.key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.key.ts;
}
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
}