diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 90465f611e..e6333d2ddc 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -85,7 +85,7 @@ extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsSingleQueryMaxMemorySize; extern int8_t tsQueryUseMemoryPool; extern int8_t tsMemPoolFullFunc; -//extern int32_t tsQueryBufferPoolSize; +// extern int32_t tsQueryBufferPoolSize; extern int32_t tsMinReservedMemorySize; extern int64_t tsCurrentAvailMemorySize; extern int8_t tsNeedTrim; @@ -283,7 +283,7 @@ extern int32_t tsS3MigrateIntervalSec; extern bool tsS3MigrateEnabled; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; - +extern bool tsUpdateCacheBatch; extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern int tsStreamAggCnt; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3cee1cd7de..8529c5b690 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -329,6 +329,8 @@ bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; +bool tsUpdateCacheBatch = true; + int8_t tsS3EpNum = 0; char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f39da72507..29248e360a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -250,6 +250,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); +int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func); // STbData int32_t tsdbGetNRowsInTbData(STbData *pTbData); @@ -335,7 +336,6 @@ struct STsdbFS { typedef struct { rocksdb_t *db; rocksdb_comparator_t *my_comparator; - rocksdb_cache_t *blockcache; rocksdb_block_based_table_options_t *tableoptions; rocksdb_options_t *options; rocksdb_flushoptions_t *flushoptions; @@ -347,6 +347,7 @@ typedef struct { tb_uid_t suid; tb_uid_t uid; STSchema *pTSchema; + SArray *ctxArray; } SRocksCache; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2cef541cdb..e6dbf5e822 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -172,9 +172,6 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024); - pTsdb->rCache.blockcache = cache; - rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create(); pTsdb->rCache.tableoptions = tableoptions; @@ -185,7 +182,6 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_comparator(options, cmp); - rocksdb_block_based_options_set_block_cache(tableoptions, cache); rocksdb_options_set_block_based_table_factory(options, tableoptions); rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL // rocksdb_options_set_inplace_update_support(options, 1); @@ -234,9 +230,15 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.suid = -1; pTsdb->rCache.uid = -1; pTsdb->rCache.pTSchema = NULL; + pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx)); + if (!pTsdb->rCache.ctxArray) { + TAOS_CHECK_GOTO(terrno, &lino, _err7); + } TAOS_RETURN(code); +_err7: + (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex); _err6: rocksdb_writebatch_destroy(writebatch); _err5: @@ -248,7 +250,6 @@ _err3: _err2: rocksdb_options_destroy(options); rocksdb_block_based_options_destroy(tableoptions); - rocksdb_cache_destroy(cache); _err: rocksdb_comparator_destroy(cmp); @@ -264,9 +265,9 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions); - rocksdb_cache_destroy(pTsdb->rCache.blockcache); rocksdb_comparator_destroy(pTsdb->rCache.my_comparator); taosMemoryFree(pTsdb->rCache.pTSchema); + taosArrayDestroy(pTsdb->rCache.ctxArray); } static void rocksMayWrite(STsdb *pTsdb, bool force) { @@ -491,10 +492,283 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { return 0; } +static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { + bool deleted = false; + while (*iSkyline > 0) { + TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline); + TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1); + + if (key->ts > pItemBack->ts) { + return false; + } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { + if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) { + // if (key->version <= pItemFront->version || key->version <= pItemBack->version) { + return true; + } else { + if (*iSkyline > 1) { + --*iSkyline; + } else { + return false; + } + } + } else { + if (*iSkyline > 1) { + --*iSkyline; + } else { + return false; + } + } + } + + return deleted; +} + +// Get next non-deleted row from imem +static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) { + int32_t code = 0; + + if (tsdbTbDataIterNext(pTbIter)) { + TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); + TSDBKEY rowKey = TSDBROW_KEY(pMemRow); + bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline); + if (!deleted) { + return pMemRow; + } + } + + return NULL; +} + +// Get first non-deleted row from imem +static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline, + int64_t *piSkyline) { + int32_t code = 0; + + tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter); + TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); + if (pMemRow) { + // if non deleted, return the found row. + TSDBKEY rowKey = TSDBROW_KEY(pMemRow); + bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline); + if (!deleted) { + return pMemRow; + } + } else { + return NULL; + } + + // continue to find the non-deleted first row from imem, using get next row + return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline); +} + +void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; + + if (suid > 0 && suid == pRCache->suid) { + pRCache->sver = -1; + pRCache->suid = -1; + } + if (suid == 0 && uid == pRCache->uid) { + pRCache->sver = -1; + pRCache->uid = -1; + } +} + +static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (pRCache->pTSchema && sver == pRCache->sver) { + if (suid > 0 && suid == pRCache->suid) { + return 0; + } + if (suid == 0 && uid == pRCache->uid) { + return 0; + } + } + + pRCache->suid = suid; + pRCache->uid = uid; + pRCache->sver = sver; + tDestroyTSchema(pRCache->pTSchema); + return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema); +} + +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray); + +int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) { + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = imem->pTsdb; + SArray *pMemDelData = NULL; + SArray *pSkyline = NULL; + int64_t iSkyline = 0; + STbDataIter tbIter = {0}; + TSDBROW *pMemRow = NULL; + STSchema *pTSchema = NULL; + SSHashObj *iColHash = NULL; + int32_t sver; + int32_t nCol; + SArray *ctxArray = pTsdb->rCache.ctxArray; + STsdbRowKey tsdbRowKey = {0}; + + STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid); + + // load imem tomb data and build skyline + TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit); + + // tsdbBuildDeleteSkyline + size_t delSize = TARRAY_SIZE(pMemDelData); + if (delSize > 0) { + pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + if (!pSkyline) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline)); + iSkyline = taosArrayGetSize(pSkyline) - 1; + } + + pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline); + if (!pMemRow) { + goto _exit; + } + + // iter first row to last_row/last col values to ctxArray, and mark last null col ids + sver = TSDBROW_SVERSION(pMemRow); + TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit); + pTSchema = pTsdb->rCache.pTSchema; + nCol = pTSchema->numOfCols; + + tsdbRowGetKey(pMemRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema)); + + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_EXIT(terrno); + } + + if (COL_VAL_IS_VALUE(pColVal)) { + updateCtx.lflag = LFLAG_LAST; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_EXIT(terrno); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT)); + if (iColHash == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) { + TAOS_CHECK_EXIT(terrno); + } + } + } + tsdbRowClose(&iter); + + // continue to get next row to fill null last col values + pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); + while (pMemRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + sver = TSDBROW_SVERSION(pMemRow); + TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver)); + pTSchema = pTsdb->rCache.pTSchema; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pMemRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema)); + + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid))); + } + } + tsdbRowClose(&iter); + + pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); + } + + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + + tsdbRowClose(&iter); + } + + taosArrayClear(ctxArray); + // destroy any allocated resource + tSimpleHashCleanup(iColHash); + if (pMemDelData) { + taosArrayDestroy(pMemDelData); + } + if (pSkyline) { + taosArrayDestroy(pSkyline); + } + + TAOS_RETURN(code); +} + +static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { + if (!pTsdb) return 0; + if (!pTsdb->imem) return 0; + + int32_t code = 0; + int32_t lino = 0; + SMemTable *imem = pTsdb->imem; + int32_t nTbData = imem->nTbData; + int64_t nRow = imem->nRow; + int64_t nDel = imem->nDel; + + if (nRow == 0 || nTbData == 0) return 0; + + TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem)); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel); + } + + TAOS_RETURN(code); +} + int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; - char *err = NULL; + // 0, tsdbCacheUpdateFromIMem if updateCacheBatch + // flush dirty data of lru into rocks + // 4, and update when writing if !updateCacheBatch + // 5, merge cache & mem if updateCacheBatch + + if (tsUpdateCacheBatch) { + code = tsdbCacheUpdateFromIMem(pTsdb); + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + + TAOS_RETURN(code); + } + } + + char *err = NULL; SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; @@ -1300,97 +1574,55 @@ _exit: TAOS_RETURN(code); } -void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - SRocksCache *pRCache = &pTsdb->rCache; - if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; - - if (suid > 0 && suid == pRCache->suid) { - pRCache->sver = -1; - pRCache->suid = -1; - } - if (suid == 0 && uid == pRCache->uid) { - pRCache->sver = -1; - pRCache->uid = -1; - } -} - -static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - SRocksCache *pRCache = &pTsdb->rCache; - if (pRCache->pTSchema && sver == pRCache->sver) { - if (suid > 0 && suid == pRCache->suid) { - return 0; - } - if (suid == 0 && uid == pRCache->uid) { - return 0; - } - } - - pRCache->suid = suid; - pRCache->uid = uid; - pRCache->sver = sver; - tDestroyTSchema(pRCache->pTSchema); - return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema); -} - int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow) { int32_t code = 0, lino = 0; // 1. prepare last - TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(&lRow); - SArray *ctxArray = NULL; - SSHashObj *iColHash = NULL; + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(&lRow); + SSHashObj *iColHash = NULL; + STSDBRowIter iter = {0}; TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit); pTSchema = pTsdb->rCache.pTSchema; TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; - - ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx)); - if (ctxArray == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); - } + SArray *ctxArray = pTsdb->rCache.ctxArray; // 1. prepare by lrow STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(&lRow, &tsdbRowKey); - STSDBRowIter iter = {0}; TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit); int32_t iCol = 0; for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(terrno, &lino, _exit); } if (COL_VAL_IS_VALUE(pColVal)) { updateCtx.lflag = LFLAG_LAST; if (!taosArrayPush(ctxArray, &updateCtx)) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(terrno, &lino, _exit); } } else { if (!iColHash) { iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (iColHash == NULL) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } } if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } } } - tsdbRowClose(&iter); // 2. prepare by the other rows for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) { @@ -1424,32 +1656,28 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } } - // 3. do update - code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); - if (code < TSDB_CODE_SUCCESS) { - tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - } + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit); _exit: if (code) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); } - taosArrayDestroy(ctxArray); + tsdbRowClose(&iter); tSimpleHashCleanup(iColHash); + taosArrayClear(ctxArray); TAOS_RETURN(code); } int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) { - int32_t code = 0, lino = 0; + int32_t code = 0, lino = 0; + STSDBRowIter iter = {0}; + STSchema *pTSchema = NULL; + SArray *ctxArray = NULL; TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1); - - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(&lRow); - SArray *ctxArray = NULL; + int32_t sver = TSDBROW_SVERSION(&lRow); TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema)); @@ -1500,29 +1728,18 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo } // 2. prepare last row - STSDBRowIter iter = {0}; - code = tsdbRowIterOpen(&iter, &lRow, pTSchema); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); - } + TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { TAOS_CHECK_GOTO(terrno, &lino, _exit); } } - tsdbRowClose(&iter); - // 3. do update - code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); - if (code != TSDB_CODE_SUCCESS) { - tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - } + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit); _exit: + tsdbRowClose(&iter); taosMemoryFreeClear(pTSchema); taosArrayDestroy(ctxArray); @@ -1789,8 +2006,9 @@ _exit: TAOS_RETURN(code); } -int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; +static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, + int8_t ltype, SArray *keyArray) { + int32_t code = 0, lino = 0; SArray *remainCols = NULL; SArray *ignoreFromRocks = NULL; SLRUCache *pCache = pTsdb->lruCache; @@ -1811,6 +2029,10 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; } + if (!taosArrayPush(keyArray, &key)) { + TAOS_CHECK_EXIT(terrno); + } + LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN); SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { @@ -1914,6 +2136,492 @@ _exit: TAOS_RETURN(code); } +typedef enum SMEMNEXTROWSTATES { + SMEMNEXTROW_ENTER, + SMEMNEXTROW_NEXT, +} SMEMNEXTROWSTATES; + +typedef struct SMemNextRowIter { + SMEMNEXTROWSTATES state; + STbData *pMem; // [input] + STbDataIter iter; // mem buffer skip list iterator + int64_t lastTs; +} SMemNextRowIter; + +static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { + SMemNextRowIter *state = (SMemNextRowIter *)iter; + int32_t code = 0; + *pIgnoreEarlierTs = false; + switch (state->state) { + case SMEMNEXTROW_ENTER: { + if (state->pMem != NULL) { + /* + if (state->pMem->maxKey <= state->lastTs) { + *ppRow = NULL; + *pIgnoreEarlierTs = true; + + TAOS_RETURN(code); + } + */ + tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); + + TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); + if (pMemRow) { + *ppRow = pMemRow; + state->state = SMEMNEXTROW_NEXT; + + TAOS_RETURN(code); + } + } + + *ppRow = NULL; + + TAOS_RETURN(code); + } + case SMEMNEXTROW_NEXT: + if (tsdbTbDataIterNext(&state->iter)) { + *ppRow = tsdbTbDataIterGet(&state->iter); + + TAOS_RETURN(code); + } else { + *ppRow = NULL; + + TAOS_RETURN(code); + } + default: + break; + } + +_err: + *ppRow = NULL; + + TAOS_RETURN(code); +} + +typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols); +typedef int32_t (*_next_row_clear_fn_t)(void *iter); + +typedef struct { + TSDBROW *pRow; + bool stop; + bool next; + bool ignoreEarlierTs; + void *iter; + _next_row_fn_t nextRowFn; + _next_row_clear_fn_t nextRowClearFn; +} TsdbNextRowState; + +typedef struct { + SArray *pMemDelData; + SArray *pSkyline; + int64_t iSkyline; + SBlockIdx idx; + SMemNextRowIter memState; + SMemNextRowIter imemState; + TSDBROW memRow, imemRow; + TsdbNextRowState input[2]; + SCacheRowsReader *pr; + STsdb *pTsdb; +} MemNextRowIter; + +static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, + STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) { + int32_t code = 0, lino = 0; + + STbData *pMem = NULL; + if (pReadSnap->pMem) { + pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid); + } + + STbData *pIMem = NULL; + if (pReadSnap->pIMem) { + pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid); + } + + pIter->pTsdb = pTsdb; + + pIter->pMemDelData = NULL; + + TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit); + + pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; + + pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; + pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; + + if (pMem) { + pIter->memState.pMem = pMem; + pIter->memState.state = SMEMNEXTROW_ENTER; + pIter->input[0].stop = false; + pIter->input[0].next = true; + } + + if (pIMem) { + pIter->imemState.pMem = pIMem; + pIter->imemState.state = SMEMNEXTROW_ENTER; + pIter->input[1].stop = false; + pIter->input[1].next = true; + } + + pIter->pr = pr; + +_exit: + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } + + TAOS_RETURN(code); +} + +static void memRowIterClose(MemNextRowIter *pIter) { + for (int i = 0; i < 2; ++i) { + if (pIter->input[i].nextRowClearFn) { + (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter); + } + } + + if (pIter->pSkyline) { + taosArrayDestroy(pIter->pSkyline); + } + + if (pIter->pMemDelData) { + taosArrayDestroy(pIter->pMemDelData); + } +} + +static void freeTableInfoFunc(void *param) { + void **p = (void **)param; + taosMemoryFreeClear(*p); +} + +static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) { + if (!pReader->pTableMap) { + pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (!pReader->pTableMap) { + return NULL; + } + + tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc); + } + + STableLoadInfo *pInfo = NULL; + STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); + if (!ppInfo) { + pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); + if (pInfo) { + if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) { + return NULL; + } + } + + return pInfo; + } + + return *ppInfo; +} + +static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) { + int32_t code = 0, lino = 0; + + for (;;) { + for (int i = 0; i < 2; ++i) { + if (pIter->input[i].next && !pIter->input[i].stop) { + TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, + &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols), + &lino, _exit); + + if (pIter->input[i].pRow == NULL) { + pIter->input[i].stop = true; + pIter->input[i].next = false; + } + } + } + + if (pIter->input[0].stop && pIter->input[1].stop) { + return NULL; + } + + TSDBROW *max[2] = {0}; + int iMax[2] = {-1, -1}; + int nMax = 0; + SRowKey maxKey = {.ts = TSKEY_MIN}; + + for (int i = 0; i < 2; ++i) { + if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) { + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey); + + // merging & deduplicating on client side + int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key); + if (c <= 0) { + if (c < 0) { + nMax = 0; + maxKey = tsdbRowKey.key; + } + + iMax[nMax] = i; + max[nMax++] = pIter->input[i].pRow; + } + pIter->input[i].next = false; + } + } + + TSDBROW *merge[2] = {0}; + int iMerge[2] = {-1, -1}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey1 = TSDBROW_KEY(max[i]); + + if (!pIter->pSkyline) { + pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno); + + uint64_t uid = pIter->idx.uid; + STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid); + TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY); + + if (pInfo->pTombData == NULL) { + pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); + TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno); + } + + if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + + size_t delSize = TARRAY_SIZE(pInfo->pTombData); + if (delSize > 0) { + code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; + } + + bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline); + if (!deleted) { + iMerge[nMerge] = iMax[i]; + merge[nMerge++] = max[i]; + } + + pIter->input[iMax[i]].next = deleted; + } + + if (nMerge > 0) { + pIter->input[iMerge[0]].next = true; + + return merge[0]; + } + } + +_exit: + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } + + return NULL; +} + +static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { + int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; + *ppDst = taosMemoryMalloc(len); + if (NULL == *ppDst) { + TAOS_RETURN(terrno); + } + memcpy(*ppDst, pSrc, len); + + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { + if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { + TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); + } + + if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { + TAOS_RETURN(TSDB_CODE_SUCCESS); + } + + taosMemoryFreeClear(pReader->pCurrSchema); + TAOS_RETURN( + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); +} + +static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, + SArray *keyArray) { + int32_t code = 0; + int32_t lino = 0; + STSchema *pTSchema = pr->pSchema; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int numKeys = TARRAY_SIZE(pCidList); + MemNextRowIter iter = {0}; + SSHashObj *iColHash = NULL; + + // 1, get from mem, imem filtered with delete info + TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr)); + + TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0); + if (!pRow) { + goto _exit; + } + + int32_t sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid)); + + pTSchema = pr->pCurrSchema; + } + int32_t nCol = pTSchema->numOfCols; + + STsdbRowKey rowKey = {0}; + tsdbRowGetKey(pRow, &rowKey); + + STSDBRowIter rowIter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema)); + + int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray); + for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) { + SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol]; + if (pColVal->cid < pTargetCol->colVal.cid) { + pColVal = tsdbRowIterNext(&rowIter), ++iCol; + + continue; + } + if (pColVal->cid > pTargetCol->colVal.cid) { + break; + } + + int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key); + if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) { + if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { + SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, jCol, &lastCol); + } + } else { + if (COL_VAL_IS_VALUE(pColVal)) { + if (cmp_res <= 0) { + SLastCol lastCol = { + .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, jCol, &lastCol); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT)); + if (iColHash == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) { + TAOS_CHECK_EXIT(terrno); + } + } + } + + ++jCol; + + if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) { + pColVal = tsdbRowIterNext(&rowIter), ++iCol; + } + } + tsdbRowClose(&rowIter); + + if (iColHash && tSimpleHashGetSize(iColHash) > 0) { + pRow = memRowIterGet(&iter, false, NULL, 0); + while (pRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid)); + + pTSchema = pr->pCurrSchema; + } + nCol = pTSchema->numOfCols; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pRow, &tsdbRowKey); + + STSDBRowIter rowIter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema)); + + iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol; + pColVal = tsdbRowIterNext(&rowIter), iCol++) { + int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)); + if (pjCol && COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol]; + + int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key); + if (cmp_res <= 0) { + SLastCol lastCol = { + .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, *pjCol, &lastCol); + } + + TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid))); + } + } + tsdbRowClose(&rowIter); + + pRow = memRowIterGet(&iter, false, NULL, 0); + } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + + tsdbRowClose(&rowIter); + } + + tSimpleHashCleanup(iColHash); + + memRowIterClose(&iter); + + TAOS_RETURN(code); +} + +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + int32_t lino = 0; + + SArray *keyArray = taosArrayInit(16, sizeof(SLastKey)); + if (!keyArray) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray)); + + if (tsUpdateCacheBatch) { + TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray)); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + } + + if (keyArray) { + taosArrayDestroy(keyArray); + } + + TAOS_RETURN(code); +} + int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0, lino = 0; // fetch schema @@ -2159,37 +2867,6 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { TAOS_RETURN(code); } -static void freeTableInfoFunc(void *param) { - void **p = (void **)param; - taosMemoryFreeClear(*p); -} - -static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) { - if (!pReader->pTableMap) { - pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (!pReader->pTableMap) { - return NULL; - } - - tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc); - } - - STableLoadInfo *pInfo = NULL; - STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); - if (!ppInfo) { - pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); - if (pInfo) { - if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) { - return NULL; - } - } - - return pInfo; - } - - return *ppInfo; -} - static uint64_t *getUidList(SCacheRowsReader *pReader) { if (!pReader->uidList) { int32_t numOfTables = pReader->numOfTables; @@ -2774,114 +3451,6 @@ _err: TAOS_RETURN(code); } -typedef enum SMEMNEXTROWSTATES { - SMEMNEXTROW_ENTER, - SMEMNEXTROW_NEXT, -} SMEMNEXTROWSTATES; - -typedef struct SMemNextRowIter { - SMEMNEXTROWSTATES state; - STbData *pMem; // [input] - STbDataIter iter; // mem buffer skip list iterator - int64_t lastTs; -} SMemNextRowIter; - -static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols) { - SMemNextRowIter *state = (SMemNextRowIter *)iter; - int32_t code = 0; - *pIgnoreEarlierTs = false; - switch (state->state) { - case SMEMNEXTROW_ENTER: { - if (state->pMem != NULL) { - /* - if (state->pMem->maxKey <= state->lastTs) { - *ppRow = NULL; - *pIgnoreEarlierTs = true; - - TAOS_RETURN(code); - } - */ - tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); - - TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); - if (pMemRow) { - *ppRow = pMemRow; - state->state = SMEMNEXTROW_NEXT; - - TAOS_RETURN(code); - } - } - - *ppRow = NULL; - - TAOS_RETURN(code); - } - case SMEMNEXTROW_NEXT: - if (tsdbTbDataIterNext(&state->iter)) { - *ppRow = tsdbTbDataIterGet(&state->iter); - - TAOS_RETURN(code); - } else { - *ppRow = NULL; - - TAOS_RETURN(code); - } - default: - break; - } - -_err: - *ppRow = NULL; - - TAOS_RETURN(code); -} - -static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { - bool deleted = false; - while (*iSkyline > 0) { - TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline); - TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1); - - if (key->ts > pItemBack->ts) { - return false; - } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { - if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) { - // if (key->version <= pItemFront->version || key->version <= pItemBack->version) { - return true; - } else { - if (*iSkyline > 1) { - --*iSkyline; - } else { - return false; - } - } - } else { - if (*iSkyline > 1) { - --*iSkyline; - } else { - return false; - } - } - } - - return deleted; -} - -typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols); -typedef int32_t (*_next_row_clear_fn_t)(void *iter); - -typedef struct { - TSDBROW *pRow; - bool stop; - bool next; - bool ignoreEarlierTs; - void *iter; - _next_row_fn_t nextRowFn; - _next_row_clear_fn_t nextRowClearFn; -} TsdbNextRowState; - typedef struct CacheNextRowIter { SArray *pMemDelData; SArray *pSkyline; @@ -3160,7 +3729,6 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI } _err: - if (code) { tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); } @@ -3187,31 +3755,6 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, TAOS_RETURN(TSDB_CODE_SUCCESS); } -static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { - int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; - *ppDst = taosMemoryMalloc(len); - if (NULL == *ppDst) { - TAOS_RETURN(terrno); - } - memcpy(*ppDst, pSrc, len); - - TAOS_RETURN(TSDB_CODE_SUCCESS); -} - -static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { - if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { - TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); - } - - if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { - TAOS_RETURN(TSDB_CODE_SUCCESS); - } - - taosMemoryFreeClear(pReader->pCurrSchema); - TAOS_RETURN( - metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds) { int32_t code = 0, lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 5b26d17519..aaf48924e5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -325,6 +325,27 @@ void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t * taosRUnLockLatch(&pMemTable->latch); } +typedef int32_t (*__tsdb_cache_update)(SMemTable *imem, int64_t suid, int64_t uid); + +int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func) { + int32_t code = 0; + __tsdb_cache_update cb = (__tsdb_cache_update)func; + + for (int32_t i = 0; i < pMemTable->nBucket; ++i) { + STbData *pTbData = pMemTable->aBucket[i]; + while (pTbData) { + code = (*cb)(pMemTable, pTbData->suid, pTbData->uid); + if (code) { + TAOS_RETURN(code); + } + + pTbData = pTbData->next; + } + } + + return code; +} + static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { int32_t code = 0; @@ -659,7 +680,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } - if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) { if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) { tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64, TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version); @@ -721,7 +742,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.key.ts >= pTbData->maxKey) { pTbData->maxKey = key.key.ts; } - if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) { TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow)); } diff --git a/tests/army/output.txt b/tests/army/output.txt deleted file mode 100644 index ed3bd5da1a..0000000000 --- a/tests/army/output.txt +++ /dev/null @@ -1,91 +0,0 @@ -[10/28 19:12:21.666563] SUCC: created database (db_sub) -[10/28 19:12:21.694603] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:12:21.823202] SUCC: Spent 0.1290 seconds to create 1000 table(s) with 8 thread(s) speed: 7752 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:12:22.127442] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 441047.79 records/second -[10/28 19:12:22.128649] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 440895.33 records/second -[10/28 19:12:22.129478] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 440151.69 records/second -[10/28 19:12:22.133756] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 433268.05 records/second -[10/28 19:12:22.135211] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 430329.63 records/second -[10/28 19:12:22.137335] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 425800.08 records/second -[10/28 19:12:22.138252] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 426330.15 records/second -[10/28 19:12:22.141351] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 422778.64 records/second -[10/28 19:12:22.141585] SUCC: Spent 0.311648 (real 0.289041) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3208748.33 (real 3459716.79) records/second -[10/28 19:12:22.141590] SUCC: insert delay, min: 0.9600ms, avg: 2.3123ms, p90: 3.1790ms, p95: 3.5080ms, p99: 4.2230ms, max: 4.9040ms -[10/28 19:28:50.798427] SUCC: created database (db_sub) -[10/28 19:28:50.828326] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:28:50.936429] SUCC: Spent 0.1080 seconds to create 1000 table(s) with 8 thread(s) speed: 9259 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:28:51.187235] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 539204.48 records/second -[10/28 19:28:51.189941] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 532329.43 records/second -[10/28 19:28:51.191551] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 530954.66 records/second -[10/28 19:28:51.191858] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 529259.59 records/second -[10/28 19:28:51.192459] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 530229.44 records/second -[10/28 19:28:51.195372] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 522099.42 records/second -[10/28 19:28:51.197727] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 516620.72 records/second -[10/28 19:28:51.197883] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 517125.12 records/second -[10/28 19:28:51.198123] SUCC: Spent 0.255536 (real 0.237135) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3913342.93 (real 4217007.19) records/second -[10/28 19:28:51.198130] SUCC: insert delay, min: 0.9200ms, avg: 1.8971ms, p90: 2.6870ms, p95: 2.9520ms, p99: 3.5880ms, max: 4.0710ms -[10/28 19:31:44.377691] SUCC: created database (db_sub) -[10/28 19:31:44.392998] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:31:44.696768] SUCC: Spent 0.3040 seconds to create 1000 table(s) with 8 thread(s) speed: 3289 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:31:45.126910] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 304775.47 records/second -[10/28 19:31:45.131979] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 301117.75 records/second -[10/28 19:31:45.135106] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 299854.39 records/second -[10/28 19:31:45.135675] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 298322.24 records/second -[10/28 19:31:45.137069] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 297733.89 records/second -[10/28 19:31:45.137952] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 296900.13 records/second -[10/28 19:31:45.138834] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 295170.54 records/second -[10/28 19:31:45.145048] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 291966.71 records/second -[10/28 19:31:45.145369] SUCC: Spent 0.442506 (real 0.419200) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2259856.36 (real 2385496.18) records/second -[10/28 19:31:45.145377] SUCC: insert delay, min: 1.0400ms, avg: 3.3536ms, p90: 5.3120ms, p95: 7.9660ms, p99: 13.1570ms, max: 19.1410ms -[10/28 19:44:19.873056] SUCC: created database (db_sub) -[10/28 19:44:19.904701] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:44:20.053846] SUCC: Spent 0.1490 seconds to create 1000 table(s) with 8 thread(s) speed: 6711 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:44:20.328698] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 485742.49 records/second -[10/28 19:44:20.330777] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 481686.29 records/second -[10/28 19:44:20.331290] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 480911.65 records/second -[10/28 19:44:20.331665] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 481043.06 records/second -[10/28 19:44:20.333451] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 477172.09 records/second -[10/28 19:44:20.334745] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 475675.84 records/second -[10/28 19:44:20.335056] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 474158.37 records/second -[10/28 19:44:20.337919] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 470816.89 records/second -[10/28 19:44:20.338144] SUCC: Spent 0.277921 (real 0.261310) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3598144.80 (real 3826872.30) records/second -[10/28 19:44:20.338153] SUCC: insert delay, min: 0.9180ms, avg: 2.0905ms, p90: 2.6490ms, p95: 3.0620ms, p99: 4.1480ms, max: 4.7840ms -[10/28 19:58:27.100989] SUCC: created database (db_sub) -[10/28 19:58:27.115572] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:58:27.362948] SUCC: Spent 0.2470 seconds to create 1000 table(s) with 8 thread(s) speed: 4049 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:58:27.807669] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 291891.03 records/second -[10/28 19:58:27.818785] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 285413.54 records/second -[10/28 19:58:27.819649] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 284193.61 records/second -[10/28 19:58:27.819844] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 284352.64 records/second -[10/28 19:58:27.820170] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 284576.63 records/second -[10/28 19:58:27.821489] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 283781.33 records/second -[10/28 19:58:27.822061] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 283112.24 records/second -[10/28 19:58:27.823513] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 282730.59 records/second -[10/28 19:58:27.823779] SUCC: Spent 0.455783 (real 0.438625) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2194026.54 (real 2279851.81) records/second -[10/28 19:58:27.823786] SUCC: insert delay, min: 0.9780ms, avg: 3.5090ms, p90: 5.5650ms, p95: 6.8600ms, p99: 10.6010ms, max: 13.4400ms -[10/28 20:00:06.417182] SUCC: created database (db_sub) -[10/28 20:00:06.448202] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 20:00:06.596961] SUCC: Spent 0.1480 seconds to create 1000 table(s) with 8 thread(s) speed: 6757 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 20:00:06.895455] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 443978.76 records/second -[10/28 20:00:06.896986] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 442549.94 records/second -[10/28 20:00:06.897536] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 440927.99 records/second -[10/28 20:00:06.898905] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 439131.15 records/second -[10/28 20:00:06.899024] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 439628.46 records/second -[10/28 20:00:06.901861] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 435197.37 records/second -[10/28 20:00:06.902305] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 434812.86 records/second -[10/28 20:00:06.904698] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 433406.26 records/second -[10/28 20:00:06.904905] SUCC: Spent 0.301788 (real 0.284949) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3313584.37 (real 3509399.93) records/second -[10/28 20:00:06.904912] SUCC: insert delay, min: 0.8770ms, avg: 2.2796ms, p90: 3.1340ms, p95: 3.6480ms, p99: 4.8280ms, max: 6.0880ms -[10/28 20:05:34.756207] SUCC: created database (db_sub) -[10/28 20:05:34.784793] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 20:05:34.927068] SUCC: Spent 0.1430 seconds to create 1000 table(s) with 8 thread(s) speed: 6993 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 20:05:35.213741] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 466952.82 records/second -[10/28 20:05:35.215403] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 463804.68 records/second -[10/28 20:05:35.221132] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 453322.31 records/second -[10/28 20:05:35.221224] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 453671.11 records/second -[10/28 20:05:35.222003] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 452641.07 records/second -[10/28 20:05:35.222536] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 451796.89 records/second -[10/28 20:05:35.223663] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 449643.52 records/second -[10/28 20:05:35.225246] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 447768.68 records/second -[10/28 20:05:35.225659] SUCC: Spent 0.290871 (real 0.274808) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3437950.16 (real 3638904.25) records/second -[10/28 20:05:35.225666] SUCC: insert delay, min: 0.9360ms, avg: 2.1985ms, p90: 2.9290ms, p95: 3.4580ms, p99: 4.6030ms, max: 6.2660ms diff --git a/tests/script/api/last-query-ws.cpp b/tests/script/api/last-query-ws.cpp new file mode 100644 index 0000000000..d81ed5bc5e --- /dev/null +++ b/tests/script/api/last-query-ws.cpp @@ -0,0 +1,212 @@ +// g++ --std=c++17 -o multiQueryLastrow multiQueryLastrow.cpp -ltaos -lpthread -ltaosws + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "taos.h" +#include "taosws.h" + +int numThreads = 5; +int numQuerys = 100; +int queryType = 0; +int numConnections = 1; +bool useWebSocket = 0; + +using namespace std; + +const std::string dbName = "iot"; +const std::string sTableName = "m"; +int maxTableIndex = 50000; + +std::mutex mtx; +std::condition_variable cv; +vector taosArray; +vector wtaosArray; + +std::atomic finishCounter; +std::chrono::system_clock::time_point startTime; +std::chrono::system_clock::time_point stopTime; +unordered_map consumeHash; + +static void query(int numQuerys, int id, int type); + +void threadFunction(int id) { + // std::unique_lock lock(mtx); + // cv.wait(lock); + + // lock.unlock(); + + //auto startQueryTime = std::chrono::system_clock::now(); + + query(numQuerys, id, queryType); + + //consumeHash[id] = std::chrono::system_clock::now() - startQueryTime; + + // int counter = finishCounter.fetch_add(1); + // if (counter == numThreads - 1) { + // stopTime = std::chrono::system_clock::now(); + // } +} + +void createThreads(const int numThreads, std::vector* pThreads) { + for (int i = 0; i < numThreads; ++i) { + pThreads->emplace_back(threadFunction, i); + } + + std::cout << "2. Threads created\n"; +} + +void connect() { + void* res = NULL; + + for (auto i = 0; i < numConnections; i++) { + if (useWebSocket) { + const char* dsn = "taos+ws://localhost:6041"; + WS_TAOS* wtaos = ws_connect(dsn); + int32_t code = 0; + if (wtaos == NULL) { + code = ws_errno(NULL); + const char* errstr = ws_errstr(NULL); + std::cout << "Connection failed[" << code << "]: " << errstr << "\n"; + return; + } + code = ws_select_db(wtaos, dbName.c_str()); + const char* errstr = ws_errstr(wtaos); + if (code) { + std::cout << "Connection failed on select db[" << code << "]: " << errstr << "\n"; + return; + } + wtaosArray.push_back(wtaos); + } else { + TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", dbName.c_str(), 0); + if (!taos) { + std::cerr << "Failed to connect to TDengine\n"; + return; + } + taosArray.push_back(taos); + } + } + + std::cout << "1. Success to connect to TDengine\n"; +} + +void query(int numQuerys, int id, int type) { + int connIdx = id % numConnections; + + for (int i = 0; i < numQuerys; i++) { + std::string sql; + if (type == 0) { + sql = "select last_row(ts) from " + sTableName + std::to_string((i * numThreads + id) % maxTableIndex); + } else { + sql = "select first(ts) from " + sTableName + std::to_string((i * numThreads + id) % maxTableIndex); + } + + if (!useWebSocket) { + TAOS* taos = taosArray[connIdx]; + + TAOS_RES* res = taos_query(taos, sql.c_str()); + if (!res) { + std::cerr << "Failed to query TDengine\n"; + return; + } + + if (taos_errno(res) != 0) { + std::cerr << "Failed to query TDengine since: " << taos_errstr(res) << "\n"; + return; + } + taos_free_result(res); + } else { + WS_TAOS* wtaos = wtaosArray[connIdx]; + + WS_RES* wres = ws_query(wtaos, sql.c_str()); + if (!wres) { + std::cerr << "Failed to query TDengine\n"; + return; + } + + int32_t code = ws_errno(wres); + if (code != 0) { + std::cerr << "Failed to query TDengine since: " << ws_errstr(wres) << "\n"; + return; + } + ws_free_result(wres); + } + } +} + +void printHelp() { + std::cout << "./multiQueryLastrow {numThreads} {numQuerys} {queryType} {numConnections} {useWebSocket}\n"; + exit(-1); +} + +int main(int argc, char* argv[]) { + if (argc != 6) { + printHelp(); + } + + numThreads = atoi(argv[1]); + numQuerys = atoi(argv[2]); + queryType = atoi(argv[3]); + numConnections = atoi(argv[4]); + useWebSocket = atoi(argv[5]); + + std::string queryTypeStr = (queryType == 0) ? "last_row(ts)" : "first(ts)"; + std::cout << "numThreads:" << numThreads << ", queryTimes:" << numQuerys << ", queryType:" << queryTypeStr + << ", numConnections:" << numConnections << ", useWebSocket:" << useWebSocket << "\n"; + + finishCounter.store(0); + + connect(); + + //startTime = std::chrono::system_clock::now(); + + std::vector threads; + createThreads(numThreads, &threads); + + //std::this_thread::sleep_for(std::chrono::seconds(1)); + + std::cout << "3. Start quering\n"; + + startTime = std::chrono::system_clock::now(); + + //cv.notify_all(); + + for (auto& t : threads) { + t.join(); + } + + stopTime = std::chrono::system_clock::now(); + + for (auto& taos : taosArray) { + taos_close(taos); + } + + for (auto& wtaos : wtaosArray) { + ws_close(wtaos); + } + + std::cout << "4. All job done\n"; + + int64_t totalQueryConsumeMs = 0; + for (auto& res : consumeHash) { + totalQueryConsumeMs += res.second.count() /1000000; + } + + std::chrono::nanoseconds elp = stopTime - startTime; + int64_t elpMs = elp.count() / 1000000; + int64_t totalQueryCount = numThreads * numQuerys; + + std::cout << totalQueryCount << " queries finished in " << elpMs << " ms\n"; + std::cout << (float)totalQueryCount * 1000 / elpMs << "q/s\n"; + std::cout << "avg cost:" << totalQueryConsumeMs / totalQueryCount << " ms/q\n"; + + return 0; +}