From ac2ce21414d6f2ac7c0e6e6ddb200560e5d5f716 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sun, 6 Nov 2022 14:26:19 +0800 Subject: [PATCH] enh: insert optimize --- source/libs/parser/src/parInsertSql.c | 71 ++++++++++++++++++------- source/libs/parser/test/mockCatalog.cpp | 14 +++-- 2 files changed, 63 insertions(+), 22 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index cb7d8f8e88..ce75e4ff8a 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -43,6 +43,7 @@ typedef struct SInsertParseContext { char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW]; SParsedDataColInfo tags; // for stmt bool missCache; + bool usingDuplicateTable; } SInsertParseContext; typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); @@ -725,7 +726,7 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* p // 1. [(field1_name, ...)] // 2. VALUES ... | FILE ... static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { - if ('\0' == pStmt->usingTableName.tname[0]) { + if (!pStmt->usingTableProcessing || pCxt->usingDuplicateTable) { return TSDB_CODE_SUCCESS; } @@ -854,7 +855,7 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifOpS if (TSDB_CODE_SUCCESS == code) { code = getUsingTableSchema(pCxt, pStmt); } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = storeTableMeta(pCxt, pStmt); } return code; @@ -874,11 +875,11 @@ static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpStmt* return getTargetTableSchema(pCxt, pStmt); } + pStmt->usingTableProcessing = true; // pStmt->pSql -> stb_name [(tag1_name, ...) pStmt->pSql += index; - bool duplicate = false; - int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &duplicate); - if (TSDB_CODE_SUCCESS == code && !duplicate) { + int32_t code = parseDuplicateUsingClause(pCxt, pStmt, &pCxt->usingDuplicateTable); + if (TSDB_CODE_SUCCESS == code && !pCxt->usingDuplicateTable) { return parseUsingTableNameImpl(pCxt, pStmt); } return code; @@ -1331,6 +1332,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, (*pNumOfRows) = 0; char* pLine = NULL; int64_t readLen = 0; + pStmt->fileProcessing = false; while (TSDB_CODE_SUCCESS == code && (readLen = taosGetLineFile(pStmt->fp, &pLine)) != -1) { if (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1])) { pLine[--readLen] = '\0'; @@ -1348,7 +1350,8 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, if (TSDB_CODE_SUCCESS == code) { SToken token; strtolower(pLine, pLine); - code = parseOneRow(pCxt, (const char**)&pLine, pDataBuf, &gotRow, &token); + const char* pRow = pLine; + code = parseOneRow(pCxt, (const char**)&pRow, pDataBuf, &gotRow, &token); } if (TSDB_CODE_SUCCESS == code && gotRow) { @@ -1357,12 +1360,13 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, } if (TSDB_CODE_SUCCESS == code && pDataBuf->nAllocSize > tsMaxMemUsedByInsert * 1024 * 1024) { + pStmt->fileProcessing = true; break; } } if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && - (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) { + (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) { code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL); } return code; @@ -1383,13 +1387,13 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStm pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT); - if (taosEOFFile(pStmt->fp)) { + if (!pStmt->fileProcessing) { taosCloseFile(&pStmt->fp); } else { parserDebug("0x%" PRIx64 " insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId); } } - return TSDB_CODE_SUCCESS; + return code; } static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pFilePath, @@ -1534,6 +1538,10 @@ static void destroyEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt destroyBoundColumnInfo(&pCxt->tags); taosMemoryFreeClear(pStmt->pTableMeta); tdDestroySVCreateTbReq(&pStmt->createTblReq); + pCxt->missCache = false; + pCxt->usingDuplicateTable = false; + pStmt->usingTableProcessing = false; + pStmt->fileProcessing = false; } // tb_name @@ -1546,7 +1554,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt int32_t code = TSDB_CODE_SUCCESS; bool hasData = true; // for each table - while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache) { + while (TSDB_CODE_SUCCESS == code && hasData && !pCxt->missCache && !pStmt->fileProcessing) { destroyEnvPreTable(pCxt, pStmt); // pStmt->pSql -> tb_name ... NEXT_TOKEN(pStmt->pSql, token); @@ -1556,8 +1564,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt } } - parserDebug("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pStmt->totalRowsNum); - if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { code = parseInsertBodyBottom(pCxt, pStmt); } @@ -1670,7 +1676,24 @@ static int32_t getTableSchemaFromMetaData(const SMetaData* pMetaData, SVnodeModi return code; } -static int32_t setVnodeModifOpStmt(SParseContext* pCxt, const SMetaData* pMetaData, SVnodeModifOpStmt* pStmt) { +static void destoryTablesReq(void* p) { + STablesReq* pRes = (STablesReq*)p; + taosArrayDestroy(pRes->pTables); +} + +static void clearCatalogReq(SCatalogReq* pCatalogReq) { + taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); + pCatalogReq->pTableMeta = NULL; + taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); + pCatalogReq->pTableHash = NULL; + taosArrayDestroy(pCatalogReq->pUser); + pCatalogReq->pUser = NULL; +} + +static int32_t setVnodeModifOpStmt(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData, + SVnodeModifOpStmt* pStmt) { + clearCatalogReq(pCatalogReq); + if (pCxt->pStmtCb) { (*pCxt->pStmtCb->getExecInfoFn)(pCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj); if (NULL == pStmt->pVgroupsHashObj) { @@ -1690,11 +1713,17 @@ static int32_t setVnodeModifOpStmt(SParseContext* pCxt, const SMetaData* pMetaDa return getTableSchemaFromMetaData(pMetaData, pStmt, false); } -static int32_t initInsertQuery(SParseContext* pCxt, const SMetaData* pMetaData, SQuery** pQuery) { +static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData, + SQuery** pQuery) { if (NULL == *pQuery) { return createInsertQuery(pCxt, pQuery); } - return setVnodeModifOpStmt(pCxt, pMetaData, (SVnodeModifOpStmt*)(*pQuery)->pRoot); + + if (!((SVnodeModifOpStmt*)(*pQuery)->pRoot)->fileProcessing) { + return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, (SVnodeModifOpStmt*)(*pQuery)->pRoot); + } + + return TSDB_CODE_SUCCESS; } static int32_t setRefreshMate(SQuery* pQuery) { @@ -1735,8 +1764,6 @@ static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifOpStm code = parseDataFromFileImpl(pCxt, pStmt, pDataBuf); } - parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pStmt->totalRowsNum); - if (TSDB_CODE_SUCCESS == code) { if (pStmt->fileProcessing) { code = parseInsertBodyBottom(pCxt, pStmt); @@ -1825,10 +1852,16 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) { if (pCxt->missCache) { + parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId, + ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); + pQuery->execStage = QUERY_EXEC_STAGE_PARSE; return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq); } + parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId, + ((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum); + pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE; return TSDB_CODE_SUCCESS; } @@ -1837,9 +1870,11 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal SInsertParseContext context = { .pComCxt = pCxt, .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen}, + .missCache = false, + .usingDuplicateTable = false, }; - int32_t code = initInsertQuery(pCxt, pMetaData, pQuery); + int32_t code = initInsertQuery(pCxt, pCatalogReq, pMetaData, pQuery); if (TSDB_CODE_SUCCESS == code) { code = parseInsertSqlImpl(&context, (SVnodeModifOpStmt*)(*pQuery)->pRoot); } diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index 96ba8ab273..4f5ddd9a51 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -228,8 +228,7 @@ int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, SRequestConnInfo* pConn return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta); } -int32_t __catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, - STableMeta** pTableMeta) { +int32_t __catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta) { return g_mockCatalogService->catalogGetTableMeta(pTableName, pTableMeta, true); } @@ -238,8 +237,7 @@ int32_t __catalogGetTableHashVgroup(struct SCatalog* pCatalog, SRequestConnInfo* return g_mockCatalogService->catalogGetTableHashVgroup(pTableName, vgInfo); } -int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, - SVgroupInfo* pVgroup, bool* exists) { +int32_t __catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists) { int32_t code = g_mockCatalogService->catalogGetTableHashVgroup(pTableName, pVgroup, true); *exists = 0 != pVgroup->vgId; return code; @@ -269,6 +267,13 @@ int32_t __catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* us return 0; } +int32_t __catalogChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool* pass, + bool* exists) { + *pass = true; + *exists = true; + return 0; +} + int32_t __catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo) { return g_mockCatalogService->catalogGetUdfInfo(funcName, pInfo); } @@ -311,6 +316,7 @@ void initMetaDataEnv() { stub.set(catalogGetDBVgList, __catalogGetDBVgList); stub.set(catalogGetDBCfg, __catalogGetDBCfg); stub.set(catalogChkAuth, __catalogChkAuth); + stub.set(catalogChkAuthFromCache, __catalogChkAuthFromCache); stub.set(catalogGetUdfInfo, __catalogGetUdfInfo); stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta); stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta);