From e713b7c22fdd2cf9ae610a95513cc5652ae4df57 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 23 Oct 2023 16:27:00 +0800 Subject: [PATCH] enhance: insert stb skeleton development --- include/libs/nodes/querynodes.h | 4 +- source/libs/parser/src/parInsertSql.c | 219 ++++++++++++++++++-------- 2 files changed, 158 insertions(+), 65 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index bfd23d1965..6d8f16986d 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -413,6 +413,7 @@ typedef struct SVgDataBlocks { typedef void (*FFreeTableBlockHash)(SHashObj*); typedef void (*FFreeVgourpBlockArray)(SArray*); +struct SStbRowsDataContext; typedef struct SVnodeModifyOpStmt { ENodeType nodeType; ENodeType sqlNodeType; @@ -441,8 +442,7 @@ typedef struct SVnodeModifyOpStmt { bool fileProcessing; bool stbSyntax; - SName superTableName; - SName childTableName; + struct SStbRowsDataContext* pStbRowsCxt; } SVnodeModifyOpStmt; typedef struct SExplainOptions { diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index cf7726e81b..aa8704a62b 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -622,7 +622,7 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStm return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags); } -static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken, +static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SSchema* pTagSchema, SToken* pToken, SArray* pTagName, SArray* pTagVals, STag** pTag) { if (!isNullValue(pTagSchema->type, pToken)) { taosArrayPush(pTagName, pTagSchema->name); @@ -642,7 +642,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm STagVal val = {0}; int32_t code = - parseTagToken(&pStmt->pSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg); + parseTagToken(ppSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { taosArrayPush(pTagVals, &val); } @@ -845,7 +845,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON; code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { - code = parseTagValue(pCxt, pStmt, pTagSchema, &token, pTagName, pTagVals, &pTag); + code = parseTagValue(pCxt, pStmt, &pStmt->pSql, pTagSchema, &token, pTagName, pTagVals, &pTag); } } @@ -900,7 +900,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS return code; } -static int32_t storeTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { +static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { pStmt->pTableMeta->suid = pStmt->pTableMeta->uid; pStmt->pTableMeta->uid = pStmt->totalTbNum; pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE; @@ -1038,7 +1038,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMet return code; } -static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) { +static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) { int32_t code = TSDB_CODE_SUCCESS; SVgroupInfo vg; bool exists = true; @@ -1074,7 +1074,6 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi if (NULL != pStmt->pTableMeta) { if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) { pStmt->stbSyntax = true; - tNameAssign(&pStmt->superTableName, &pStmt->targetTableName); } else { code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } @@ -1088,10 +1087,9 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) { pStmt->stbSyntax = true; - tNameAssign(&pStmt->superTableName, &pStmt->targetTableName); } if (!pStmt->stbSyntax) { - code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); } } } @@ -1155,7 +1153,7 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); + code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); } if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj); @@ -1174,7 +1172,7 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOp code = getUsingTableSchema(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = storeTableMeta(pCxt, pStmt); + code = storeChildTableMeta(pCxt, pStmt); } return code; } @@ -1554,12 +1552,101 @@ static void clearColValArray(SArray* pCols) { } } -typedef struct SInsertStbParseContext { +typedef struct SStbRowsDataContext { + SVnodeModifyOpStmt* pStmt; + STableMeta* pStbMeta; + SName stbName; + SBoundColInfo boundColsInfo; + int32_t numOfBoundCols; + int32_t numOfBoundTags; -} SInsertStbParseContext; + SArray* aChildTableNames; + SArray* aTableDataCxts; + SArray* aCreateTbReqs; -static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, - SToken* pToken, bool *pGotTbName, char* tbName) { + SArray* aTagVals; + SArray* aColVals; + SArray* aTagNames; +} SStbRowsDataContext; + +typedef union SRowsDataContext{ + STableDataCxt* pTableDataCxt; + SStbRowsDataContext* pStbRowsCxt; +} SRowsDataContext; + +static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, + SToken* pToken) { + SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo; + SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta); + + bool isJsonTag = false; + SArray* pTagName = taosArrayInit(8, TSDB_COL_NAME_LEN); + SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal)); + STag* pTag = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) { + const char* pOrigSql = *ppSql; + bool ignoreComma = false; + NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma); + if (ignoreComma) { + code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql); + break; + } + + if (TK_NK_RP == pToken->type) { + code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); + break; + } + + if (pCols->pColIndex[i] < getNumOfColumns(pStbRowsCxt->pStbMeta)) { + SSchema* pSchema = &pSchemas[pCols->pColIndex[i]]; + SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]); + code = parseValueToken(pCxt, ppSql, pToken, pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal); + } else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { + SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]]; + isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON; + code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); + if (TSDB_CODE_SUCCESS == code) { + code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pTag); + } + } + else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { + SColVal tbnameVal; + code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(), + getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal); + if (code == TSDB_CODE_SUCCESS) { + char tbName[TSDB_TABLE_NAME_LEN]; + memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData); + tbName[tbnameVal.value.nData] = '\0'; + } + } + + if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) { + NEXT_VALID_TOKEN(*ppSql, *pToken); + if (TK_NK_COMMA != pToken->type) { + code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z); + } + } + } + + // child meta , vgroupid, check privilege + // refer to parInsertSml.c + // create or reuse table data context from tbName + // init submit data + // build create table request, refer to parseTagsClauseImpl + // build row and check table data order, refer to parseOneRow + + if (TSDB_CODE_SUCCESS == code) { + *pGotRow = true; + } + + return code; + + return TSDB_CODE_SUCCESS; +} + +static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) { SBoundColInfo* pCols = &pTableCxt->boundColsInfo; bool isParseBindParam = false; SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); @@ -1596,21 +1683,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } if (TSDB_CODE_SUCCESS == code) { - if (pCols->pColIndex[i] == getTbnameSchemaIndex(pTableCxt->pMeta)) { - SColVal tbnameVal; - code = parseValueToken(pCxt, pSql, pToken, (SSchema*)tGetTbnameColumnSchema(), getTableInfo(pTableCxt->pMeta).precision, &tbnameVal); - if (code == TSDB_CODE_SUCCESS) { - if (pGotTbName != NULL) { - *pGotTbName = true; - } - if (tbName != NULL) { - memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData); - tbName[tbnameVal.value.nData] = '\0'; - } - } - } else { - code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal); - } + code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal); } } @@ -1640,7 +1713,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } // pSql -> (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt, int32_t* pNumOfRows, SToken* pToken) { int32_t code = TSDB_CODE_SUCCESS; @@ -1654,10 +1727,12 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, pStmt->pSql += index; bool gotRow = false; - bool gotTbname = false; - char tbName[TSDB_TABLE_NAME_LEN] = {0}; if (TSDB_CODE_SUCCESS == code) { - code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken, &gotTbname, tbName); + if (!pStmt->stbSyntax) { + code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken); + } else { + code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken); + } } if (TSDB_CODE_SUCCESS == code) { @@ -1682,10 +1757,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, } // VALUES (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext, SToken* pToken) { int32_t numOfRows = 0; - int32_t code = parseValues(pCxt, pStmt, pTableCxt, &numOfRows, pToken); + int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken); if (TSDB_CODE_SUCCESS == code) { pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; @@ -1694,7 +1769,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return code; } -static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt, int32_t* pNumOfRows) { int32_t code = TSDB_CODE_SUCCESS; (*pNumOfRows) = 0; @@ -1717,9 +1792,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt SToken token; strtolower(pLine, pLine); const char* pRow = pLine; - bool gotTbname = false; - char tbName[TSDB_TABLE_NAME_LEN] = {0}; - code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token, &gotTbname, tbName); + if (!pStmt->stbSyntax) { + code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token); + } else { + code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token); + } if (code && firstLine) { firstLine = false; code = 0; @@ -1749,9 +1826,9 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt return code; } -static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) { +static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) { int32_t numOfRows = 0; - int32_t code = parseCsvFile(pCxt, pStmt, pTableCxt, &numOfRows); + int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows); if (TSDB_CODE_SUCCESS == code) { pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; @@ -1766,7 +1843,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt } static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath, - STableDataCxt* pTableCxt) { + SRowsDataContext rowsDataCxt) { char filePathStr[TSDB_FILENAME_LEN] = {0}; if (TK_NK_STRING == pFilePath->type) { trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr)); @@ -1778,10 +1855,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return TAOS_SYSTEM_ERROR(errno); } - return parseDataFromFileImpl(pCxt, pStmt, pTableCxt); + return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt); } -static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt, SToken* pToken) { if (tsUseAdapter) { return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading"); @@ -1791,28 +1868,24 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z); } - return parseDataFromFile(pCxt, pStmt, pToken, pTableCxt); + return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt); } // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path -static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) { +static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) { SToken token; NEXT_TOKEN(pStmt->pSql, token); switch (token.type) { case TK_VALUES: - return parseValuesClause(pCxt, pStmt, pTableCxt, &token); + return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token); case TK_FILE: - return parseFileClause(pCxt, pStmt, pTableCxt, &token); + return parseFileClause(pCxt, pStmt, rowsDataCxt, &token); default: break; } return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } -static int32_t parseDataStbClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SBoundColInfo* pBoundColInfo) { - return TSDB_CODE_SUCCESS; -} - static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { int32_t code = TSDB_CODE_SUCCESS; STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta); @@ -1824,9 +1897,16 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif SToken token; int32_t index = 0; code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &stbBoundColInfo); + pStmt->pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext)); + + // fill SStbRowsDataCxt; + + SRowsDataContext rowsDataCxt; + rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt; if (TSDB_CODE_SUCCESS == code) { - code = parseDataStbClause(pCxt, pStmt, &stbBoundColInfo); + code = parseDataClause(pCxt, pStmt, rowsDataCxt); } + return code; } @@ -1837,8 +1917,10 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod if (!pStmt->stbSyntax) { STableDataCxt* pTableCxt = NULL; int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt); + SRowsDataContext rowsDataCxt; + rowsDataCxt.pTableDataCxt = pTableCxt; if (TSDB_CODE_SUCCESS == code) { - code = parseDataClause(pCxt, pStmt, pTableCxt); + code = parseDataClause(pCxt, pStmt, rowsDataCxt); } return code; } else { @@ -1848,6 +1930,7 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod } static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { + //TODO: reset env for stb syntax insDestroyBoundColInfo(&pCxt->tags); taosMemoryFreeClear(pStmt->pTableMeta); nodesDestroyNode(pStmt->pTagCond); @@ -2060,7 +2143,7 @@ static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMet return pRes->code; } -static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) { +static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) { if (1 != taosArrayGetSize(pTables)) { return TSDB_CODE_FAILED; } @@ -2116,10 +2199,10 @@ static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const S code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); } if (TSDB_CODE_SUCCESS == code && isStb) { - code = storeTableMeta(pCxt, pStmt); + code = storeChildTableMeta(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code) { - code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb); + code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb); } if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) { code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt); @@ -2158,10 +2241,12 @@ static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCata if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) { pStmt->stbSyntax = true; } - if (pStmt->usingTableProcessing || pStmt->stbSyntax) { - return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); + if (!pStmt->stbSyntax) { + if (pStmt->usingTableProcessing) { + return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); + } + return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false); } - return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false); } return code; } @@ -2250,10 +2335,18 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOp } static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { - STableDataCxt* pTableCxt = NULL; - int32_t code = getTableDataCxt(pCxt, pStmt, &pTableCxt); + int32_t code = TSDB_CODE_SUCCESS; + SRowsDataContext rowsDataCxt; + + if (!pStmt->stbSyntax) { + STableDataCxt* pTableCxt = NULL; + code = getTableDataCxt(pCxt, pStmt, &pTableCxt); + rowsDataCxt.pTableDataCxt = pTableCxt; + } else { + rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt; + } if (TSDB_CODE_SUCCESS == code) { - code = parseDataFromFileImpl(pCxt, pStmt, pTableCxt); + code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt); } if (TSDB_CODE_SUCCESS == code) {