enhance: get tbname from one row for stb syntax

This commit is contained in:
shenglian zhou 2023-10-22 15:14:50 +08:00
parent 6488672dd6
commit 5156c0d00c
1 changed files with 66 additions and 77 deletions

View File

@ -182,9 +182,32 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
return TSDB_CODE_SUCCESS;
}
typedef enum {
BOUND_TAGS,
BOUND_COLUMNS,
BOUND_ALL_AND_TBNAME
} EBoundColumnsType;
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
}
// pStmt->pSql -> field1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, SSchema* pSchema,
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType, STableMeta* pTableMeta,
SBoundColInfo* pBoundInfo) {
SSchema* pSchema = NULL;
if (boundColsType == BOUND_TAGS) {
pSchema = getTableTagSchema(pTableMeta);
} else if (boundColsType == BOUND_COLUMNS) {
pSchema = getTableColumnSchema(pTableMeta);
} else {
pSchema = pTableMeta->schema;
if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
}
int32_t tbnameSchemaIndex = getTbnameSchemaIndex(pTableMeta);
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -207,6 +230,14 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
token.z = tmpTokenBuf;
token.n = strdequote(token.z);
if (boundColsType == BOUND_ALL_AND_TBNAME) {
if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
pUseCols[tbnameSchemaIndex] = true;
++pBoundInfo->numOfBound;
continue;
}
}
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
@ -224,10 +255,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
}
}
if (TSDB_CODE_SUCCESS == code && !isTags && !pUseCols[0]) {
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
}
if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) &&!pUseCols[tbnameSchemaIndex]) {
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
}
taosMemoryFree(pUseCols);
return code;
@ -586,7 +619,7 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStm
}
pStmt->pSql += index;
return parseBoundColumns(pCxt, &pStmt->pSql, true, getTableTagSchema(pStmt->pTableMeta), &pCxt->tags);
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
}
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken,
@ -1220,12 +1253,12 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOp
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
// pStmt->pSql -> field1_name, ...)
return parseBoundColumns(pCxt, &pStmt->pSql, false, getTableColumnSchema(pStmt->pTableMeta),
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta,
&pTableCxt->boundColsInfo);
}
if (NULL != pStmt->pBoundCols) {
return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, getTableColumnSchema(pStmt->pTableMeta),
return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta,
&pTableCxt->boundColsInfo);
}
@ -1521,8 +1554,12 @@ static void clearColValArray(SArray* pCols) {
}
}
typedef struct SInsertStbParseContext {
} SInsertStbParseContext;
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) {
SToken* pToken, bool *pGotTbName, char* tbName) {
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
@ -1559,7 +1596,21 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
}
if (TSDB_CODE_SUCCESS == code) {
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
if (pCols->pColIndex[i] == getTbnameSchemaIndex(pTableCxt->pMeta)) {
SColVal tbnameVal;
code = parseValueToken(pCxt, pSql, pToken, (SSchema*)tGetTbnameColumnSchema(), getTableInfo(pTableCxt->pMeta).precision, &tbnameVal);
if (code == TSDB_CODE_SUCCESS) {
if (pGotTbName != NULL) {
*pGotTbName = true;
}
if (tbName != NULL) {
memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData);
tbName[tbnameVal.value.nData] = '\0';
}
}
} else {
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
}
}
}
@ -1603,8 +1654,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
pStmt->pSql += index;
bool gotRow = false;
bool gotTbname = false;
char tbName[TSDB_TABLE_NAME_LEN] = {0};
if (TSDB_CODE_SUCCESS == code) {
code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken);
code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken, &gotTbname, tbName);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1664,8 +1717,9 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
SToken token;
strtolower(pLine, pLine);
const char* pRow = pLine;
code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token);
bool gotTbname = false;
char tbName[TSDB_TABLE_NAME_LEN] = {0};
code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token, &gotTbname, tbName);
if (code && firstLine) {
firstLine = false;
code = 0;
@ -1755,71 +1809,6 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
static int32_t parseBoundStbColumnsClause(SInsertParseContext* pCxt, const char** pSql,
STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) {
// tbname schema index == (pBoundInfo->numOfCols - 1) == (tiInfo.numOfColumns + tiInfo.numOfTags)
if (pBoundInfo->numOfCols != pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns + 1) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
int32_t tbnameSchemaIndex = pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBoundInfo->numOfBound = 0;
int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
while (TSDB_CODE_SUCCESS == code) {
SToken token;
NEXT_TOKEN(*pSql, token);
if (TK_NK_RP == token.type) {
break;
}
char tmpTokenBuf[TSDB_COL_NAME_LEN + 2] = {0}; // used for deleting Escape character backstick(`)
strncpy(tmpTokenBuf, token.z, token.n);
token.z = tmpTokenBuf;
token.n = strdequote(token.z);
if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
pUseCols[tbnameSchemaIndex] = true;
++pBoundInfo->numOfBound;
continue;
}
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pTableMeta->schema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pTableMeta->schema);
}
if (index < 0) {
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z);
} else if (pUseCols[index]) {
code = buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z);
} else {
lastColIdx = index;
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
}
}
if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
}
if (TSDB_CODE_SUCCESS == code && !pUseCols[tbnameSchemaIndex]) {
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
}
taosMemoryFree(pUseCols);
return code;
}
static int32_t parseDataStbClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SBoundColInfo* pBoundColInfo) {
return TSDB_CODE_SUCCESS;
}
@ -1834,7 +1823,7 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
}
SToken token;
int32_t index = 0;
code = parseBoundStbColumnsClause(pCxt, &pStmt->pBoundCols, pStmt->pTableMeta, &stbBoundColInfo);
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &stbBoundColInfo);
if (TSDB_CODE_SUCCESS == code) {
code = parseDataStbClause(pCxt, pStmt, &stbBoundColInfo);
}