Merge branch 'szhou/continue-coing' into szhou/insert-stb-tbname

This commit is contained in:
slzhou 2023-10-25 13:30:49 +08:00
commit efe480ae14
7 changed files with 447 additions and 80 deletions

View File

@ -413,7 +413,8 @@ typedef struct SVgDataBlocks {
typedef void (*FFreeTableBlockHash)(SHashObj*);
typedef void (*FFreeVgourpBlockArray)(SArray*);
struct SStbRowsDataContext;
typedef void (*FFreeStbRowsDataContext)(struct SStbRowsDataContext*);
typedef struct SVnodeModifyOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
@ -428,11 +429,11 @@ typedef struct SVnodeModifyOpStmt {
struct STableMeta* pTableMeta;
SNode* pTagCond;
SArray* pTableTag;
SHashObj* pVgroupsHashObj;
SHashObj* pVgroupsHashObj; // SHashObj<vgId, SVgInfo>
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
SHashObj* pSubTableHashObj;
SHashObj* pTableNameHashObj;
SHashObj* pDbFNameHashObj;
SHashObj* pSubTableHashObj; // SHashObj<table_name, STableMeta*>
SHashObj* pTableNameHashObj; // set of table names for refreshing meta, sync mode
SHashObj* pDbFNameHashObj; // set of db names for refreshing meta, sync mode
SArray* pVgDataBlocks; // SArray<SVgroupDataCxt*>
SVCreateTbReq* pCreateTblReq;
TdFilePtr fp;
@ -442,8 +443,8 @@ typedef struct SVnodeModifyOpStmt {
bool fileProcessing;
bool stbSyntax;
SName superTableName;
SName childTableName;
struct SStbRowsDataContext* pStbRowsCxt;
FFreeStbRowsDataContext freeStbRowsCxtFunc;
} SVnodeModifyOpStmt;
typedef struct SExplainOptions {

View File

@ -879,6 +879,10 @@ void nodesDestroyNode(SNode* pNode) {
}
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
taosMemoryFreeClear(pStmt->pCreateTblReq);
if (pStmt->freeStbRowsCxtFunc) {
pStmt->freeStbRowsCxtFunc(pStmt->pStbRowsCxt);
}
taosMemoryFreeClear(pStmt->pStbRowsCxt);
taosCloseFile(&pStmt->fp);
break;
}

View File

@ -45,6 +45,7 @@ int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
SArray *tagName, uint8_t tagNum, int32_t ttl);
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues);
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode);

View File

