diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 13826a1a74..ef6ed4af1d 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 9bbda8309f..9a6a5329ae 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index b013d45911..17446d184d 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG main + GIT_TAG 3.0 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 90465f611e..e6333d2ddc 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -85,7 +85,7 @@ extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsSingleQueryMaxMemorySize; extern int8_t tsQueryUseMemoryPool; extern int8_t tsMemPoolFullFunc; -//extern int32_t tsQueryBufferPoolSize; +// extern int32_t tsQueryBufferPoolSize; extern int32_t tsMinReservedMemorySize; extern int64_t tsCurrentAvailMemorySize; extern int8_t tsNeedTrim; @@ -283,7 +283,7 @@ extern int32_t tsS3MigrateIntervalSec; extern bool tsS3MigrateEnabled; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; - +extern bool tsUpdateCacheBatch; extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern int tsStreamAggCnt; diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index 788f9b4e29..dba6aa3beb 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -1112,6 +1112,7 @@ static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIEL STMT_ERR_RET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields)); if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) { pStmt->bInfo.needParse = true; + qDestroyStmtDataBlock(*pDataBlock); if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) { tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_APP_ERROR); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3cee1cd7de..8529c5b690 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -329,6 +329,8 @@ bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; +bool tsUpdateCacheBatch = true; + int8_t tsS3EpNum = 0; char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index ade7b63afa..402c4321a1 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -893,7 +893,7 @@ static void mndCompactPullup(SMnode *pMnode) { } taosArrayDestroy(pArray); } - +#ifdef TD_ENTERPRISE static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) { if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) { return 0; @@ -995,6 +995,7 @@ static int32_t mndCompactDispatch(SRpcMsg *pReq) { } return 0; } +#endif static int32_t mndProcessCompactTimer(SRpcMsg *pReq) { #ifdef TD_ENTERPRISE diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 9720dc6b8a..5de8487ced 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -108,30 +108,94 @@ static bool checkStatusForEachReplica(SVgObj *pVgroup) { return true; } -int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { +static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) { + SSnodeObj *pObj = NULL; + void *pIter = NULL; + int32_t code = 0; + + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + SNodeEntry entry = {.nodeId = SNODE_HANDLE}; + code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); + if (code) { + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); + return code; + } + + char buf[256] = {0}; + code = epsetToStr(&entry.epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } + + void *p = taosArrayPush(pVgroupList, &entry); + if (p == NULL) { + code = terrno; + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); + return code; + } else { + mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + } + + sdbRelease(pMnode->pSdb, pObj); + } + + return code; +} + +static int32_t mndCheckMnodeStatus(SMnode* pMnode) { + int32_t code = 0; + ESdbStatus objStatus; + void *pIter = NULL; + SMnodeObj *pObj = NULL; + + while (1) { + pIter = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, true); + if (pIter == NULL) { + break; + } + + if (pObj->syncState != TAOS_SYNC_STATE_LEADER && pObj->syncState != TAOS_SYNC_STATE_FOLLOWER) { + mDebug("mnode sync state:%d not leader/follower", pObj->syncState); + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_FAILED; + } + + if (objStatus != SDB_STATUS_READY) { + mWarn("mnode status:%d not ready", objStatus); + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_FAILED; + } + + sdbRelease(pMnode->pSdb, pObj); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; int32_t code = 0; - SArray *pVgroupList = NULL; SHashObj *pHash = NULL; - pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); - if (pVgroupList == NULL) { - mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno)); - code = terrno; - goto _err; - } - pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pHash == NULL) { mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno)); - code = terrno; - goto _err; + return terrno; } - *allReady = true; - while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) { @@ -148,7 +212,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code)); sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); - goto _err; // take snapshot failed, and not all ready + goto _end; // take snapshot failed, and not all ready } } else { if (*pReplica != pVgroup->replica) { @@ -158,7 +222,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } } - // if not all ready till now, no need to check the remaining vgroups. + // if not all ready till now, no need to check the remaining vgroups, // but still we need to put the info of the existed vgroups into the snapshot list if (*allReady) { *allReady = checkStatusForEachReplica(pVgroup); @@ -176,7 +240,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { code = terrno; sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); - goto _err; + goto _end; } else { mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); } @@ -184,51 +248,49 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { sdbRelease(pSdb, pVgroup); } - SSnodeObj *pObj = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); - if (pIter == NULL) { - break; - } +_end: + taosHashCleanup(pHash); + return code; +} - SNodeEntry entry = {.nodeId = SNODE_HANDLE}; - code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); - if (code) { - sdbRelease(pSdb, pObj); - sdbCancelFetch(pSdb, pIter); - mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); - goto _err; - } +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { + int32_t code = 0; + SArray *pVgroupList = NULL; - char buf[256] = {0}; - code = epsetToStr(&entry.epset, buf, tListLen(buf)); - if (code != 0) { // print error and continue - mError("failed to convert epset to str, code:%s", tstrerror(code)); - } + *pList = NULL; + *allReady = true; - void *p = taosArrayPush(pVgroupList, &entry); - if (p == NULL) { - code = terrno; - sdbRelease(pSdb, pObj); - sdbCancelFetch(pSdb, pIter); - mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); - goto _err; - } else { - mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); - } + pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); + if (pVgroupList == NULL) { + mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno)); + code = terrno; + goto _err; + } - sdbRelease(pSdb, pObj); + // 1. check for all vnodes status + code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady); + if (code) { + goto _err; + } + + // 2. add snode info + code = mndAddSnodeInfo(pMnode, pVgroupList); + if (code) { + goto _err; + } + + // 3. check for mnode status + code = mndCheckMnodeStatus(pMnode); + if (code != TSDB_CODE_SUCCESS) { + *allReady = false; } *pList = pVgroupList; - taosHashCleanup(pHash); return code; _err: *allReady = false; taosArrayDestroy(pVgroupList); - taosHashCleanup(pHash); - return code; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f39da72507..29248e360a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -250,6 +250,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); +int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func); // STbData int32_t tsdbGetNRowsInTbData(STbData *pTbData); @@ -335,7 +336,6 @@ struct STsdbFS { typedef struct { rocksdb_t *db; rocksdb_comparator_t *my_comparator; - rocksdb_cache_t *blockcache; rocksdb_block_based_table_options_t *tableoptions; rocksdb_options_t *options; rocksdb_flushoptions_t *flushoptions; @@ -347,6 +347,7 @@ typedef struct { tb_uid_t suid; tb_uid_t uid; STSchema *pTSchema; + SArray *ctxArray; } SRocksCache; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a46da532e5..eac082b0c4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -172,9 +172,6 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - rocksdb_cache_t *cache = rocksdb_cache_create_lru(5 * 1024 * 1024); - pTsdb->rCache.blockcache = cache; - rocksdb_block_based_table_options_t *tableoptions = rocksdb_block_based_options_create(); pTsdb->rCache.tableoptions = tableoptions; @@ -185,7 +182,6 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_comparator(options, cmp); - rocksdb_block_based_options_set_block_cache(tableoptions, cache); rocksdb_options_set_block_based_table_factory(options, tableoptions); rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL // rocksdb_options_set_inplace_update_support(options, 1); @@ -234,9 +230,15 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { pTsdb->rCache.suid = -1; pTsdb->rCache.uid = -1; pTsdb->rCache.pTSchema = NULL; + pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx)); + if (!pTsdb->rCache.ctxArray) { + TAOS_CHECK_GOTO(terrno, &lino, _err7); + } TAOS_RETURN(code); +_err7: + (void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex); _err6: rocksdb_writebatch_destroy(writebatch); _err5: @@ -248,7 +250,6 @@ _err3: _err2: rocksdb_options_destroy(options); rocksdb_block_based_options_destroy(tableoptions); - rocksdb_cache_destroy(cache); _err: rocksdb_comparator_destroy(cmp); @@ -264,9 +265,9 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) { rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_options_destroy(pTsdb->rCache.options); rocksdb_block_based_options_destroy(pTsdb->rCache.tableoptions); - rocksdb_cache_destroy(pTsdb->rCache.blockcache); rocksdb_comparator_destroy(pTsdb->rCache.my_comparator); taosMemoryFree(pTsdb->rCache.pTSchema); + taosArrayDestroy(pTsdb->rCache.ctxArray); } static void rocksMayWrite(STsdb *pTsdb, bool force) { @@ -491,10 +492,283 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { return 0; } +static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { + bool deleted = false; + while (*iSkyline > 0) { + TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline); + TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1); + + if (key->ts > pItemBack->ts) { + return false; + } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { + if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) { + // if (key->version <= pItemFront->version || key->version <= pItemBack->version) { + return true; + } else { + if (*iSkyline > 1) { + --*iSkyline; + } else { + return false; + } + } + } else { + if (*iSkyline > 1) { + --*iSkyline; + } else { + return false; + } + } + } + + return deleted; +} + +// Get next non-deleted row from imem +static TSDBROW *tsdbImemGetNextRow(STbDataIter *pTbIter, SArray *pSkyline, int64_t *piSkyline) { + int32_t code = 0; + + if (tsdbTbDataIterNext(pTbIter)) { + TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); + TSDBKEY rowKey = TSDBROW_KEY(pMemRow); + bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline); + if (!deleted) { + return pMemRow; + } + } + + return NULL; +} + +// Get first non-deleted row from imem +static TSDBROW *tsdbImemGetFirstRow(SMemTable *imem, STbData *pIMem, STbDataIter *pTbIter, SArray *pSkyline, + int64_t *piSkyline) { + int32_t code = 0; + + tsdbTbDataIterOpen(pIMem, NULL, 1, pTbIter); + TSDBROW *pMemRow = tsdbTbDataIterGet(pTbIter); + if (pMemRow) { + // if non deleted, return the found row. + TSDBKEY rowKey = TSDBROW_KEY(pMemRow); + bool deleted = tsdbKeyDeleted(&rowKey, pSkyline, piSkyline); + if (!deleted) { + return pMemRow; + } + } else { + return NULL; + } + + // continue to find the non-deleted first row from imem, using get next row + return tsdbImemGetNextRow(pTbIter, pSkyline, piSkyline); +} + +void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; + + if (suid > 0 && suid == pRCache->suid) { + pRCache->sver = -1; + pRCache->suid = -1; + } + if (suid == 0 && uid == pRCache->uid) { + pRCache->sver = -1; + pRCache->uid = -1; + } +} + +static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { + SRocksCache *pRCache = &pTsdb->rCache; + if (pRCache->pTSchema && sver == pRCache->sver) { + if (suid > 0 && suid == pRCache->suid) { + return 0; + } + if (suid == 0 && uid == pRCache->uid) { + return 0; + } + } + + pRCache->suid = suid; + pRCache->uid = uid; + pRCache->sver = sver; + tDestroyTSchema(pRCache->pTSchema); + return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pRCache->pTSchema); +} + +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray); + +int32_t tsdbLoadFromImem(SMemTable *imem, int64_t suid, int64_t uid) { + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = imem->pTsdb; + SArray *pMemDelData = NULL; + SArray *pSkyline = NULL; + int64_t iSkyline = 0; + STbDataIter tbIter = {0}; + TSDBROW *pMemRow = NULL; + STSchema *pTSchema = NULL; + SSHashObj *iColHash = NULL; + int32_t sver; + int32_t nCol; + SArray *ctxArray = pTsdb->rCache.ctxArray; + STsdbRowKey tsdbRowKey = {0}; + + STbData *pIMem = tsdbGetTbDataFromMemTable(imem, suid, uid); + + // load imem tomb data and build skyline + TAOS_CHECK_GOTO(loadMemTombData(&pMemDelData, NULL, pIMem, INT64_MAX), &lino, _exit); + + // tsdbBuildDeleteSkyline + size_t delSize = TARRAY_SIZE(pMemDelData); + if (delSize > 0) { + pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + if (!pSkyline) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tsdbBuildDeleteSkyline(pMemDelData, 0, (int32_t)(delSize - 1), pSkyline)); + iSkyline = taosArrayGetSize(pSkyline) - 1; + } + + pMemRow = tsdbImemGetFirstRow(imem, pIMem, &tbIter, pSkyline, &iSkyline); + if (!pMemRow) { + goto _exit; + } + + // iter first row to last_row/last col values to ctxArray, and mark last null col ids + sver = TSDBROW_SVERSION(pMemRow); + TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit); + pTSchema = pTsdb->rCache.pTSchema; + nCol = pTSchema->numOfCols; + + tsdbRowGetKey(pMemRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema)); + + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_EXIT(terrno); + } + + if (COL_VAL_IS_VALUE(pColVal)) { + updateCtx.lflag = LFLAG_LAST; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_EXIT(terrno); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT)); + if (iColHash == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &pColVal->cid, sizeof(pColVal->cid))) { + TAOS_CHECK_EXIT(terrno); + } + } + } + tsdbRowClose(&iter); + + // continue to get next row to fill null last col values + pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); + while (pMemRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + sver = TSDBROW_SVERSION(pMemRow); + TAOS_CHECK_EXIT(tsdbUpdateSkm(pTsdb, suid, uid, sver)); + pTSchema = pTsdb->rCache.pTSchema; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pMemRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&iter, pMemRow, pTSchema)); + + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + if (tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)) && COL_VAL_IS_VALUE(pColVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + if (!taosArrayPush(ctxArray, &updateCtx)) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid))); + } + } + tsdbRowClose(&iter); + + pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline); + } + + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + + tsdbRowClose(&iter); + } + + taosArrayClear(ctxArray); + // destroy any allocated resource + tSimpleHashCleanup(iColHash); + if (pMemDelData) { + taosArrayDestroy(pMemDelData); + } + if (pSkyline) { + taosArrayDestroy(pSkyline); + } + + TAOS_RETURN(code); +} + +static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) { + if (!pTsdb) return 0; + if (!pTsdb->imem) return 0; + + int32_t code = 0; + int32_t lino = 0; + SMemTable *imem = pTsdb->imem; + int32_t nTbData = imem->nTbData; + int64_t nRow = imem->nRow; + int64_t nDel = imem->nDel; + + if (nRow == 0 || nTbData == 0) return 0; + + TAOS_CHECK_EXIT(tsdbMemTableSaveToCache(imem, tsdbLoadFromImem)); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel); + } + + TAOS_RETURN(code); +} + int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; - char *err = NULL; + // 0, tsdbCacheUpdateFromIMem if updateCacheBatch + // flush dirty data of lru into rocks + // 4, and update when writing if !updateCacheBatch + // 5, merge cache & mem if updateCacheBatch + + if (tsUpdateCacheBatch) { + code = tsdbCacheUpdateFromIMem(pTsdb); + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + + TAOS_RETURN(code); + } + } + + char *err = NULL; SLRUCache *pCache = pTsdb->lruCache; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; @@ -1300,97 +1574,55 @@ _exit: TAOS_RETURN(code); } -void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - SRocksCache *pRCache = &pTsdb->rCache; - if (!pRCache->pTSchema || sver <= pTsdb->rCache.sver) return; - - if (suid > 0 && suid == pRCache->suid) { - pRCache->sver = -1; - pRCache->suid = -1; - } - if (suid == 0 && uid == pRCache->uid) { - pRCache->sver = -1; - pRCache->uid = -1; - } -} - -static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) { - SRocksCache *pRCache = &pTsdb->rCache; - if (pRCache->pTSchema && sver == pRCache->sver) { - if (suid > 0 && suid == pRCache->suid) { - return 0; - } - if (suid == 0 && uid == pRCache->uid) { - return 0; - } - } - - pRCache->suid = suid; - pRCache->uid = uid; - pRCache->sver = sver; - tDestroyTSchema(pRCache->pTSchema); - return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema); -} - int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow) { int32_t code = 0, lino = 0; // 1. prepare last - TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(&lRow); - SArray *ctxArray = NULL; - SSHashObj *iColHash = NULL; + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(&lRow); + SSHashObj *iColHash = NULL; + STSDBRowIter iter = {0}; TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit); pTSchema = pTsdb->rCache.pTSchema; TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t nCol = pTSchema->numOfCols; - - ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx)); - if (ctxArray == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); - } + SArray *ctxArray = pTsdb->rCache.ctxArray; // 1. prepare by lrow STsdbRowKey tsdbRowKey = {0}; tsdbRowGetKey(&lRow, &tsdbRowKey); - STSDBRowIter iter = {0}; TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit); int32_t iCol = 0; for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(terrno, &lino, _exit); } if (COL_VAL_IS_VALUE(pColVal)) { updateCtx.lflag = LFLAG_LAST; if (!taosArrayPush(ctxArray, &updateCtx)) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(terrno, &lino, _exit); } } else { if (!iColHash) { iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (iColHash == NULL) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } } if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) { - tsdbRowClose(&iter); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit); } } } - tsdbRowClose(&iter); // 2. prepare by the other rows for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) { @@ -1424,32 +1656,28 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6 } } - // 3. do update - code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); - if (code < TSDB_CODE_SUCCESS) { - tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - } + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit); _exit: if (code) { tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); } - taosArrayDestroy(ctxArray); + tsdbRowClose(&iter); tSimpleHashCleanup(iColHash); + taosArrayClear(ctxArray); TAOS_RETURN(code); } int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) { - int32_t code = 0, lino = 0; + int32_t code = 0, lino = 0; + STSDBRowIter iter = {0}; + STSchema *pTSchema = NULL; + SArray *ctxArray = NULL; TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1); - - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(&lRow); - SArray *ctxArray = NULL; + int32_t sver = TSDBROW_SVERSION(&lRow); TAOS_CHECK_RETURN(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema)); @@ -1500,29 +1728,18 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo } // 2. prepare last row - STSDBRowIter iter = {0}; - code = tsdbRowIterOpen(&iter, &lRow, pTSchema); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - TAOS_CHECK_GOTO(code, &lino, _exit); - } + TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit); for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; if (!taosArrayPush(ctxArray, &updateCtx)) { TAOS_CHECK_GOTO(terrno, &lino, _exit); } } - tsdbRowClose(&iter); - // 3. do update - code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); - if (code != TSDB_CODE_SUCCESS) { - tsdbTrace("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, - tstrerror(code)); - } + TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, suid, uid, ctxArray), &lino, _exit); _exit: + tsdbRowClose(&iter); taosMemoryFreeClear(pTSchema); taosArrayDestroy(ctxArray); @@ -1789,8 +2006,9 @@ _exit: TAOS_RETURN(code); } -int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; +static int32_t tsdbCacheGetBatchFromLru(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, + int8_t ltype, SArray *keyArray) { + int32_t code = 0, lino = 0; SArray *remainCols = NULL; SArray *ignoreFromRocks = NULL; SLRUCache *pCache = pTsdb->lruCache; @@ -1811,6 +2029,10 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache key.lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; } + if (!taosArrayPush(keyArray, &key)) { + TAOS_CHECK_EXIT(terrno); + } + LRUHandle *h = taosLRUCacheLookup(pCache, &key, ROCKS_KEY_LEN); SLastCol *pLastCol = h ? (SLastCol *)taosLRUCacheValue(pCache, h) : NULL; if (h && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { @@ -1914,6 +2136,492 @@ _exit: TAOS_RETURN(code); } +typedef enum SMEMNEXTROWSTATES { + SMEMNEXTROW_ENTER, + SMEMNEXTROW_NEXT, +} SMEMNEXTROWSTATES; + +typedef struct SMemNextRowIter { + SMEMNEXTROWSTATES state; + STbData *pMem; // [input] + STbDataIter iter; // mem buffer skip list iterator + int64_t lastTs; +} SMemNextRowIter; + +static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { + SMemNextRowIter *state = (SMemNextRowIter *)iter; + int32_t code = 0; + *pIgnoreEarlierTs = false; + switch (state->state) { + case SMEMNEXTROW_ENTER: { + if (state->pMem != NULL) { + /* + if (state->pMem->maxKey <= state->lastTs) { + *ppRow = NULL; + *pIgnoreEarlierTs = true; + + TAOS_RETURN(code); + } + */ + tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); + + TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); + if (pMemRow) { + *ppRow = pMemRow; + state->state = SMEMNEXTROW_NEXT; + + TAOS_RETURN(code); + } + } + + *ppRow = NULL; + + TAOS_RETURN(code); + } + case SMEMNEXTROW_NEXT: + if (tsdbTbDataIterNext(&state->iter)) { + *ppRow = tsdbTbDataIterGet(&state->iter); + + TAOS_RETURN(code); + } else { + *ppRow = NULL; + + TAOS_RETURN(code); + } + default: + break; + } + +_err: + *ppRow = NULL; + + TAOS_RETURN(code); +} + +typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols); +typedef int32_t (*_next_row_clear_fn_t)(void *iter); + +typedef struct { + TSDBROW *pRow; + bool stop; + bool next; + bool ignoreEarlierTs; + void *iter; + _next_row_fn_t nextRowFn; + _next_row_clear_fn_t nextRowClearFn; +} TsdbNextRowState; + +typedef struct { + SArray *pMemDelData; + SArray *pSkyline; + int64_t iSkyline; + SBlockIdx idx; + SMemNextRowIter memState; + SMemNextRowIter imemState; + TSDBROW memRow, imemRow; + TsdbNextRowState input[2]; + SCacheRowsReader *pr; + STsdb *pTsdb; +} MemNextRowIter; + +static int32_t memRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, + STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) { + int32_t code = 0, lino = 0; + + STbData *pMem = NULL; + if (pReadSnap->pMem) { + pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid); + } + + STbData *pIMem = NULL; + if (pReadSnap->pIMem) { + pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid); + } + + pIter->pTsdb = pTsdb; + + pIter->pMemDelData = NULL; + + TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit); + + pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; + + pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; + pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; + + if (pMem) { + pIter->memState.pMem = pMem; + pIter->memState.state = SMEMNEXTROW_ENTER; + pIter->input[0].stop = false; + pIter->input[0].next = true; + } + + if (pIMem) { + pIter->imemState.pMem = pIMem; + pIter->imemState.state = SMEMNEXTROW_ENTER; + pIter->input[1].stop = false; + pIter->input[1].next = true; + } + + pIter->pr = pr; + +_exit: + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } + + TAOS_RETURN(code); +} + +static void memRowIterClose(MemNextRowIter *pIter) { + for (int i = 0; i < 2; ++i) { + if (pIter->input[i].nextRowClearFn) { + (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter); + } + } + + if (pIter->pSkyline) { + taosArrayDestroy(pIter->pSkyline); + } + + if (pIter->pMemDelData) { + taosArrayDestroy(pIter->pMemDelData); + } +} + +static void freeTableInfoFunc(void *param) { + void **p = (void **)param; + taosMemoryFreeClear(*p); +} + +static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) { + if (!pReader->pTableMap) { + pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (!pReader->pTableMap) { + return NULL; + } + + tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc); + } + + STableLoadInfo *pInfo = NULL; + STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); + if (!ppInfo) { + pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); + if (pInfo) { + if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) { + return NULL; + } + } + + return pInfo; + } + + return *ppInfo; +} + +static TSDBROW *memRowIterGet(MemNextRowIter *pIter, bool isLast, int16_t *aCols, int nCols) { + int32_t code = 0, lino = 0; + + for (;;) { + for (int i = 0; i < 2; ++i) { + if (pIter->input[i].next && !pIter->input[i].stop) { + TAOS_CHECK_GOTO(pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, + &pIter->input[i].ignoreEarlierTs, isLast, aCols, nCols), + &lino, _exit); + + if (pIter->input[i].pRow == NULL) { + pIter->input[i].stop = true; + pIter->input[i].next = false; + } + } + } + + if (pIter->input[0].stop && pIter->input[1].stop) { + return NULL; + } + + TSDBROW *max[2] = {0}; + int iMax[2] = {-1, -1}; + int nMax = 0; + SRowKey maxKey = {.ts = TSKEY_MIN}; + + for (int i = 0; i < 2; ++i) { + if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) { + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pIter->input[i].pRow, &tsdbRowKey); + + // merging & deduplicating on client side + int c = tRowKeyCompare(&maxKey, &tsdbRowKey.key); + if (c <= 0) { + if (c < 0) { + nMax = 0; + maxKey = tsdbRowKey.key; + } + + iMax[nMax] = i; + max[nMax++] = pIter->input[i].pRow; + } + pIter->input[i].next = false; + } + } + + TSDBROW *merge[2] = {0}; + int iMerge[2] = {-1, -1}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey1 = TSDBROW_KEY(max[i]); + + if (!pIter->pSkyline) { + pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + TSDB_CHECK_NULL(pIter->pSkyline, code, lino, _exit, terrno); + + uint64_t uid = pIter->idx.uid; + STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid); + TSDB_CHECK_NULL(pInfo, code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY); + + if (pInfo->pTombData == NULL) { + pInfo->pTombData = taosArrayInit(4, sizeof(SDelData)); + TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _exit, terrno); + } + + if (!taosArrayAddAll(pInfo->pTombData, pIter->pMemDelData)) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } + + size_t delSize = TARRAY_SIZE(pInfo->pTombData); + if (delSize > 0) { + code = tsdbBuildDeleteSkyline(pInfo->pTombData, 0, (int32_t)(delSize - 1), pIter->pSkyline); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + pIter->iSkyline = taosArrayGetSize(pIter->pSkyline) - 1; + } + + bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline); + if (!deleted) { + iMerge[nMerge] = iMax[i]; + merge[nMerge++] = max[i]; + } + + pIter->input[iMax[i]].next = deleted; + } + + if (nMerge > 0) { + pIter->input[iMerge[0]].next = true; + + return merge[0]; + } + } + +_exit: + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } + + return NULL; +} + +static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { + int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; + *ppDst = taosMemoryMalloc(len); + if (NULL == *ppDst) { + TAOS_RETURN(terrno); + } + memcpy(*ppDst, pSrc, len); + + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { + if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { + TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); + } + + if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { + TAOS_RETURN(TSDB_CODE_SUCCESS); + } + + taosMemoryFreeClear(pReader->pCurrSchema); + TAOS_RETURN( + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); +} + +static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, + SArray *keyArray) { + int32_t code = 0; + int32_t lino = 0; + STSchema *pTSchema = pr->pSchema; + SLRUCache *pCache = pTsdb->lruCache; + SArray *pCidList = pr->pCidList; + int numKeys = TARRAY_SIZE(pCidList); + MemNextRowIter iter = {0}; + SSHashObj *iColHash = NULL; + + // 1, get from mem, imem filtered with delete info + TAOS_CHECK_EXIT(memRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pReadSnap, pr)); + + TSDBROW *pRow = memRowIterGet(&iter, false, NULL, 0); + if (!pRow) { + goto _exit; + } + + int32_t sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid)); + + pTSchema = pr->pCurrSchema; + } + int32_t nCol = pTSchema->numOfCols; + + STsdbRowKey rowKey = {0}; + tsdbRowGetKey(pRow, &rowKey); + + STSDBRowIter rowIter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema)); + + int32_t iCol = 0, jCol = 0, jnCol = TARRAY_SIZE(pLastArray); + for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol && jCol < jnCol;) { + SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[jCol]; + if (pColVal->cid < pTargetCol->colVal.cid) { + pColVal = tsdbRowIterNext(&rowIter), ++iCol; + + continue; + } + if (pColVal->cid > pTargetCol->colVal.cid) { + break; + } + + int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &rowKey.key); + if (!IS_LAST_KEY(((SLastKey *)TARRAY_DATA(keyArray))[jCol])) { + if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) { + SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, jCol, &lastCol); + } + } else { + if (COL_VAL_IS_VALUE(pColVal)) { + if (cmp_res <= 0) { + SLastCol lastCol = { + .rowKey = rowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, jCol, &lastCol); + } + } else { + if (!iColHash) { + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT)); + if (iColHash == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (tSimpleHashPut(iColHash, &pColVal->cid, sizeof(pColVal->cid), &jCol, sizeof(jCol))) { + TAOS_CHECK_EXIT(terrno); + } + } + } + + ++jCol; + + if (jCol < jnCol && ((SLastCol *)TARRAY_DATA(pLastArray))[jCol].colVal.cid > pColVal->cid) { + pColVal = tsdbRowIterNext(&rowIter), ++iCol; + } + } + tsdbRowClose(&rowIter); + + if (iColHash && tSimpleHashGetSize(iColHash) > 0) { + pRow = memRowIterGet(&iter, false, NULL, 0); + while (pRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + TAOS_CHECK_EXIT(updateTSchema(sversion, pr, uid)); + + pTSchema = pr->pCurrSchema; + } + nCol = pTSchema->numOfCols; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(pRow, &tsdbRowKey); + + STSDBRowIter rowIter = {0}; + TAOS_CHECK_EXIT(tsdbRowIterOpen(&rowIter, pRow, pTSchema)); + + iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&rowIter); pColVal && iCol < nCol; + pColVal = tsdbRowIterNext(&rowIter), iCol++) { + int32_t *pjCol = tSimpleHashGet(iColHash, &pColVal->cid, sizeof(pColVal->cid)); + if (pjCol && COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pTargetCol = &((SLastCol *)TARRAY_DATA(pLastArray))[*pjCol]; + + int32_t cmp_res = tRowKeyCompare(&pTargetCol->rowKey, &tsdbRowKey.key); + if (cmp_res <= 0) { + SLastCol lastCol = { + .rowKey = tsdbRowKey.key, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID}; + TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(&lastCol, NULL)); + + tsdbCacheFreeSLastColItem(pTargetCol); + taosArraySet(pLastArray, *pjCol, &lastCol); + } + + TAOS_CHECK_EXIT(tSimpleHashRemove(iColHash, &pColVal->cid, sizeof(pColVal->cid))); + } + } + tsdbRowClose(&rowIter); + + pRow = memRowIterGet(&iter, false, NULL, 0); + } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + + tsdbRowClose(&rowIter); + } + + tSimpleHashCleanup(iColHash); + + memRowIterClose(&iter); + + TAOS_RETURN(code); +} + +int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { + int32_t code = 0; + int32_t lino = 0; + + SArray *keyArray = taosArrayInit(16, sizeof(SLastKey)); + if (!keyArray) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype, keyArray)); + + if (tsUpdateCacheBatch) { + TAOS_CHECK_EXIT(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, keyArray)); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); + } + + if (keyArray) { + taosArrayDestroy(keyArray); + } + + TAOS_RETURN(code); +} + int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0, lino = 0; // fetch schema @@ -2159,37 +2867,6 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { TAOS_RETURN(code); } -static void freeTableInfoFunc(void *param) { - void **p = (void **)param; - taosMemoryFreeClear(*p); -} - -static STableLoadInfo *getTableLoadInfo(SCacheRowsReader *pReader, uint64_t uid) { - if (!pReader->pTableMap) { - pReader->pTableMap = tSimpleHashInit(pReader->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (!pReader->pTableMap) { - return NULL; - } - - tSimpleHashSetFreeFp(pReader->pTableMap, freeTableInfoFunc); - } - - STableLoadInfo *pInfo = NULL; - STableLoadInfo **ppInfo = tSimpleHashGet(pReader->pTableMap, &uid, sizeof(uid)); - if (!ppInfo) { - pInfo = taosMemoryCalloc(1, sizeof(STableLoadInfo)); - if (pInfo) { - if (tSimpleHashPut(pReader->pTableMap, &uid, sizeof(uint64_t), &pInfo, POINTER_BYTES)) { - return NULL; - } - } - - return pInfo; - } - - return *ppInfo; -} - static uint64_t *getUidList(SCacheRowsReader *pReader) { if (!pReader->uidList) { int32_t numOfTables = pReader->numOfTables; @@ -2774,114 +3451,6 @@ _err: TAOS_RETURN(code); } -typedef enum SMEMNEXTROWSTATES { - SMEMNEXTROW_ENTER, - SMEMNEXTROW_NEXT, -} SMEMNEXTROWSTATES; - -typedef struct SMemNextRowIter { - SMEMNEXTROWSTATES state; - STbData *pMem; // [input] - STbDataIter iter; // mem buffer skip list iterator - int64_t lastTs; -} SMemNextRowIter; - -static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols) { - SMemNextRowIter *state = (SMemNextRowIter *)iter; - int32_t code = 0; - *pIgnoreEarlierTs = false; - switch (state->state) { - case SMEMNEXTROW_ENTER: { - if (state->pMem != NULL) { - /* - if (state->pMem->maxKey <= state->lastTs) { - *ppRow = NULL; - *pIgnoreEarlierTs = true; - - TAOS_RETURN(code); - } - */ - tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); - - TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); - if (pMemRow) { - *ppRow = pMemRow; - state->state = SMEMNEXTROW_NEXT; - - TAOS_RETURN(code); - } - } - - *ppRow = NULL; - - TAOS_RETURN(code); - } - case SMEMNEXTROW_NEXT: - if (tsdbTbDataIterNext(&state->iter)) { - *ppRow = tsdbTbDataIterGet(&state->iter); - - TAOS_RETURN(code); - } else { - *ppRow = NULL; - - TAOS_RETURN(code); - } - default: - break; - } - -_err: - *ppRow = NULL; - - TAOS_RETURN(code); -} - -static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { - bool deleted = false; - while (*iSkyline > 0) { - TSDBKEY *pItemBack = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline); - TSDBKEY *pItemFront = (TSDBKEY *)taosArrayGet(pSkyline, *iSkyline - 1); - - if (key->ts > pItemBack->ts) { - return false; - } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { - if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) { - // if (key->version <= pItemFront->version || key->version <= pItemBack->version) { - return true; - } else { - if (*iSkyline > 1) { - --*iSkyline; - } else { - return false; - } - } - } else { - if (*iSkyline > 1) { - --*iSkyline; - } else { - return false; - } - } - } - - return deleted; -} - -typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols); -typedef int32_t (*_next_row_clear_fn_t)(void *iter); - -typedef struct { - TSDBROW *pRow; - bool stop; - bool next; - bool ignoreEarlierTs; - void *iter; - _next_row_fn_t nextRowFn; - _next_row_clear_fn_t nextRowClearFn; -} TsdbNextRowState; - typedef struct CacheNextRowIter { SArray *pMemDelData; SArray *pSkyline; @@ -3160,7 +3729,6 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI } _err: - if (code) { tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); } @@ -3187,31 +3755,6 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, TAOS_RETURN(TSDB_CODE_SUCCESS); } -static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { - int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; - *ppDst = taosMemoryMalloc(len); - if (NULL == *ppDst) { - TAOS_RETURN(terrno); - } - memcpy(*ppDst, pSrc, len); - - TAOS_RETURN(TSDB_CODE_SUCCESS); -} - -static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) { - if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) { - TAOS_RETURN(cloneTSchema(pReader->pSchema, &pReader->pCurrSchema)); - } - - if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) { - TAOS_RETURN(TSDB_CODE_SUCCESS); - } - - taosMemoryFreeClear(pReader->pCurrSchema); - TAOS_RETURN( - metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &pReader->pCurrSchema)); -} - static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds) { int32_t code = 0, lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 5b26d17519..aaf48924e5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -325,6 +325,27 @@ void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t * taosRUnLockLatch(&pMemTable->latch); } +typedef int32_t (*__tsdb_cache_update)(SMemTable *imem, int64_t suid, int64_t uid); + +int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func) { + int32_t code = 0; + __tsdb_cache_update cb = (__tsdb_cache_update)func; + + for (int32_t i = 0; i < pMemTable->nBucket; ++i) { + STbData *pTbData = pMemTable->aBucket[i]; + while (pTbData) { + code = (*cb)(pMemTable, pTbData->suid, pTbData->uid); + if (code) { + TAOS_RETURN(code); + } + + pTbData = pTbData->next; + } + } + + return code; +} + static int32_t tsdbMemTableRehash(SMemTable *pMemTable) { int32_t code = 0; @@ -659,7 +680,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } - if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) { if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) { tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64, TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version); @@ -721,7 +742,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.key.ts >= pTbData->maxKey) { pTbData->maxKey = key.key.ts; } - if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { + if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) { TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow)); } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 9090ae7a4d..e2069deefc 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1849,7 +1849,6 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname)); tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname)); tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname)); - tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname)); pStmt->usingTableName.type = 1; pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE; // set the table type to child table for parse cache @@ -2060,9 +2059,7 @@ static int32_t parseStbBoundInfo(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* insDestroyBoundColInfo(&((*ppTableDataCxt)->boundColsInfo)); (*ppTableDataCxt)->boundColsInfo = pStbRowsCxt->boundColsInfo; - (*ppTableDataCxt)->boundColsInfo.numOfCols = pStbRowsCxt->boundColsInfo.numOfBound; - (*ppTableDataCxt)->boundColsInfo.numOfBound = pStbRowsCxt->boundColsInfo.numOfBound; - (*ppTableDataCxt)->boundColsInfo.hasBoundCols = pStbRowsCxt->boundColsInfo.hasBoundCols; + (*ppTableDataCxt)->boundColsInfo.pColIndex = taosMemoryCalloc(pStbRowsCxt->boundColsInfo.numOfBound, sizeof(int16_t)); if (NULL == (*ppTableDataCxt)->boundColsInfo.pColIndex) { return terrno; @@ -3175,9 +3172,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal .isStmtBind = pCxt->isStmtBind}; int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery); - SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)((*pQuery)->pRoot); if (TSDB_CODE_SUCCESS == code) { - code = parseInsertSqlImpl(&context, pStmt); + code = parseInsertSqlImpl(&context, (SVnodeModifyOpStmt*)((*pQuery)->pRoot)); } if (TSDB_CODE_SUCCESS == code) { code = setNextStageInfo(&context, *pQuery, pCatalogReq); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index b3c89a6b1c..c69abea0ca 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -606,6 +606,13 @@ int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const c code = terrno; goto end; } + } else { + SVCreateTbReq* tmp = pDataBlock->pData->pCreateTbReq; + taosMemoryFreeClear(tmp->name); + taosMemoryFreeClear(tmp->ctb.pTag); + taosMemoryFreeClear(tmp->ctb.stbName); + taosArrayDestroy(tmp->ctb.tagName); + tmp->ctb.tagName = NULL; } code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index a2e874aabf..b91334944d 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -475,7 +475,13 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND); } - TAOS_CHECK_RETURN(cfgSetItemVal(pItem, name, value, stype)); + code = cfgSetItemVal(pItem, name, value, stype); + if (code != TSDB_CODE_SUCCESS) { + if (lock) { + (void)taosThreadMutexUnlock(&pCfg->lock); + } + TAOS_RETURN(code); + } if (lock) { (void)taosThreadMutexUnlock(&pCfg->lock); diff --git a/tests/army/output.txt b/tests/army/output.txt deleted file mode 100644 index ed3bd5da1a..0000000000 --- a/tests/army/output.txt +++ /dev/null @@ -1,91 +0,0 @@ -[10/28 19:12:21.666563] SUCC: created database (db_sub) -[10/28 19:12:21.694603] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:12:21.823202] SUCC: Spent 0.1290 seconds to create 1000 table(s) with 8 thread(s) speed: 7752 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:12:22.127442] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 441047.79 records/second -[10/28 19:12:22.128649] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 440895.33 records/second -[10/28 19:12:22.129478] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 440151.69 records/second -[10/28 19:12:22.133756] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 433268.05 records/second -[10/28 19:12:22.135211] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 430329.63 records/second -[10/28 19:12:22.137335] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 425800.08 records/second -[10/28 19:12:22.138252] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 426330.15 records/second -[10/28 19:12:22.141351] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 422778.64 records/second -[10/28 19:12:22.141585] SUCC: Spent 0.311648 (real 0.289041) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3208748.33 (real 3459716.79) records/second -[10/28 19:12:22.141590] SUCC: insert delay, min: 0.9600ms, avg: 2.3123ms, p90: 3.1790ms, p95: 3.5080ms, p99: 4.2230ms, max: 4.9040ms -[10/28 19:28:50.798427] SUCC: created database (db_sub) -[10/28 19:28:50.828326] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:28:50.936429] SUCC: Spent 0.1080 seconds to create 1000 table(s) with 8 thread(s) speed: 9259 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:28:51.187235] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 539204.48 records/second -[10/28 19:28:51.189941] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 532329.43 records/second -[10/28 19:28:51.191551] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 530954.66 records/second -[10/28 19:28:51.191858] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 529259.59 records/second -[10/28 19:28:51.192459] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 530229.44 records/second -[10/28 19:28:51.195372] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 522099.42 records/second -[10/28 19:28:51.197727] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 516620.72 records/second -[10/28 19:28:51.197883] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 517125.12 records/second -[10/28 19:28:51.198123] SUCC: Spent 0.255536 (real 0.237135) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3913342.93 (real 4217007.19) records/second -[10/28 19:28:51.198130] SUCC: insert delay, min: 0.9200ms, avg: 1.8971ms, p90: 2.6870ms, p95: 2.9520ms, p99: 3.5880ms, max: 4.0710ms -[10/28 19:31:44.377691] SUCC: created database (db_sub) -[10/28 19:31:44.392998] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:31:44.696768] SUCC: Spent 0.3040 seconds to create 1000 table(s) with 8 thread(s) speed: 3289 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:31:45.126910] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 304775.47 records/second -[10/28 19:31:45.131979] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 301117.75 records/second -[10/28 19:31:45.135106] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 299854.39 records/second -[10/28 19:31:45.135675] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 298322.24 records/second -[10/28 19:31:45.137069] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 297733.89 records/second -[10/28 19:31:45.137952] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 296900.13 records/second -[10/28 19:31:45.138834] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 295170.54 records/second -[10/28 19:31:45.145048] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 291966.71 records/second -[10/28 19:31:45.145369] SUCC: Spent 0.442506 (real 0.419200) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2259856.36 (real 2385496.18) records/second -[10/28 19:31:45.145377] SUCC: insert delay, min: 1.0400ms, avg: 3.3536ms, p90: 5.3120ms, p95: 7.9660ms, p99: 13.1570ms, max: 19.1410ms -[10/28 19:44:19.873056] SUCC: created database (db_sub) -[10/28 19:44:19.904701] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:44:20.053846] SUCC: Spent 0.1490 seconds to create 1000 table(s) with 8 thread(s) speed: 6711 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:44:20.328698] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 485742.49 records/second -[10/28 19:44:20.330777] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 481686.29 records/second -[10/28 19:44:20.331290] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 480911.65 records/second -[10/28 19:44:20.331665] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 481043.06 records/second -[10/28 19:44:20.333451] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 477172.09 records/second -[10/28 19:44:20.334745] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 475675.84 records/second -[10/28 19:44:20.335056] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 474158.37 records/second -[10/28 19:44:20.337919] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 470816.89 records/second -[10/28 19:44:20.338144] SUCC: Spent 0.277921 (real 0.261310) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3598144.80 (real 3826872.30) records/second -[10/28 19:44:20.338153] SUCC: insert delay, min: 0.9180ms, avg: 2.0905ms, p90: 2.6490ms, p95: 3.0620ms, p99: 4.1480ms, max: 4.7840ms -[10/28 19:58:27.100989] SUCC: created database (db_sub) -[10/28 19:58:27.115572] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 19:58:27.362948] SUCC: Spent 0.2470 seconds to create 1000 table(s) with 8 thread(s) speed: 4049 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 19:58:27.807669] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 291891.03 records/second -[10/28 19:58:27.818785] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 285413.54 records/second -[10/28 19:58:27.819649] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 284193.61 records/second -[10/28 19:58:27.819844] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 284352.64 records/second -[10/28 19:58:27.820170] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 284576.63 records/second -[10/28 19:58:27.821489] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 283781.33 records/second -[10/28 19:58:27.822061] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 283112.24 records/second -[10/28 19:58:27.823513] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 282730.59 records/second -[10/28 19:58:27.823779] SUCC: Spent 0.455783 (real 0.438625) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2194026.54 (real 2279851.81) records/second -[10/28 19:58:27.823786] SUCC: insert delay, min: 0.9780ms, avg: 3.5090ms, p90: 5.5650ms, p95: 6.8600ms, p99: 10.6010ms, max: 13.4400ms -[10/28 20:00:06.417182] SUCC: created database (db_sub) -[10/28 20:00:06.448202] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 20:00:06.596961] SUCC: Spent 0.1480 seconds to create 1000 table(s) with 8 thread(s) speed: 6757 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 20:00:06.895455] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 443978.76 records/second -[10/28 20:00:06.896986] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 442549.94 records/second -[10/28 20:00:06.897536] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 440927.99 records/second -[10/28 20:00:06.898905] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 439131.15 records/second -[10/28 20:00:06.899024] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 439628.46 records/second -[10/28 20:00:06.901861] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 435197.37 records/second -[10/28 20:00:06.902305] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 434812.86 records/second -[10/28 20:00:06.904698] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 433406.26 records/second -[10/28 20:00:06.904905] SUCC: Spent 0.301788 (real 0.284949) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3313584.37 (real 3509399.93) records/second -[10/28 20:00:06.904912] SUCC: insert delay, min: 0.8770ms, avg: 2.2796ms, p90: 3.1340ms, p95: 3.6480ms, p99: 4.8280ms, max: 6.0880ms -[10/28 20:05:34.756207] SUCC: created database (db_sub) -[10/28 20:05:34.784793] INFO: start creating 1000 table(s) with 8 thread(s) -[10/28 20:05:34.927068] SUCC: Spent 0.1430 seconds to create 1000 table(s) with 8 thread(s) speed: 6993 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created -[10/28 20:05:35.213741] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 466952.82 records/second -[10/28 20:05:35.215403] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 463804.68 records/second -[10/28 20:05:35.221132] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 453322.31 records/second -[10/28 20:05:35.221224] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 453671.11 records/second -[10/28 20:05:35.222003] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 452641.07 records/second -[10/28 20:05:35.222536] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 451796.89 records/second -[10/28 20:05:35.223663] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 449643.52 records/second -[10/28 20:05:35.225246] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 447768.68 records/second -[10/28 20:05:35.225659] SUCC: Spent 0.290871 (real 0.274808) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3437950.16 (real 3638904.25) records/second -[10/28 20:05:35.225666] SUCC: insert delay, min: 0.9360ms, avg: 2.1985ms, p90: 2.9290ms, p95: 3.4580ms, p99: 4.6030ms, max: 6.2660ms diff --git a/tests/script/api/last-query-ws.cpp b/tests/script/api/last-query-ws.cpp new file mode 100644 index 0000000000..d81ed5bc5e --- /dev/null +++ b/tests/script/api/last-query-ws.cpp @@ -0,0 +1,212 @@ +// g++ --std=c++17 -o multiQueryLastrow multiQueryLastrow.cpp -ltaos -lpthread -ltaosws + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "taos.h" +#include "taosws.h" + +int numThreads = 5; +int numQuerys = 100; +int queryType = 0; +int numConnections = 1; +bool useWebSocket = 0; + +using namespace std; + +const std::string dbName = "iot"; +const std::string sTableName = "m"; +int maxTableIndex = 50000; + +std::mutex mtx; +std::condition_variable cv; +vector taosArray; +vector wtaosArray; + +std::atomic finishCounter; +std::chrono::system_clock::time_point startTime; +std::chrono::system_clock::time_point stopTime; +unordered_map consumeHash; + +static void query(int numQuerys, int id, int type); + +void threadFunction(int id) { + // std::unique_lock lock(mtx); + // cv.wait(lock); + + // lock.unlock(); + + //auto startQueryTime = std::chrono::system_clock::now(); + + query(numQuerys, id, queryType); + + //consumeHash[id] = std::chrono::system_clock::now() - startQueryTime; + + // int counter = finishCounter.fetch_add(1); + // if (counter == numThreads - 1) { + // stopTime = std::chrono::system_clock::now(); + // } +} + +void createThreads(const int numThreads, std::vector* pThreads) { + for (int i = 0; i < numThreads; ++i) { + pThreads->emplace_back(threadFunction, i); + } + + std::cout << "2. Threads created\n"; +} + +void connect() { + void* res = NULL; + + for (auto i = 0; i < numConnections; i++) { + if (useWebSocket) { + const char* dsn = "taos+ws://localhost:6041"; + WS_TAOS* wtaos = ws_connect(dsn); + int32_t code = 0; + if (wtaos == NULL) { + code = ws_errno(NULL); + const char* errstr = ws_errstr(NULL); + std::cout << "Connection failed[" << code << "]: " << errstr << "\n"; + return; + } + code = ws_select_db(wtaos, dbName.c_str()); + const char* errstr = ws_errstr(wtaos); + if (code) { + std::cout << "Connection failed on select db[" << code << "]: " << errstr << "\n"; + return; + } + wtaosArray.push_back(wtaos); + } else { + TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", dbName.c_str(), 0); + if (!taos) { + std::cerr << "Failed to connect to TDengine\n"; + return; + } + taosArray.push_back(taos); + } + } + + std::cout << "1. Success to connect to TDengine\n"; +} + +void query(int numQuerys, int id, int type) { + int connIdx = id % numConnections; + + for (int i = 0; i < numQuerys; i++) { + std::string sql; + if (type == 0) { + sql = "select last_row(ts) from " + sTableName + std::to_string((i * numThreads + id) % maxTableIndex); + } else { + sql = "select first(ts) from " + sTableName + std::to_string((i * numThreads + id) % maxTableIndex); + } + + if (!useWebSocket) { + TAOS* taos = taosArray[connIdx]; + + TAOS_RES* res = taos_query(taos, sql.c_str()); + if (!res) { + std::cerr << "Failed to query TDengine\n"; + return; + } + + if (taos_errno(res) != 0) { + std::cerr << "Failed to query TDengine since: " << taos_errstr(res) << "\n"; + return; + } + taos_free_result(res); + } else { + WS_TAOS* wtaos = wtaosArray[connIdx]; + + WS_RES* wres = ws_query(wtaos, sql.c_str()); + if (!wres) { + std::cerr << "Failed to query TDengine\n"; + return; + } + + int32_t code = ws_errno(wres); + if (code != 0) { + std::cerr << "Failed to query TDengine since: " << ws_errstr(wres) << "\n"; + return; + } + ws_free_result(wres); + } + } +} + +void printHelp() { + std::cout << "./multiQueryLastrow {numThreads} {numQuerys} {queryType} {numConnections} {useWebSocket}\n"; + exit(-1); +} + +int main(int argc, char* argv[]) { + if (argc != 6) { + printHelp(); + } + + numThreads = atoi(argv[1]); + numQuerys = atoi(argv[2]); + queryType = atoi(argv[3]); + numConnections = atoi(argv[4]); + useWebSocket = atoi(argv[5]); + + std::string queryTypeStr = (queryType == 0) ? "last_row(ts)" : "first(ts)"; + std::cout << "numThreads:" << numThreads << ", queryTimes:" << numQuerys << ", queryType:" << queryTypeStr + << ", numConnections:" << numConnections << ", useWebSocket:" << useWebSocket << "\n"; + + finishCounter.store(0); + + connect(); + + //startTime = std::chrono::system_clock::now(); + + std::vector threads; + createThreads(numThreads, &threads); + + //std::this_thread::sleep_for(std::chrono::seconds(1)); + + std::cout << "3. Start quering\n"; + + startTime = std::chrono::system_clock::now(); + + //cv.notify_all(); + + for (auto& t : threads) { + t.join(); + } + + stopTime = std::chrono::system_clock::now(); + + for (auto& taos : taosArray) { + taos_close(taos); + } + + for (auto& wtaos : wtaosArray) { + ws_close(wtaos); + } + + std::cout << "4. All job done\n"; + + int64_t totalQueryConsumeMs = 0; + for (auto& res : consumeHash) { + totalQueryConsumeMs += res.second.count() /1000000; + } + + std::chrono::nanoseconds elp = stopTime - startTime; + int64_t elpMs = elp.count() / 1000000; + int64_t totalQueryCount = numThreads * numQuerys; + + std::cout << totalQueryCount << " queries finished in " << elpMs << " ms\n"; + std::cout << (float)totalQueryCount * 1000 / elpMs << "q/s\n"; + std::cout << "avg cost:" << totalQueryConsumeMs / totalQueryCount << " ms/q\n"; + + return 0; +} diff --git a/tests/script/api/stmt2-example.c b/tests/script/api/stmt2-example.c index 692ff90a06..6b59ba3118 100644 --- a/tests/script/api/stmt2-example.c +++ b/tests/script/api/stmt2-example.c @@ -33,7 +33,7 @@ void do_stmt(TAOS* taos) { char* tbs[2] = {"tb", "tb2"}; int t1_val[2] = {0, 1}; - int t2_len[2] = {10, 10}; + int t2_len[2] = {5, 5}; int t3_len[2] = {sizeof(int), sizeof(int)}; TAOS_STMT2_BIND tags[2][2] = {{{0, &t1_val[0], &t3_len[0], NULL, 0}, {0, "after1", &t2_len[0], NULL, 0}}, {{0, &t1_val[1], &t3_len[1], NULL, 0}, {0, "after2", &t2_len[1], NULL, 0}}}; @@ -87,7 +87,7 @@ void do_stmt(TAOS* taos) { taos_stmt2_close(stmt); return; } - + taos_stmt2_free_stb_fields(stmt, pFields); taos_stmt2_close(stmt); }