enhance: insert stb skeleton development
This commit is contained in:
parent
5156c0d00c
commit
e713b7c22f
|
@ -413,6 +413,7 @@ typedef struct SVgDataBlocks {
|
||||||
typedef void (*FFreeTableBlockHash)(SHashObj*);
|
typedef void (*FFreeTableBlockHash)(SHashObj*);
|
||||||
typedef void (*FFreeVgourpBlockArray)(SArray*);
|
typedef void (*FFreeVgourpBlockArray)(SArray*);
|
||||||
|
|
||||||
|
struct SStbRowsDataContext;
|
||||||
typedef struct SVnodeModifyOpStmt {
|
typedef struct SVnodeModifyOpStmt {
|
||||||
ENodeType nodeType;
|
ENodeType nodeType;
|
||||||
ENodeType sqlNodeType;
|
ENodeType sqlNodeType;
|
||||||
|
@ -441,8 +442,7 @@ typedef struct SVnodeModifyOpStmt {
|
||||||
bool fileProcessing;
|
bool fileProcessing;
|
||||||
|
|
||||||
bool stbSyntax;
|
bool stbSyntax;
|
||||||
SName superTableName;
|
struct SStbRowsDataContext* pStbRowsCxt;
|
||||||
SName childTableName;
|
|
||||||
} SVnodeModifyOpStmt;
|
} SVnodeModifyOpStmt;
|
||||||
|
|
||||||
typedef struct SExplainOptions {
|
typedef struct SExplainOptions {
|
||||||
|
|
|
@ -622,7 +622,7 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStm
|
||||||
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
|
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken,
|
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SSchema* pTagSchema, SToken* pToken,
|
||||||
SArray* pTagName, SArray* pTagVals, STag** pTag) {
|
SArray* pTagName, SArray* pTagVals, STag** pTag) {
|
||||||
if (!isNullValue(pTagSchema->type, pToken)) {
|
if (!isNullValue(pTagSchema->type, pToken)) {
|
||||||
taosArrayPush(pTagName, pTagSchema->name);
|
taosArrayPush(pTagName, pTagSchema->name);
|
||||||
|
@ -642,7 +642,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm
|
||||||
|
|
||||||
STagVal val = {0};
|
STagVal val = {0};
|
||||||
int32_t code =
|
int32_t code =
|
||||||
parseTagToken(&pStmt->pSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg);
|
parseTagToken(ppSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
taosArrayPush(pTagVals, &val);
|
taosArrayPush(pTagVals, &val);
|
||||||
}
|
}
|
||||||
|
@ -845,7 +845,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
||||||
isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
|
isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
|
||||||
code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg);
|
code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = parseTagValue(pCxt, pStmt, pTagSchema, &token, pTagName, pTagVals, &pTag);
|
code = parseTagValue(pCxt, pStmt, &pStmt->pSql, pTagSchema, &token, pTagName, pTagVals, &pTag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -900,7 +900,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t storeTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||||
pStmt->pTableMeta->suid = pStmt->pTableMeta->uid;
|
pStmt->pTableMeta->suid = pStmt->pTableMeta->uid;
|
||||||
pStmt->pTableMeta->uid = pStmt->totalTbNum;
|
pStmt->pTableMeta->uid = pStmt->totalTbNum;
|
||||||
pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;
|
pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;
|
||||||
|
@ -1038,7 +1038,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMet
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
|
static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
bool exists = true;
|
bool exists = true;
|
||||||
|
@ -1074,7 +1074,6 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi
|
||||||
if (NULL != pStmt->pTableMeta) {
|
if (NULL != pStmt->pTableMeta) {
|
||||||
if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
pStmt->stbSyntax = true;
|
pStmt->stbSyntax = true;
|
||||||
tNameAssign(&pStmt->superTableName, &pStmt->targetTableName);
|
|
||||||
} else {
|
} else {
|
||||||
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
||||||
}
|
}
|
||||||
|
@ -1088,10 +1087,9 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
|
if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
|
||||||
pStmt->stbSyntax = true;
|
pStmt->stbSyntax = true;
|
||||||
tNameAssign(&pStmt->superTableName, &pStmt->targetTableName);
|
|
||||||
}
|
}
|
||||||
if (!pStmt->stbSyntax) {
|
if (!pStmt->stbSyntax) {
|
||||||
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1155,7 +1153,7 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
||||||
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable);
|
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
|
code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
|
||||||
code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
|
code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
|
||||||
|
@ -1174,7 +1172,7 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOp
|
||||||
code = getUsingTableSchema(pCxt, pStmt);
|
code = getUsingTableSchema(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
code = storeTableMeta(pCxt, pStmt);
|
code = storeChildTableMeta(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1554,12 +1552,101 @@ static void clearColValArray(SArray* pCols) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SInsertStbParseContext {
|
typedef struct SStbRowsDataContext {
|
||||||
|
SVnodeModifyOpStmt* pStmt;
|
||||||
|
STableMeta* pStbMeta;
|
||||||
|
SName stbName;
|
||||||
|
SBoundColInfo boundColsInfo;
|
||||||
|
int32_t numOfBoundCols;
|
||||||
|
int32_t numOfBoundTags;
|
||||||
|
|
||||||
} SInsertStbParseContext;
|
SArray* aChildTableNames;
|
||||||
|
SArray* aTableDataCxts;
|
||||||
|
SArray* aCreateTbReqs;
|
||||||
|
|
||||||
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
|
SArray* aTagVals;
|
||||||
SToken* pToken, bool *pGotTbName, char* tbName) {
|
SArray* aColVals;
|
||||||
|
SArray* aTagNames;
|
||||||
|
} SStbRowsDataContext;
|
||||||
|
|
||||||
|
typedef union SRowsDataContext{
|
||||||
|
STableDataCxt* pTableDataCxt;
|
||||||
|
SStbRowsDataContext* pStbRowsCxt;
|
||||||
|
} SRowsDataContext;
|
||||||
|
|
||||||
|
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
|
||||||
|
SToken* pToken) {
|
||||||
|
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
|
||||||
|
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
|
||||||
|
|
||||||
|
bool isJsonTag = false;
|
||||||
|
SArray* pTagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||||
|
SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal));
|
||||||
|
STag* pTag = NULL;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
|
||||||
|
const char* pOrigSql = *ppSql;
|
||||||
|
bool ignoreComma = false;
|
||||||
|
NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
|
||||||
|
if (ignoreComma) {
|
||||||
|
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TK_NK_RP == pToken->type) {
|
||||||
|
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCols->pColIndex[i] < getNumOfColumns(pStbRowsCxt->pStbMeta)) {
|
||||||
|
SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
|
||||||
|
SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
|
||||||
|
code = parseValueToken(pCxt, ppSql, pToken, pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal);
|
||||||
|
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||||
|
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
|
||||||
|
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
|
||||||
|
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
|
||||||
|
SColVal tbnameVal;
|
||||||
|
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(),
|
||||||
|
getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
|
memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData);
|
||||||
|
tbName[tbnameVal.value.nData] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
|
||||||
|
NEXT_VALID_TOKEN(*ppSql, *pToken);
|
||||||
|
if (TK_NK_COMMA != pToken->type) {
|
||||||
|
code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// child meta , vgroupid, check privilege
|
||||||
|
// refer to parInsertSml.c
|
||||||
|
// create or reuse table data context from tbName
|
||||||
|
// init submit data
|
||||||
|
// build create table request, refer to parseTagsClauseImpl
|
||||||
|
// build row and check table data order, refer to parseOneRow
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pGotRow = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) {
|
||||||
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
|
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
|
||||||
bool isParseBindParam = false;
|
bool isParseBindParam = false;
|
||||||
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
|
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
|
||||||
|
@ -1596,23 +1683,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (pCols->pColIndex[i] == getTbnameSchemaIndex(pTableCxt->pMeta)) {
|
|
||||||
SColVal tbnameVal;
|
|
||||||
code = parseValueToken(pCxt, pSql, pToken, (SSchema*)tGetTbnameColumnSchema(), getTableInfo(pTableCxt->pMeta).precision, &tbnameVal);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
if (pGotTbName != NULL) {
|
|
||||||
*pGotTbName = true;
|
|
||||||
}
|
|
||||||
if (tbName != NULL) {
|
|
||||||
memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData);
|
|
||||||
tbName[tbnameVal.value.nData] = '\0';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
|
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
|
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
|
||||||
NEXT_VALID_TOKEN(*pSql, *pToken);
|
NEXT_VALID_TOKEN(*pSql, *pToken);
|
||||||
|
@ -1640,7 +1713,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
|
||||||
}
|
}
|
||||||
|
|
||||||
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
|
||||||
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
|
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
|
||||||
int32_t* pNumOfRows, SToken* pToken) {
|
int32_t* pNumOfRows, SToken* pToken) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -1654,10 +1727,12 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
||||||
pStmt->pSql += index;
|
pStmt->pSql += index;
|
||||||
|
|
||||||
bool gotRow = false;
|
bool gotRow = false;
|
||||||
bool gotTbname = false;
|
|
||||||
char tbName[TSDB_TABLE_NAME_LEN] = {0};
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken, &gotTbname, tbName);
|
if (!pStmt->stbSyntax) {
|
||||||
|
code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
|
||||||
|
} else {
|
||||||
|
code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -1682,10 +1757,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
||||||
}
|
}
|
||||||
|
|
||||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
|
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
|
||||||
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
|
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext,
|
||||||
SToken* pToken) {
|
SToken* pToken) {
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
int32_t code = parseValues(pCxt, pStmt, pTableCxt, &numOfRows, pToken);
|
int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pStmt->totalRowsNum += numOfRows;
|
pStmt->totalRowsNum += numOfRows;
|
||||||
pStmt->totalTbNum += 1;
|
pStmt->totalTbNum += 1;
|
||||||
|
@ -1694,7 +1769,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
|
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
|
||||||
int32_t* pNumOfRows) {
|
int32_t* pNumOfRows) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
(*pNumOfRows) = 0;
|
(*pNumOfRows) = 0;
|
||||||
|
@ -1717,9 +1792,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
||||||
SToken token;
|
SToken token;
|
||||||
strtolower(pLine, pLine);
|
strtolower(pLine, pLine);
|
||||||
const char* pRow = pLine;
|
const char* pRow = pLine;
|
||||||
bool gotTbname = false;
|
if (!pStmt->stbSyntax) {
|
||||||
char tbName[TSDB_TABLE_NAME_LEN] = {0};
|
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
|
||||||
code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token, &gotTbname, tbName);
|
} else {
|
||||||
|
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token);
|
||||||
|
}
|
||||||
if (code && firstLine) {
|
if (code && firstLine) {
|
||||||
firstLine = false;
|
firstLine = false;
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -1749,9 +1826,9 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
|
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
int32_t code = parseCsvFile(pCxt, pStmt, pTableCxt, &numOfRows);
|
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pStmt->totalRowsNum += numOfRows;
|
pStmt->totalRowsNum += numOfRows;
|
||||||
pStmt->totalTbNum += 1;
|
pStmt->totalTbNum += 1;
|
||||||
|
@ -1766,7 +1843,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
|
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
|
||||||
STableDataCxt* pTableCxt) {
|
SRowsDataContext rowsDataCxt) {
|
||||||
char filePathStr[TSDB_FILENAME_LEN] = {0};
|
char filePathStr[TSDB_FILENAME_LEN] = {0};
|
||||||
if (TK_NK_STRING == pFilePath->type) {
|
if (TK_NK_STRING == pFilePath->type) {
|
||||||
trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
|
trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
|
||||||
|
@ -1778,10 +1855,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
return parseDataFromFileImpl(pCxt, pStmt, pTableCxt);
|
return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
|
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
|
||||||
SToken* pToken) {
|
SToken* pToken) {
|
||||||
if (tsUseAdapter) {
|
if (tsUseAdapter) {
|
||||||
return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
|
return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
|
||||||
|
@ -1791,28 +1868,24 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
||||||
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
|
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
|
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
|
||||||
}
|
}
|
||||||
return parseDataFromFile(pCxt, pStmt, pToken, pTableCxt);
|
return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt);
|
||||||
}
|
}
|
||||||
|
|
||||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||||
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
|
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
|
||||||
SToken token;
|
SToken token;
|
||||||
NEXT_TOKEN(pStmt->pSql, token);
|
NEXT_TOKEN(pStmt->pSql, token);
|
||||||
switch (token.type) {
|
switch (token.type) {
|
||||||
case TK_VALUES:
|
case TK_VALUES:
|
||||||
return parseValuesClause(pCxt, pStmt, pTableCxt, &token);
|
return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
|
||||||
case TK_FILE:
|
case TK_FILE:
|
||||||
return parseFileClause(pCxt, pStmt, pTableCxt, &token);
|
return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
|
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseDataStbClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SBoundColInfo* pBoundColInfo) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
|
STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
|
||||||
|
@ -1824,9 +1897,16 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
|
||||||
SToken token;
|
SToken token;
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &stbBoundColInfo);
|
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &stbBoundColInfo);
|
||||||
|
pStmt->pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
|
||||||
|
|
||||||
|
// fill SStbRowsDataCxt;
|
||||||
|
|
||||||
|
SRowsDataContext rowsDataCxt;
|
||||||
|
rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = parseDataStbClause(pCxt, pStmt, &stbBoundColInfo);
|
code = parseDataClause(pCxt, pStmt, rowsDataCxt);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1837,8 +1917,10 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
|
||||||
if (!pStmt->stbSyntax) {
|
if (!pStmt->stbSyntax) {
|
||||||
STableDataCxt* pTableCxt = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
|
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
|
||||||
|
SRowsDataContext rowsDataCxt;
|
||||||
|
rowsDataCxt.pTableDataCxt = pTableCxt;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = parseDataClause(pCxt, pStmt, pTableCxt);
|
code = parseDataClause(pCxt, pStmt, rowsDataCxt);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1848,6 +1930,7 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
|
||||||
}
|
}
|
||||||
|
|
||||||
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||||
|
//TODO: reset env for stb syntax
|
||||||
insDestroyBoundColInfo(&pCxt->tags);
|
insDestroyBoundColInfo(&pCxt->tags);
|
||||||
taosMemoryFreeClear(pStmt->pTableMeta);
|
taosMemoryFreeClear(pStmt->pTableMeta);
|
||||||
nodesDestroyNode(pStmt->pTagCond);
|
nodesDestroyNode(pStmt->pTagCond);
|
||||||
|
@ -2060,7 +2143,7 @@ static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMet
|
||||||
return pRes->code;
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
|
static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
|
||||||
if (1 != taosArrayGetSize(pTables)) {
|
if (1 != taosArrayGetSize(pTables)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -2116,10 +2199,10 @@ static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const S
|
||||||
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
|
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && isStb) {
|
if (TSDB_CODE_SUCCESS == code && isStb) {
|
||||||
code = storeTableMeta(pCxt, pStmt);
|
code = storeChildTableMeta(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
|
code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
|
if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
|
||||||
code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
|
code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
|
||||||
|
@ -2158,11 +2241,13 @@ static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCata
|
||||||
if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) {
|
if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) {
|
||||||
pStmt->stbSyntax = true;
|
pStmt->stbSyntax = true;
|
||||||
}
|
}
|
||||||
if (pStmt->usingTableProcessing || pStmt->stbSyntax) {
|
if (!pStmt->stbSyntax) {
|
||||||
|
if (pStmt->usingTableProcessing) {
|
||||||
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
|
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
|
||||||
}
|
}
|
||||||
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
|
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2250,10 +2335,18 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOp
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SRowsDataContext rowsDataCxt;
|
||||||
|
|
||||||
|
if (!pStmt->stbSyntax) {
|
||||||
STableDataCxt* pTableCxt = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
int32_t code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
|
code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
|
||||||
|
rowsDataCxt.pTableDataCxt = pTableCxt;
|
||||||
|
} else {
|
||||||
|
rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = parseDataFromFileImpl(pCxt, pStmt, pTableCxt);
|
code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
Loading…
Reference in New Issue