@ -182,9 +182,32 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
return TSDB_CODE_SUCCESS;
}
typedef enum {
BOUND_TAGS,
BOUND_COLUMNS,
BOUND_ALL_AND_TBNAME
} EBoundColumnsType;
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
}
// pStmt->pSql -> field1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, SSchema* pSchema,
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType, STableMeta* pTableMeta,
SBoundColInfo* pBoundInfo) {
SSchema* pSchema = NULL;
if (boundColsType == BOUND_TAGS) {
pSchema = getTableTagSchema(pTableMeta);
} else if (boundColsType == BOUND_COLUMNS) {
pSchema = getTableColumnSchema(pTableMeta);
} else {
pSchema = pTableMeta->schema;
if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) {
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
}
int32_t tbnameSchemaIndex = getTbnameSchemaIndex(pTableMeta);
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -207,6 +230,14 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
token.z = tmpTokenBuf;
token.n = strdequote(token.z);
if (boundColsType == BOUND_ALL_AND_TBNAME) {
if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
pUseCols[tbnameSchemaIndex] = true;
++pBoundInfo->numOfBound;
continue;
}
}
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
@ -224,10 +255,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
}
}
if (TSDB_CODE_SUCCESS == code && !isTags && !pUseCols[0]) {
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
}
if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) &&!pUseCols[tbnameSchemaIndex]) {
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
}
taosMemoryFree(pUseCols);
return code;
@ -586,10 +619,10 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStm
}
pStmt->pSql += index;
return parseBoundColumns(pCxt, &pStmt->pSql, true, getTableTagSchema(pStmt->pTableMeta), &pCxt->tags);
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags);
}
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken,
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SSchema* pTagSchema, SToken* pToken,
SArray* pTagName, SArray* pTagVals, STag** pTag) {
if (!isNullValue(pTagSchema->type, pToken)) {
taosArrayPush(pTagName, pTagSchema->name);
@ -609,7 +642,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm
STagVal val = {0};
int32_t code =
parseTagToken(&pStmt->pSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg);
parseTagToken(ppSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(pTagVals, &val);
}
@ -812,7 +845,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code) {
code = parseTagValue(pCxt, pStmt, pTagSchema, &token, pTagName, pTagVals, &pTag);
code = parseTagValue(pCxt, pStmt, &pStmt->pSql, pTagSchema, &token, pTagName, pTagVals, &pTag);
}
}
@ -867,7 +900,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
return code;
}
static int32_t storeTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
pStmt->pTableMeta->suid = pStmt->pTableMeta->uid;
pStmt->pTableMeta->uid = pStmt->totalTbNum;
pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE;
@ -1005,7 +1038,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMet
return code;
}
static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) {
int32_t code = TSDB_CODE_SUCCESS;
SVgroupInfo vg;
bool exists = true;
@ -1041,7 +1074,6 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi
if (NULL != pStmt->pTableMeta) {
if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) {
pStmt->stbSyntax = true;
tNameAssign(&pStmt->superTableName, &pStmt->targetTableName);
} else {
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
@ -1055,10 +1087,9 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
pStmt->stbSyntax = true;
tNameAssign(&pStmt->superTableName, &pStmt->targetTableName);
}
if (!pStmt->stbSyntax) {
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
}
}
}
@ -1122,7 +1153,7 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable);
}
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
}
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj);
@ -1141,7 +1172,7 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOp
code = getUsingTableSchema(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = storeTableMeta(pCxt, pStmt);
code = storeChildTableMeta(pCxt, pStmt);
}
return code;
}
@ -1220,12 +1251,12 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOp
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
// pStmt->pSql -> field1_name, ...)
return parseBoundColumns(pCxt, &pStmt->pSql, false, getTableColumnSchema(pStmt->pTableMeta),
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta,
&pTableCxt->boundColsInfo);
}
if (NULL != pStmt->pBoundCols) {
return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, getTableColumnSchema(pStmt->pTableMeta),
return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta,
&pTableCxt->boundColsInfo);
}
@ -1521,8 +1552,208 @@ static void clearColValArray(SArray* pCols) {
}
}
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) {
typedef struct SStbRowsDataContext {
SName stbName;
STableMeta* pStbMeta;
SNode* pTagCond;
SBoundColInfo boundColsInfo;
// the following fields are for each stb row
SArray* aTagVals;
SArray* aColVals;
SArray* aTagNames;
SName ctbName;
STag* pTag;
STableMeta* pCtbMeta;
SVCreateTbReq* pCreateCtbReq;
} SStbRowsDataContext;
typedef union SRowsDataContext{
STableDataCxt* pTableDataCxt;
SStbRowsDataContext* pStbRowsCxt;
} SRowsDataContext;
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
bool isJsonTag = false;
SArray* pTagName = pStbRowsCxt->aTagNames;
SArray* pTagVals = pStbRowsCxt->aTagVals;
bool bFoundTbName = false;
const char* pOrigSql = *ppSql;
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) {
const char* pTmpSql = *ppSql;
bool ignoreComma = false;
NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
if (ignoreComma) {
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
break;
}
if (TK_NK_RP == pToken->type) {
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
break;
}
if (pCols->pColIndex[i] < getNumOfColumns(pStbRowsCxt->pStbMeta)) {
SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
code = parseValueToken(pCxt, ppSql, pToken, pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal);
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag);
}
}
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
SColVal tbnameVal = COL_VAL_NONE(-1, TSDB_DATA_TYPE_BINARY);
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(),
getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal);
if (code == TSDB_CODE_SUCCESS && COL_VAL_IS_VALUE(&tbnameVal)) {
tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname));
char ctbName[TSDB_TABLE_NAME_LEN];
memcpy(ctbName, tbnameVal.value.pData, tbnameVal.value.nData);
ctbName[tbnameVal.value.nData] = '\0';
tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, tbnameVal.value.nData);
bFoundTbName = true;
taosMemoryFreeClear(tbnameVal.value.pData);
}
}
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
NEXT_VALID_TOKEN(*ppSql, *pToken);
if (TK_NK_COMMA != pToken->type) {
code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
}
}
}
if (!bFoundTbName) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
}
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag);
}
if (code == TSDB_CODE_SUCCESS) {
*pGotRow = true;
}
return code;
}
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) {
int32_t code = TSDB_CODE_SUCCESS;
if (pStbRowsCxt->pTagCond) {
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
}
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pStbRowsCxt->pCreateCtbReq == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag, pStbRowsCxt->pStbMeta->uid,
pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames, getNumOfTags(pStbRowsCxt->pStbMeta),
TSDB_DEFAULT_TABLE_TTL);
pStbRowsCxt->pTag = NULL;
}
if (code == TSDB_CODE_SUCCESS) {
collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
char ctbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
if (NULL != pCtbMeta) {
pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
} else {
SVgroupInfo vg;
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
.requestId = pCxt->pComCxt->requestId,
.requestObjRefId = pCxt->pComCxt->requestRid,
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
STableMeta* pBackup = NULL;
cloneTableMeta(pStmt->pTableMeta, &pBackup);
taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
}
}
return code;
}
static void resetStbRowsDataContextPreStbRow(SStbRowsDataContext* pStbRowsCxt) {
pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
}
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
if (pStbRowsCxt == NULL) return;
taosArrayClear(pStbRowsCxt->aTagNames);
for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFreeClear(p->pData);
}
}
taosArrayClear(pStbRowsCxt->aTagVals);
clearColValArray(pStbRowsCxt->aColVals);
taosArrayClear(pStbRowsCxt->aColVals);
tTagFree(pStbRowsCxt->pTag);
pStbRowsCxt->pTag = NULL;
pStbRowsCxt->pCtbMeta->uid = 0;
pStbRowsCxt->pCtbMeta->vgId = 0;
tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
}
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
resetStbRowsDataContextPreStbRow(pStbRowsCxt);
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken);
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
return code;
}
if (code == TSDB_CODE_SUCCESS) {
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
}
STableDataCxt* pTableDataCxt = NULL;
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false);
initTableColSubmitData(pTableDataCxt);
if (code == TSDB_CODE_SUCCESS) {
SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1);
code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) {
insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
}
}
if (code == TSDB_CODE_SUCCESS) {
*pGotRow = true;
}
clearStbRowsDataContext(pStbRowsCxt);
return TSDB_CODE_SUCCESS;
}
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) {
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
@ -1589,7 +1820,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
}
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
int32_t* pNumOfRows, SToken* pToken) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1604,7 +1835,11 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
bool gotRow = false;
if (TSDB_CODE_SUCCESS == code) {
code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken);
if (!pStmt->stbSyntax) {
code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
} else {
code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken);
}
}
if (TSDB_CODE_SUCCESS == code) {
@ -1629,10 +1864,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
}
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext,
SToken* pToken) {
int32_t numOfRows = 0;
int32_t code = parseValues(pCxt, pStmt, pTableCxt, &numOfRows, pToken);
int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken);
if (TSDB_CODE_SUCCESS == code) {
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
@ -1641,7 +1876,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
return code;
}
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
int32_t* pNumOfRows) {
int32_t code = TSDB_CODE_SUCCESS;
(*pNumOfRows) = 0;
@ -1664,8 +1899,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
SToken token;
strtolower(pLine, pLine);
const char* pRow = pLine;
code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token);
if (!pStmt->stbSyntax) {
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
} else {
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token);
}
if (code && firstLine) {
firstLine = false;
code = 0;
@ -1695,9 +1933,9 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
return code;
}
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
int32_t numOfRows = 0;
int32_t code = parseCsvFile(pCxt, pStmt, pTableCxt, &numOfRows);
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
if (TSDB_CODE_SUCCESS == code) {
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
@ -1712,7 +1950,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
}
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
STableDataCxt* pTableCxt) {
SRowsDataContext rowsDataCxt) {
char filePathStr[TSDB_FILENAME_LEN] = {0};
if (TK_NK_STRING == pFilePath->type) {
trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
@ -1724,10 +1962,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
return TAOS_SYSTEM_ERROR(errno);
}
return parseDataFromFileImpl(pCxt, pStmt, pTableCxt);
return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
}
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt,
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt,
SToken* pToken) {
if (tsUseAdapter) {
return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading");
@ -1737,34 +1975,120 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
}
return parseDataFromFile(pCxt, pStmt, pToken, pTableCxt);
return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt);
}
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
SToken token;
NEXT_TOKEN(pStmt->pSql, token);
switch (token.type) {
case TK_VALUES:
return parseValuesClause(pCxt, pStmt, pTableCxt, &token);
return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
case TK_FILE:
return parseFileClause(pCxt, pStmt, pTableCxt, &token);
return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
default:
break;
}
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
if (pStbRowsCxt == NULL) return;
clearStbRowsDataContext(pStbRowsCxt);
taosArrayDestroy(pStbRowsCxt->aColVals);
pStbRowsCxt->aColVals = NULL;
taosArrayDestroy(pStbRowsCxt->aTagVals);
pStbRowsCxt->aTagVals = NULL;
taosArrayDestroy(pStbRowsCxt->aTagNames);
pStbRowsCxt->aTagNames = NULL;
insDestroyBoundColInfo(&pStbRowsCxt->boundColsInfo);
tTagFree(pStbRowsCxt->pTag);
pStbRowsCxt->pTag = NULL;
taosMemoryFreeClear(pStbRowsCxt->pCtbMeta);
tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq);
taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
}
static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext** ppStbRowsCxt) {
SStbRowsDataContext* pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
if (!pStbRowsCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName);
collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
pStbRowsCxt->pTagCond = pStmt->pTagCond;
pStbRowsCxt->pStbMeta = pStmt->pTableMeta;
cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta);
pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal));
// col values and bound cols info of STableDataContext is not used
pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal));
insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo);
*ppStbRowsCxt = pStbRowsCxt;
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
if (!pStmt->pBoundCols) {
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion", pStmt->pSql);
}
SStbRowsDataContext* pStbRowsCxt = NULL;
code = constructStbRowsDataContext(pStmt, &pStbRowsCxt);
if (code == TSDB_CODE_SUCCESS) {
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
&pStbRowsCxt->boundColsInfo);
pStmt->pStbRowsCxt = pStbRowsCxt;
}
if (code == TSDB_CODE_SUCCESS) {
SRowsDataContext rowsDataCxt;
rowsDataCxt.pStbRowsCxt = pStbRowsCxt;
code = parseDataClause(pCxt, pStmt, rowsDataCxt);
}
if (code != TSDB_CODE_SUCCESS) {
destroyStbRowsDataContext(pStbRowsCxt);
taosMemoryFreeClear(pStbRowsCxt);
}
return code;
}
// input pStmt->pSql:
// 1. [(tag1_name, ...)] ...
// 2. VALUES ... | FILE ...
static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
STableDataCxt* pTableCxt = NULL;
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
if (TSDB_CODE_SUCCESS == code) {
code = parseDataClause(pCxt, pStmt, pTableCxt);
if (pStmt->stbSyntax && TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
return buildSyntaxErrMsg(&pCxt->msg, "insert into super table syntax is not supported for stmt", NULL);
}
if (!pStmt->stbSyntax) {
STableDataCxt* pTableCxt = NULL;
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
SRowsDataContext rowsDataCxt;
rowsDataCxt.pTableDataCxt = pTableCxt;
if (TSDB_CODE_SUCCESS == code) {
code = parseDataClause(pCxt, pStmt, rowsDataCxt);
}
return code;
} else {
int32_t code = parseInsertStbClauseBottom(pCxt, pStmt);
return code;
}
return code;
}
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
@ -1780,38 +2104,18 @@ static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm
pStmt->usingTableProcessing = false;
pStmt->fileProcessing = false;
pStmt->usingTableName.type = 0;
destroyStbRowsDataContext(pStmt->pStbRowsCxt);
taosMemoryFreeClear(pStmt->pStbRowsCxt);
pStmt->stbSyntax = false;
}
static int32_t parseStbBoundColumnsClause(SInsertParseContext* pCxt, const char* pBoundCols,
STableMeta* pTableMeta, SBoundColInfo* pBoundColsInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
SBoundColInfo stbBoundColInfo;
insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &stbBoundColInfo);
if (!pStmt->pBoundCols) {
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname...) bounded cols is expected", pStmt->pSql);
}
SToken token;
int32_t index = 0;
parseStbBoundColumnsClause(pCxt, pStmt->pBoundCols, pStmt->pTableMeta, &stbBoundColInfo);
return code;
}
// input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ...
static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) {
resetEnvPreTable(pCxt, pStmt);
int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
if (!pStmt->stbSyntax) {
code = parseInsertTableClauseBottom(pCxt, pStmt);
} else {
code = parseInsertStbClauseBottom(pCxt, pStmt);
}
code = parseInsertTableClauseBottom(pCxt, pStmt);
}
return code;
@ -1929,9 +2233,11 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
}
pStmt->pSql = pCxt->pComCxt->pSql;
pStmt->freeHashFunc = insDestroyTableDataCxtHashMap;
pStmt->freeArrayFunc = insDestroyVgroupDataCxtList;
pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext;
if (!reentry) {
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pStmt->pTableBlockHashObj =
@ -2003,7 +2309,7 @@ static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMet
return pRes->code;
}
static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) {
if (1 != taosArrayGetSize(pTables)) {
return TSDB_CODE_FAILED;
}
@ -2059,10 +2365,10 @@ static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const S
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
}
if (TSDB_CODE_SUCCESS == code && isStb) {
code = storeTableMeta(pCxt, pStmt);
code = storeChildTableMeta(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
}
if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
@ -2101,10 +2407,12 @@ static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCata
if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) {
pStmt->stbSyntax = true;
}
if (pStmt->usingTableProcessing || pStmt->stbSyntax) {
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
if (!pStmt->stbSyntax) {
if (pStmt->usingTableProcessing) {
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
}
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
}
return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
}
return code;
}
@ -2193,10 +2501,18 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOp
}
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
STableDataCxt* pTableCxt = NULL;
int32_t code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
int32_t code = TSDB_CODE_SUCCESS;
SRowsDataContext rowsDataCxt;
if (!pStmt->stbSyntax) {
STableDataCxt* pTableCxt = NULL;
code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
rowsDataCxt.pTableDataCxt = pTableCxt;
} else {
rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
}
if (TSDB_CODE_SUCCESS == code) {
code = parseDataFromFileImpl(pCxt, pStmt, pTableCxt);
code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt);
}
if (TSDB_CODE_SUCCESS == code) {

View File

@ -170,6 +170,10 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
}
}
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
initColValues(pTableMeta, aColValues);
}
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
pInfo->numOfCols = numOfBound;
pInfo->numOfBound = numOfBound;

View File

@ -938,6 +938,7 @@ e
,,y,script,./test.sh -f tsim/insert/delete0.sim
,,y,script,./test.sh -f tsim/insert/update1_sort_merge.sim
,,y,script,./test.sh -f tsim/insert/update2.sim
,,y,script,./test.sh -f tsim/insert/insert_stb.sim
,,y,script,./test.sh -f tsim/parser/alter__for_community_version.sim
,,y,script,./test.sh -f tsim/parser/alter_column.sim
,,y,script,./test.sh -f tsim/parser/alter_stable.sim

View File

@ -0,0 +1,40 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database d1
sql create database d2
sql use d1;
sql create table st(ts timestamp, f int) tags(t int);
sql insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 1);
sql insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 2);
sql insert into ct1 values('2021-04-19 00:00:02', 2);
sql use d2;
sql create table st(ts timestamp, f int) tags(t int);
sql insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 1);
sql insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 2);
sql create database db1 vgroups 1;
sql create table db1.stb (ts timestamp, c1 int, c2 int) tags(t1 int, t2 int);
sql use d1;
sql insert into st (tbname, ts, f, t) values('ct3', '2021-04-19 08:00:03', 3, 3);
sql insert into d1.st (tbname, ts, f) values('ct6', '2021-04-19 08:00:04', 6);
sql insert into d1.st (tbname, ts, f) values('ct6', '2021-04-19 08:00:05', 7)('ct8', '2021-04-19 08:00:06', 8);
sql insert into d1.st (tbname, ts, f, t) values('ct6', '2021-04-19 08:00:07', 9, 9)('ct8', '2021-04-19 08:00:08', 10, 10);
sql insert into d1.st (tbname, ts, f, t) values('ct6', '2021-04-19 08:00:09', 9, 9)('ct8', '2021-04-19 08:00:10', 10, 10) d2.st (tbname, ts, f, t) values('ct6', '2021-04-19 08:00:11', 9, 9)('ct8', '2021-04-19 08:00:12', 10, 10);
sql select * from d1.st
print $rows
if $rows != 11 then
return -1
endi
sql select * from d2.st;
print $rows
if $rows != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT