diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index a7a6c8cd6c..379f599786 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -413,7 +413,8 @@ typedef struct SVgDataBlocks { typedef void (*FFreeTableBlockHash)(SHashObj*); typedef void (*FFreeVgourpBlockArray)(SArray*); - +struct SStbRowsDataContext; +typedef void (*FFreeStbRowsDataContext)(struct SStbRowsDataContext*); typedef struct SVnodeModifyOpStmt { ENodeType nodeType; ENodeType sqlNodeType; @@ -428,11 +429,11 @@ typedef struct SVnodeModifyOpStmt { struct STableMeta* pTableMeta; SNode* pTagCond; SArray* pTableTag; - SHashObj* pVgroupsHashObj; + SHashObj* pVgroupsHashObj; // SHashObj SHashObj* pTableBlockHashObj; // SHashObj - SHashObj* pSubTableHashObj; - SHashObj* pTableNameHashObj; - SHashObj* pDbFNameHashObj; + SHashObj* pSubTableHashObj; // SHashObj + SHashObj* pTableNameHashObj; // set of table names for refreshing meta, sync mode + SHashObj* pDbFNameHashObj; // set of db names for refreshing meta, sync mode SArray* pVgDataBlocks; // SArray SVCreateTbReq* pCreateTblReq; TdFilePtr fp; @@ -442,8 +443,8 @@ typedef struct SVnodeModifyOpStmt { bool fileProcessing; bool stbSyntax; - SName superTableName; - SName childTableName; + struct SStbRowsDataContext* pStbRowsCxt; + FFreeStbRowsDataContext freeStbRowsCxtFunc; } SVnodeModifyOpStmt; typedef struct SExplainOptions { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index c5a1bfa599..926c907ae4 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -879,6 +879,10 @@ void nodesDestroyNode(SNode* pNode) { } tdDestroySVCreateTbReq(pStmt->pCreateTblReq); taosMemoryFreeClear(pStmt->pCreateTblReq); + if (pStmt->freeStbRowsCxtFunc) { + pStmt->freeStbRowsCxtFunc(pStmt->pStbRowsCxt); + } + taosMemoryFreeClear(pStmt->pStbRowsCxt); taosCloseFile(&pStmt->fp); break; } diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 303d349b34..0a153225a3 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -45,6 +45,7 @@ int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, SArray *tagName, uint8_t tagNum, int32_t ttl); int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); +void insInitColValues(STableMeta* pTableMeta, SArray* aColValues); void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey); int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 1351a48a52..6e1c599df4 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -182,9 +182,32 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify return TSDB_CODE_SUCCESS; } +typedef enum { + BOUND_TAGS, + BOUND_COLUMNS, + BOUND_ALL_AND_TBNAME +} EBoundColumnsType; + +static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) { + return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; +} + // pStmt->pSql -> field1_name, ...) -static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, SSchema* pSchema, +static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType, STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) { + SSchema* pSchema = NULL; + if (boundColsType == BOUND_TAGS) { + pSchema = getTableTagSchema(pTableMeta); + } else if (boundColsType == BOUND_COLUMNS) { + pSchema = getTableColumnSchema(pTableMeta); + } else { + pSchema = pTableMeta->schema; + if (pBoundInfo->numOfCols != getTbnameSchemaIndex(pTableMeta) + 1) { + return TSDB_CODE_PAR_INTERNAL_ERROR; + } + } + int32_t tbnameSchemaIndex = getTbnameSchemaIndex(pTableMeta); + bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); if (NULL == pUseCols) { return TSDB_CODE_OUT_OF_MEMORY; @@ -207,6 +230,14 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b token.z = tmpTokenBuf; token.n = strdequote(token.z); + if (boundColsType == BOUND_ALL_AND_TBNAME) { + if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) { + pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex; + pUseCols[tbnameSchemaIndex] = true; + ++pBoundInfo->numOfBound; + continue; + } + } int16_t t = lastColIdx + 1; int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); if (index < 0 && t > 0) { @@ -224,10 +255,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b } } - if (TSDB_CODE_SUCCESS == code && !isTags && !pUseCols[0]) { + if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) { code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null"); } - + if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) &&!pUseCols[tbnameSchemaIndex]) { + code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null"); + } taosMemoryFree(pUseCols); return code; @@ -586,10 +619,10 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStm } pStmt->pSql += index; - return parseBoundColumns(pCxt, &pStmt->pSql, true, getTableTagSchema(pStmt->pTableMeta), &pCxt->tags); + return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_TAGS, pStmt->pTableMeta, &pCxt->tags); } -static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken, +static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SSchema* pTagSchema, SToken* pToken, SArray* pTagName, SArray* pTagVals, STag** pTag) { if (!isNullValue(pTagSchema->type, pToken)) { taosArrayPush(pTagName, pTagSchema->name); @@ -609,7 +642,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm STagVal val = {0}; int32_t code = - parseTagToken(&pStmt->pSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg); + parseTagToken(ppSql, pToken, pTagSchema, pStmt->pTableMeta->tableInfo.precision, &val, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { taosArrayPush(pTagVals, &val); } @@ -812,7 +845,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON; code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg); if (TSDB_CODE_SUCCESS == code) { - code = parseTagValue(pCxt, pStmt, pTagSchema, &token, pTagName, pTagVals, &pTag); + code = parseTagValue(pCxt, pStmt, &pStmt->pSql, pTagSchema, &token, pTagName, pTagVals, &pTag); } } @@ -867,7 +900,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS return code; } -static int32_t storeTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { +static int32_t storeChildTableMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { pStmt->pTableMeta->suid = pStmt->pTableMeta->uid; pStmt->pTableMeta->uid = pStmt->totalTbNum; pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE; @@ -1005,7 +1038,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMet return code; } -static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) { +static int32_t getTargetTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool isStb, bool* pMissCache) { int32_t code = TSDB_CODE_SUCCESS; SVgroupInfo vg; bool exists = true; @@ -1041,7 +1074,6 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi if (NULL != pStmt->pTableMeta) { if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE) { pStmt->stbSyntax = true; - tNameAssign(&pStmt->superTableName, &pStmt->targetTableName); } else { code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } @@ -1055,10 +1087,9 @@ static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModi if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) { pStmt->stbSyntax = true; - tNameAssign(&pStmt->superTableName, &pStmt->targetTableName); } if (!pStmt->stbSyntax) { - code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); + code = getTargetTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache); } } } @@ -1122,7 +1153,7 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = getTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); + code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache); } if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { code = collectUseDatabase(&pStmt->usingTableName, pStmt->pDbFNameHashObj); @@ -1141,7 +1172,7 @@ static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOp code = getUsingTableSchema(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - code = storeTableMeta(pCxt, pStmt); + code = storeChildTableMeta(pCxt, pStmt); } return code; } @@ -1220,12 +1251,12 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOp return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } // pStmt->pSql -> field1_name, ...) - return parseBoundColumns(pCxt, &pStmt->pSql, false, getTableColumnSchema(pStmt->pTableMeta), + return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo); } if (NULL != pStmt->pBoundCols) { - return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, getTableColumnSchema(pStmt->pTableMeta), + return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo); } @@ -1521,8 +1552,208 @@ static void clearColValArray(SArray* pCols) { } } -static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, - SToken* pToken) { +typedef struct SStbRowsDataContext { + SName stbName; + + STableMeta* pStbMeta; + SNode* pTagCond; + SBoundColInfo boundColsInfo; + + // the following fields are for each stb row + SArray* aTagVals; + SArray* aColVals; + SArray* aTagNames; + SName ctbName; + STag* pTag; + STableMeta* pCtbMeta; + SVCreateTbReq* pCreateCtbReq; +} SStbRowsDataContext; + +typedef union SRowsDataContext{ + STableDataCxt* pTableDataCxt; + SStbRowsDataContext* pStbRowsCxt; +} SRowsDataContext; + +static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, + SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { + SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo; + SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta); + + bool isJsonTag = false; + SArray* pTagName = pStbRowsCxt->aTagNames; + SArray* pTagVals = pStbRowsCxt->aTagVals; + + bool bFoundTbName = false; + const char* pOrigSql = *ppSql; + + int32_t code = TSDB_CODE_SUCCESS; + for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) { + const char* pTmpSql = *ppSql; + bool ignoreComma = false; + NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma); + if (ignoreComma) { + code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql); + break; + } + + if (TK_NK_RP == pToken->type) { + code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); + break; + } + + if (pCols->pColIndex[i] < getNumOfColumns(pStbRowsCxt->pStbMeta)) { + SSchema* pSchema = &pSchemas[pCols->pColIndex[i]]; + SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]); + code = parseValueToken(pCxt, ppSql, pToken, pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal); + } else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { + SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]]; + isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON; + code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); + if (code == TSDB_CODE_SUCCESS) { + code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag); + } + } + else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { + SColVal tbnameVal = COL_VAL_NONE(-1, TSDB_DATA_TYPE_BINARY); + code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(), + getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal); + if (code == TSDB_CODE_SUCCESS && COL_VAL_IS_VALUE(&tbnameVal)) { + tNameSetDbName(&pStbRowsCxt->ctbName, pStbRowsCxt->stbName.acctId, pStbRowsCxt->stbName.dbname, strlen(pStbRowsCxt->stbName.dbname)); + char ctbName[TSDB_TABLE_NAME_LEN]; + memcpy(ctbName, tbnameVal.value.pData, tbnameVal.value.nData); + ctbName[tbnameVal.value.nData] = '\0'; + tNameAddTbName(&pStbRowsCxt->ctbName, ctbName, tbnameVal.value.nData); + bFoundTbName = true; + taosMemoryFreeClear(tbnameVal.value.pData); + } + } + + if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) { + NEXT_VALID_TOKEN(*ppSql, *pToken); + if (TK_NK_COMMA != pToken->type) { + code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z); + } + } + } + if (!bFoundTbName) { + code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql); + } + if (code == TSDB_CODE_SUCCESS && !isJsonTag) { + code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag); + } + if (code == TSDB_CODE_SUCCESS) { + *pGotRow = true; + } + return code; +} + +static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) { + int32_t code = TSDB_CODE_SUCCESS; + if (pStbRowsCxt->pTagCond) { + code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond); + } + + pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (pStbRowsCxt->pCreateCtbReq == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (code == TSDB_CODE_SUCCESS) { + insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag, pStbRowsCxt->pStbMeta->uid, + pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames, getNumOfTags(pStbRowsCxt->pStbMeta), + TSDB_DEFAULT_TABLE_TTL); + pStbRowsCxt->pTag = NULL; + } + + if (code == TSDB_CODE_SUCCESS) { + collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj); + + char ctbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName); + STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName)); + if (NULL != pCtbMeta) { + pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid; + pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId; + } else { + SVgroupInfo vg; + SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter, + .requestId = pCxt->pComCxt->requestId, + .requestObjRefId = pCxt->pComCxt->requestRid, + .mgmtEps = pCxt->pComCxt->mgmtEpSet}; + code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg); + taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg)); + pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1; + pStbRowsCxt->pCtbMeta->vgId = vg.vgId; + STableMeta* pBackup = NULL; + cloneTableMeta(pStmt->pTableMeta, &pBackup); + taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES); + } + } + return code; +} + +static void resetStbRowsDataContextPreStbRow(SStbRowsDataContext* pStbRowsCxt) { + pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE; + pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid; + + insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals); +} + +static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) { + if (pStbRowsCxt == NULL) return; + + taosArrayClear(pStbRowsCxt->aTagNames); + for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) { + STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i); + if (IS_VAR_DATA_TYPE(p->type)) { + taosMemoryFreeClear(p->pData); + } + } + taosArrayClear(pStbRowsCxt->aTagVals); + + clearColValArray(pStbRowsCxt->aColVals); + taosArrayClear(pStbRowsCxt->aColVals); + + tTagFree(pStbRowsCxt->pTag); + pStbRowsCxt->pTag = NULL; + pStbRowsCxt->pCtbMeta->uid = 0; + pStbRowsCxt->pCtbMeta->vgId = 0; + tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq); + taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq); +} + +static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, + SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { + resetStbRowsDataContextPreStbRow(pStbRowsCxt); + int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken); + if (code != TSDB_CODE_SUCCESS || !*pGotRow) { + return code; + } + + if (code == TSDB_CODE_SUCCESS) { + code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); + } + + STableDataCxt* pTableDataCxt = NULL; + code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), + pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false); + initTableColSubmitData(pTableDataCxt); + if (code == TSDB_CODE_SUCCESS) { + SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1); + code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow); + if (TSDB_CODE_SUCCESS == code) { + insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow)); + } + } + if (code == TSDB_CODE_SUCCESS) { + *pGotRow = true; + } + + clearStbRowsDataContext(pStbRowsCxt); + + return TSDB_CODE_SUCCESS; +} + +static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) { SBoundColInfo* pCols = &pTableCxt->boundColsInfo; bool isParseBindParam = false; SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); @@ -1589,7 +1820,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC } // pSql -> (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt, int32_t* pNumOfRows, SToken* pToken) { int32_t code = TSDB_CODE_SUCCESS; @@ -1604,7 +1835,11 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool gotRow = false; if (TSDB_CODE_SUCCESS == code) { - code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken); + if (!pStmt->stbSyntax) { + code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken); + } else { + code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken); + } } if (TSDB_CODE_SUCCESS == code) { @@ -1629,10 +1864,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, } // VALUES (field1_value, ...) [(field1_value2, ...) ...] -static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataContext, SToken* pToken) { int32_t numOfRows = 0; - int32_t code = parseValues(pCxt, pStmt, pTableCxt, &numOfRows, pToken); + int32_t code = parseValues(pCxt, pStmt, rowsDataContext, &numOfRows, pToken); if (TSDB_CODE_SUCCESS == code) { pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; @@ -1641,7 +1876,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return code; } -static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt, int32_t* pNumOfRows) { int32_t code = TSDB_CODE_SUCCESS; (*pNumOfRows) = 0; @@ -1664,8 +1899,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt SToken token; strtolower(pLine, pLine); const char* pRow = pLine; - - code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token); + if (!pStmt->stbSyntax) { + code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token); + } else { + code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token); + } if (code && firstLine) { firstLine = false; code = 0; @@ -1695,9 +1933,9 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt return code; } -static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) { +static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) { int32_t numOfRows = 0; - int32_t code = parseCsvFile(pCxt, pStmt, pTableCxt, &numOfRows); + int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows); if (TSDB_CODE_SUCCESS == code) { pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; @@ -1712,7 +1950,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt } static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath, - STableDataCxt* pTableCxt) { + SRowsDataContext rowsDataCxt) { char filePathStr[TSDB_FILENAME_LEN] = {0}; if (TK_NK_STRING == pFilePath->type) { trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr)); @@ -1724,10 +1962,10 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* return TAOS_SYSTEM_ERROR(errno); } - return parseDataFromFileImpl(pCxt, pStmt, pTableCxt); + return parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt); } -static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt, +static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt, SToken* pToken) { if (tsUseAdapter) { return buildInvalidOperationMsg(&pCxt->msg, "proxy mode does not support csv loading"); @@ -1737,34 +1975,120 @@ static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z); } - return parseDataFromFile(pCxt, pStmt, pToken, pTableCxt); + return parseDataFromFile(pCxt, pStmt, pToken, rowsDataCxt); } // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path -static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) { +static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) { SToken token; NEXT_TOKEN(pStmt->pSql, token); switch (token.type) { case TK_VALUES: - return parseValuesClause(pCxt, pStmt, pTableCxt, &token); + return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token); case TK_FILE: - return parseFileClause(pCxt, pStmt, pTableCxt, &token); + return parseFileClause(pCxt, pStmt, rowsDataCxt, &token); default: break; } return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } +static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) { + if (pStbRowsCxt == NULL) return; + clearStbRowsDataContext(pStbRowsCxt); + taosArrayDestroy(pStbRowsCxt->aColVals); + pStbRowsCxt->aColVals = NULL; + taosArrayDestroy(pStbRowsCxt->aTagVals); + pStbRowsCxt->aTagVals = NULL; + taosArrayDestroy(pStbRowsCxt->aTagNames); + pStbRowsCxt->aTagNames = NULL; + insDestroyBoundColInfo(&pStbRowsCxt->boundColsInfo); + tTagFree(pStbRowsCxt->pTag); + pStbRowsCxt->pTag = NULL; + taosMemoryFreeClear(pStbRowsCxt->pCtbMeta); + tdDestroySVCreateTbReq(pStbRowsCxt->pCreateCtbReq); + taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq); +} + +static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext** ppStbRowsCxt) { + SStbRowsDataContext* pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext)); + if (!pStbRowsCxt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName); + collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj); + collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj); + + pStbRowsCxt->pTagCond = pStmt->pTagCond; + pStbRowsCxt->pStbMeta = pStmt->pTableMeta; + + cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta); + pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE; + pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid; + + pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN); + pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal)); + + // col values and bound cols info of STableDataContext is not used + pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal)); + insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals); + + STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta); + insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo); + + *ppStbRowsCxt = pStbRowsCxt; + return TSDB_CODE_SUCCESS; +} + +static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + if (!pStmt->pBoundCols) { + return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion", pStmt->pSql); + } + + SStbRowsDataContext* pStbRowsCxt = NULL; + code = constructStbRowsDataContext(pStmt, &pStbRowsCxt); + + if (code == TSDB_CODE_SUCCESS) { + code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, + &pStbRowsCxt->boundColsInfo); + pStmt->pStbRowsCxt = pStbRowsCxt; + } + + if (code == TSDB_CODE_SUCCESS) { + SRowsDataContext rowsDataCxt; + rowsDataCxt.pStbRowsCxt = pStbRowsCxt; + code = parseDataClause(pCxt, pStmt, rowsDataCxt); + } + + if (code != TSDB_CODE_SUCCESS) { + destroyStbRowsDataContext(pStbRowsCxt); + taosMemoryFreeClear(pStbRowsCxt); + } + + return code; +} + // input pStmt->pSql: // 1. [(tag1_name, ...)] ... // 2. VALUES ... | FILE ... static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { - STableDataCxt* pTableCxt = NULL; - int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt); - if (TSDB_CODE_SUCCESS == code) { - code = parseDataClause(pCxt, pStmt, pTableCxt); + if (pStmt->stbSyntax && TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { + return buildSyntaxErrMsg(&pCxt->msg, "insert into super table syntax is not supported for stmt", NULL); + } + if (!pStmt->stbSyntax) { + STableDataCxt* pTableCxt = NULL; + int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt); + SRowsDataContext rowsDataCxt; + rowsDataCxt.pTableDataCxt = pTableCxt; + if (TSDB_CODE_SUCCESS == code) { + code = parseDataClause(pCxt, pStmt, rowsDataCxt); + } + return code; + } else { + int32_t code = parseInsertStbClauseBottom(pCxt, pStmt); + return code; } - return code; } static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { @@ -1780,38 +2104,18 @@ static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm pStmt->usingTableProcessing = false; pStmt->fileProcessing = false; pStmt->usingTableName.type = 0; + + destroyStbRowsDataContext(pStmt->pStbRowsCxt); + taosMemoryFreeClear(pStmt->pStbRowsCxt); pStmt->stbSyntax = false; } -static int32_t parseStbBoundColumnsClause(SInsertParseContext* pCxt, const char* pBoundCols, - STableMeta* pTableMeta, SBoundColInfo* pBoundColsInfo) { - return TSDB_CODE_SUCCESS; -} - -static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { - int32_t code = TSDB_CODE_SUCCESS; - STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta); - SBoundColInfo stbBoundColInfo; - insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &stbBoundColInfo); - if (!pStmt->pBoundCols) { - return buildSyntaxErrMsg(&pCxt->msg, "(...tbname...) bounded cols is expected", pStmt->pSql); - } - SToken token; - int32_t index = 0; - parseStbBoundColumnsClause(pCxt, pStmt->pBoundCols, pStmt->pTableMeta, &stbBoundColInfo); - return code; -} - // input pStmt->pSql: [(field1_name, ...)] [ USING ... ] VALUES ... | FILE ... static int32_t parseInsertTableClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pTbName) { resetEnvPreTable(pCxt, pStmt); int32_t code = parseSchemaClauseTop(pCxt, pStmt, pTbName); if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { - if (!pStmt->stbSyntax) { - code = parseInsertTableClauseBottom(pCxt, pStmt); - } else { - code = parseInsertStbClauseBottom(pCxt, pStmt); - } + code = parseInsertTableClauseBottom(pCxt, pStmt); } return code; @@ -1929,9 +2233,11 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT); } pStmt->pSql = pCxt->pComCxt->pSql; + pStmt->freeHashFunc = insDestroyTableDataCxtHashMap; pStmt->freeArrayFunc = insDestroyVgroupDataCxtList; - + pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext; + if (!reentry) { pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pStmt->pTableBlockHashObj = @@ -2003,7 +2309,7 @@ static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMet return pRes->code; } -static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) { +static int32_t addTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpStmt* pStmt, bool isStb) { if (1 != taosArrayGetSize(pTables)) { return TSDB_CODE_FAILED; } @@ -2059,10 +2365,10 @@ static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const S code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); } if (TSDB_CODE_SUCCESS == code && isStb) { - code = storeTableMeta(pCxt, pStmt); + code = storeChildTableMeta(pCxt, pStmt); } if (TSDB_CODE_SUCCESS == code) { - code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb); + code = addTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb); } if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) { code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt); @@ -2101,10 +2407,12 @@ static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCata if (pStmt->pTableMeta->tableType == TSDB_SUPER_TABLE && !pStmt->usingTableProcessing) { pStmt->stbSyntax = true; } - if (pStmt->usingTableProcessing || pStmt->stbSyntax) { - return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); + if (!pStmt->stbSyntax) { + if (pStmt->usingTableProcessing) { + return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true); + } + return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false); } - return processTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false); } return code; } @@ -2193,10 +2501,18 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOp } static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { - STableDataCxt* pTableCxt = NULL; - int32_t code = getTableDataCxt(pCxt, pStmt, &pTableCxt); + int32_t code = TSDB_CODE_SUCCESS; + SRowsDataContext rowsDataCxt; + + if (!pStmt->stbSyntax) { + STableDataCxt* pTableCxt = NULL; + code = getTableDataCxt(pCxt, pStmt, &pTableCxt); + rowsDataCxt.pTableDataCxt = pTableCxt; + } else { + rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt; + } if (TSDB_CODE_SUCCESS == code) { - code = parseDataFromFileImpl(pCxt, pStmt, pTableCxt); + code = parseDataFromFileImpl(pCxt, pStmt, rowsDataCxt); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 1c0b28f883..9791319503 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -170,6 +170,10 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) { } } +void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { + initColValues(pTableMeta, aColValues); +} + int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { pInfo->numOfCols = numOfBound; pInfo->numOfBound = numOfBound; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6915f802c3..476322ab04 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -938,6 +938,7 @@ e ,,y,script,./test.sh -f tsim/insert/delete0.sim ,,y,script,./test.sh -f tsim/insert/update1_sort_merge.sim ,,y,script,./test.sh -f tsim/insert/update2.sim +,,y,script,./test.sh -f tsim/insert/insert_stb.sim ,,y,script,./test.sh -f tsim/parser/alter__for_community_version.sim ,,y,script,./test.sh -f tsim/parser/alter_column.sim ,,y,script,./test.sh -f tsim/parser/alter_stable.sim diff --git a/tests/script/tsim/insert/insert_stb.sim b/tests/script/tsim/insert/insert_stb.sim new file mode 100644 index 0000000000..cee209f4e1 --- /dev/null +++ b/tests/script/tsim/insert/insert_stb.sim @@ -0,0 +1,40 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql create database d1 +sql create database d2 + +sql use d1; +sql create table st(ts timestamp, f int) tags(t int); +sql insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 1); +sql insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 2); +sql insert into ct1 values('2021-04-19 00:00:02', 2); + +sql use d2; +sql create table st(ts timestamp, f int) tags(t int); +sql insert into ct1 using st tags(1) values('2021-04-19 00:00:00', 1); +sql insert into ct2 using st tags(2) values('2021-04-19 00:00:01', 2); + +sql create database db1 vgroups 1; +sql create table db1.stb (ts timestamp, c1 int, c2 int) tags(t1 int, t2 int); + +sql use d1; +sql insert into st (tbname, ts, f, t) values('ct3', '2021-04-19 08:00:03', 3, 3); +sql insert into d1.st (tbname, ts, f) values('ct6', '2021-04-19 08:00:04', 6); +sql insert into d1.st (tbname, ts, f) values('ct6', '2021-04-19 08:00:05', 7)('ct8', '2021-04-19 08:00:06', 8); +sql insert into d1.st (tbname, ts, f, t) values('ct6', '2021-04-19 08:00:07', 9, 9)('ct8', '2021-04-19 08:00:08', 10, 10); +sql insert into d1.st (tbname, ts, f, t) values('ct6', '2021-04-19 08:00:09', 9, 9)('ct8', '2021-04-19 08:00:10', 10, 10) d2.st (tbname, ts, f, t) values('ct6', '2021-04-19 08:00:11', 9, 9)('ct8', '2021-04-19 08:00:12', 10, 10); + +sql select * from d1.st +print $rows +if $rows != 11 then + return -1 +endi +sql select * from d2.st; +print $rows +if $rows != 4 then + return -1 +endi +system sh/exec.sh -n dnode1 -s stop -x SIGINT