From e2b18a71822e346b1bfdbc397fa60c487d034dfc Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Tue, 4 Mar 2025 10:04:58 +0800 Subject: [PATCH] End(insert):Use cache to improve auto create table performance. --- include/common/tmsg.h | 2 + include/libs/catalog/catalog.h | 1 + source/client/test/stmt2Test.cpp | 4 +- source/common/src/msg/tmsg.c | 6 ++ source/dnode/vnode/src/vnd/vnodeQuery.c | 6 ++ source/libs/catalog/inc/catalogInt.h | 1 + source/libs/catalog/src/ctgAsync.c | 3 + source/libs/catalog/src/ctgRemote.c | 1 + source/libs/parser/src/parInsertSql.c | 112 ++++++++++++++++++++---- 9 files changed, 120 insertions(+), 16 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5d4af4cd08..37bf9c8c34 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -490,6 +490,7 @@ typedef enum ENodeType { typedef struct { int32_t vgId; uint8_t option; // 0x0 REQ_OPT_TBNAME, 0x01 REQ_OPT_TBUID + uint8_t autoCreateCtb; // 0x0 not auto create, 0x01 auto create const char* dbFName; const char* tbName; } SBuildTableInput; @@ -2173,6 +2174,7 @@ typedef struct { char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; uint8_t option; + uint8_t autoCreateCtb; } STableInfoReq; int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq); diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 7c6f02513e..93e0fdfb4c 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -79,6 +79,7 @@ typedef struct SDbInfo { typedef struct STablesReq { char dbFName[TSDB_DB_FNAME_LEN]; SArray* pTables; + uint8_t autoCreate; // 0x0 not auto create, 0x01 auto create } STablesReq; typedef struct SCatalogReq { diff --git a/source/client/test/stmt2Test.cpp b/source/client/test/stmt2Test.cpp index 6bae063124..8675f70944 100644 --- a/source/client/test/stmt2Test.cpp +++ b/source/client/test/stmt2Test.cpp @@ -1927,8 +1927,10 @@ TEST(stmt2Case, async_order) { while (!stop_task) { auto elapsed_time = std::chrono::steady_clock::now() - start_time; if (std::chrono::duration_cast(elapsed_time).count() > 60) { + if (t.joinable()) { + t.detach(); + } FAIL() << "Test[stmt2_async_test] timed out"; - t.detach(); break; } std::this_thread::sleep_for(std::chrono::seconds(1)); // 每 1s 检查一次 diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index d599799d59..b81b094882 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -6292,6 +6292,7 @@ int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dbFName)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->tbName)); TAOS_CHECK_EXIT(tEncodeU8(&encoder, pReq->option)); + TAOS_CHECK_EXIT(tEncodeU8(&encoder, pReq->autoCreateCtb)); tEndEncode(&encoder); _exit: @@ -6331,6 +6332,11 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq } else { pReq->option = 0; } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeU8(&decoder, &pReq->autoCreateCtb)); + } else { + pReq->autoCreateCtb = 0; + } tEndDecode(&decoder); _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 49dfb99499..34894825f2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -90,12 +90,14 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { void *pRsp = NULL; SSchemaWrapper schema = {0}; SSchemaWrapper schemaTag = {0}; + uint8_t autoCreateCtb = 0; // decode req if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) { code = terrno; goto _exit4; } + autoCreateCtb = infoReq.autoCreateCtb; if (infoReq.option == REQ_OPT_TBUID) reqTbUid = true; metaRsp.dbId = pVnode->config.dbId; @@ -223,6 +225,10 @@ _exit4: rpcMsg.code = code; rpcMsg.msgType = pMsg->msgType; + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST && autoCreateCtb == 1) { + code = TSDB_CODE_SUCCESS; + } + if (code) { qError("get table %s meta with %" PRIu8 " failed cause of %s", infoReq.tbName, infoReq.option, tstrerror(code)); } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index dd553ac301..f254a4f52c 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -481,6 +481,7 @@ struct SCtgTask { typedef struct SCtgTaskReq { SCtgTask* pTask; int32_t msgIdx; + uint8_t autoCreateCtb; } SCtgTaskReq; typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 917d9feed6..0e7751a99e 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -3093,6 +3093,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) { SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgJob* pJob = pTask->pJob; SName* pName = NULL; + bool autoCreate = false; int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t fetchIdx = 0; @@ -3103,6 +3104,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) { ctgError("fail to get the %dth STablesReq, num:%d", i, dbNum); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + autoCreate = pReq->autoCreate; ctgDebug("start to check tb metas in db %s, tbNum %ld", pReq->dbFName, taosArrayGetSize(pReq->pTables)); CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables)); @@ -3143,6 +3145,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) { } SCtgTaskReq tReq; + tReq.autoCreateCtb = (autoCreate && i == pCtx->fetchNum - 1) ? 1 : 0; tReq.pTask = pTask; tReq.msgIdx = pFetch->fetchIdx; CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId)); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index ec93b7dee2..dca4e2d2fa 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -1380,6 +1380,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .option = reqType == TDMT_VND_TABLE_NAME ? REQ_OPT_TBUID : REQ_OPT_TBNAME, + .autoCreateCtb = tReq->autoCreateCtb, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)}; char* msg = NULL; diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 5ff6e4f555..0c60a787ce 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -35,6 +35,8 @@ typedef struct SInsertParseContext { } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); +static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt); +static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate); static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; @@ -102,6 +104,7 @@ static int32_t skipTableOptions(SInsertParseContext* pCxt, const char** pSql) { } // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) +#if 0 static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) { int32_t code = TSDB_CODE_SUCCESS; SToken token; @@ -137,6 +140,29 @@ static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) { return code; } +#else +static int32_t ignoreUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { + const char** pSql = &pStmt->pSql; + int32_t code = TSDB_CODE_SUCCESS; + SToken token; + NEXT_TOKEN(*pSql, token); + code = parseBoundTagsClause(pCxt, pStmt); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + // pSql -> TAGS (tag1_value, ...) + code = parseTagsClause(pCxt, pStmt, true); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + if (TSDB_CODE_SUCCESS == code) { + code = skipTableOptions(pCxt, pSql); + } + + return code; +} +#endif static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) { int32_t code = TSDB_CODE_SUCCESS; @@ -150,10 +176,12 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName)); if (NULL != pMeta) { *pDuplicate = true; - code = ignoreUsingClause(pCxt, &pStmt->pSql); - if (TSDB_CODE_SUCCESS == code) { - return cloneTableMeta(*pMeta, &pStmt->pTableMeta); + pCxt->missCache = false; + code = cloneTableMeta(*pMeta, &pStmt->pTableMeta); + if (TSDB_CODE_SUCCESS != code) { + return code; } + return ignoreUsingClause(pCxt, pStmt); } return code; @@ -937,7 +965,7 @@ static int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode* } // pSql -> tag1_value, ...) -static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { +static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) { int32_t code = TSDB_CODE_SUCCESS; SSchema* pSchema = getTableTagSchema(pStmt->pTableMeta); SArray* pTagVals = NULL; @@ -1011,7 +1039,7 @@ _exit: // input pStmt->pSql: TAGS (tag1_value, ...) [table_options] ... // output pStmt->pSql: [table_options] ... -static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { +static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) { SToken token; NEXT_TOKEN(pStmt->pSql, token); if (TK_TAGS != token.type) { @@ -1023,7 +1051,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS return buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z); } - int32_t code = parseTagsClauseImpl(pCxt, pStmt); + int32_t code = parseTagsClauseImpl(pCxt, pStmt, autoCreate); if (TSDB_CODE_SUCCESS == code) { NEXT_VALID_TOKEN(pStmt->pSql, token); if (TK_NK_COMMA == token.type) { @@ -1108,7 +1136,7 @@ static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpS int32_t code = parseBoundTagsClause(pCxt, pStmt); if (TSDB_CODE_SUCCESS == code) { - code = parseTagsClause(pCxt, pStmt); + code = parseTagsClause(pCxt, pStmt, false); } if (TSDB_CODE_SUCCESS == code) { code = parseTableOptions(pCxt, pStmt); @@ -1289,13 +1317,12 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpS } static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; if (pCxt->forceUpdate) { pCxt->missCache = true; return TSDB_CODE_SUCCESS; } - - int32_t code = checkAuth(pCxt->pComCxt, &pStmt->usingTableName, &pCxt->missCache, &pStmt->pTagCond); - if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { + if (!pCxt->missCache) { bool bUsingTable = true; code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable); } @@ -1333,15 +1360,27 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOp static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { SToken token; int32_t index = 0; + int32_t code = TSDB_CODE_SUCCESS; + NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index); - if (TK_USING != token.type) { - return getTargetTableSchema(pCxt, pStmt); + if (pCxt->isStmtBind) { + if (token.type != TK_USING) { + return getTargetTableSchema(pCxt, pStmt); + } + } else { + code = getTargetTableSchema(pCxt, pStmt); + if (token.type != TK_USING) { + return code; + } else if ((!pCxt->missCache) && (TSDB_CODE_SUCCESS == code) && (!pCxt->isStmtBind)) { + pStmt->pSql += index; + return ignoreUsingClause(pCxt, pStmt); + } } pStmt->usingTableProcessing = true; // pStmt->pSql -> stb_name [(tag1_name, ...) pStmt->pSql += index; - int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable); + code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable); if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) { return parseUsingTableNameImpl(pCxt, pStmt); } @@ -2842,7 +2881,7 @@ static int32_t checkAuthFromMetaData(const SArray* pUsers, SNode** pTagCond) { } static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMeta) { - if (1 != taosArrayGetSize(pTables)) { + if (1 != taosArrayGetSize(pTables) && 2 != taosArrayGetSize(pTables)) { return TSDB_CODE_FAILED; } @@ -3119,6 +3158,29 @@ static int32_t parseInsertSqlImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return parseInsertSqlFromTable(pCxt, pStmt); } +static int32_t buildUsingInsertTableReq(SName* pSName, SName* pCName, SArray** pTables) { + if (NULL == *pTables) { + *pTables = taosArrayInit(2, sizeof(SName)); + if (NULL == *pTables) { + goto _err; + } + } + if (NULL == taosArrayPush(*pTables, pSName)) { + goto _err; + } + if (NULL == taosArrayPush(*pTables, pCName)) { + goto _err; + } + return TSDB_CODE_SUCCESS; + +_err: + if (NULL != *pTables) { + taosArrayDestroy(*pTables); + *pTables = NULL; + } + return terrno; +} + static int32_t buildInsertTableReq(SName* pName, SArray** pTables) { *pTables = taosArrayInit(1, sizeof(SName)); if (NULL == *pTables) { @@ -3133,6 +3195,26 @@ static int32_t buildInsertTableReq(SName* pName, SArray** pTables) { return TSDB_CODE_SUCCESS; } +static int32_t buildInsertUsingDbReq(SName* pSName, SName* pCName, SArray** pDbs) { + if (NULL == *pDbs) { + *pDbs = taosArrayInit(1, sizeof(STablesReq)); + if (NULL == *pDbs) { + return terrno; + } + } + + STablesReq req = {0}; + req.autoCreate = 1; + (void)tNameGetFullDbName(pSName, req.dbFName); + (void)tNameGetFullDbName(pCName, req.dbFName); + + int32_t code = buildUsingInsertTableReq(pSName, pCName, &req.pTables); + if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + return code; +} + static int32_t buildInsertDbReq(SName* pName, SArray** pDbs) { if (NULL == *pDbs) { *pDbs = taosArrayInit(1, sizeof(STablesReq)); @@ -3182,7 +3264,7 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpSt if (0 == pStmt->usingTableName.type) { code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta); } else { - code = buildInsertDbReq(&pStmt->usingTableName, &pCatalogReq->pTableMeta); + code = buildInsertUsingDbReq(&pStmt->usingTableName, &pStmt->targetTableName, &pCatalogReq->pTableMeta); } } if (TSDB_CODE_SUCCESS == code) {