From ae1bac251a3af627e19c0b0f2894833689df1a50 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 24 Oct 2023 16:08:08 +0800 Subject: [PATCH] enhance: finished skeleton --- include/libs/nodes/querynodes.h | 4 +- source/libs/parser/inc/parInsertUtil.h | 1 + source/libs/parser/src/parInsertSql.c | 183 +++++++++++++++++++------ source/libs/parser/src/parInsertUtil.c | 4 + 4 files changed, 150 insertions(+), 42 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 7dd60cfe5b..21e50923a2 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -432,8 +432,8 @@ typedef struct SVnodeModifyOpStmt { SHashObj* pVgroupsHashObj; // SHashObj SHashObj* pTableBlockHashObj; // SHashObj SHashObj* pSubTableHashObj; // SHashObj - SHashObj* pTableNameHashObj; // set of table names for refreshing meta - SHashObj* pDbFNameHashObj; // set of db names for refreshing meta + SHashObj* pTableNameHashObj; // set of table names for refreshing meta, sync mode + SHashObj* pDbFNameHashObj; // set of db names for refreshing meta, sync mode SArray* pVgDataBlocks; // SArray SVCreateTbReq* pCreateTblReq; TdFilePtr fp; diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 303d349b34..0a153225a3 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -45,6 +45,7 @@ int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, SArray *tagName, uint8_t tagNum, int32_t ttl); int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); +void insInitColValues(STableMeta* pTableMeta, SArray* aColValues); void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey); int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode); diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 46b8993d12..706621d154 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1553,20 +1553,20 @@ static void clearColValArray(SArray* pCols) { } typedef struct SStbRowsDataContext { - SVnodeModifyOpStmt* pStmt; - STableMeta* pStbMeta; SName stbName; + + STableMeta* pStbMeta; + SNode* pTagCond; SBoundColInfo boundColsInfo; - int32_t numOfBoundCols; - int32_t numOfBoundTags; - - SArray* aChildTableNames; - SArray* aTableDataCxts; - SArray* aCreateTbReqs; + // the following fields are for each stb row SArray* aTagVals; SArray* aColVals; SArray* aTagNames; + SName ctbName; + STag* pTag; + STableMeta* pCtbMeta; + SVCreateTbReq* pCreateCtbReq; } SStbRowsDataContext; typedef union SRowsDataContext{ @@ -1574,23 +1574,25 @@ typedef union SRowsDataContext{ SStbRowsDataContext* pStbRowsCxt; } SRowsDataContext; -static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, - SToken* pToken) { +static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, + SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo; SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta); bool isJsonTag = false; - SArray* pTagName = taosArrayInit(8, TSDB_COL_NAME_LEN); - SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal)); - STag* pTag = NULL; + SArray* pTagName = pStbRowsCxt->aTagNames; + SArray* pTagVals = pStbRowsCxt->aTagVals; + + bool bFoundTbName = false; + const char* pOrigSql = *ppSql; int32_t code = TSDB_CODE_SUCCESS; - for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) { - const char* pOrigSql = *ppSql; + for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) { + const char* pTmpSql = *ppSql; bool ignoreComma = false; NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma); if (ignoreComma) { - code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql); + code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql); break; } @@ -1607,29 +1609,114 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]]; isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON; code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); - if (TSDB_CODE_SUCCESS == code) { - code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pTag); + if (code == TSDB_CODE_SUCCESS) { + code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag); } } else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { SColVal tbnameVal; code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(), getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal); - if (code == TSDB_CODE_SUCCESS) { - char tbName[TSDB_TABLE_NAME_LEN]; - memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData); - tbName[tbnameVal.value.nData] = '\0'; + if (code == TSDB_CODE_SUCCESS && COL_VAL_IS_VALUE(&tbnameVal)) { + tNameAssign(&pStbRowsCxt->ctbName, &pStbRowsCxt->stbName); + tNameAddTbName(&pStbRowsCxt->ctbName, tbnameVal.value.pData, tbnameVal.value.nData); + bFoundTbName = true; } } - if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) { + if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) { NEXT_VALID_TOKEN(*ppSql, *pToken); if (TK_NK_COMMA != pToken->type) { code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z); } } } + if (!bFoundTbName) { + code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql); + } + if (code == TSDB_CODE_SUCCESS && !isJsonTag) { + code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag); + } + return code; +} +static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, + SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { + int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken); + if (code != TSDB_CODE_SUCCESS || !*pGotRow) { + return code; + } + + if (pStbRowsCxt->pTagCond) { + code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond); + } + + if (code == TSDB_CODE_SUCCESS) { + + collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj); + + char ctbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName); + STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName)); + if (NULL != pMeta) { + cloneTableMeta(*pMeta, &pStbRowsCxt->pCtbMeta); + } else { + SVgroupInfo vg; + SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter, + .requestId = pCxt->pComCxt->requestId, + .requestObjRefId = pCxt->pComCxt->requestRid, + .mgmtEps = pCxt->pComCxt->mgmtEpSet}; + code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg); + taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg)); + cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta); + pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid; + pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1; + pStbRowsCxt->pCtbMeta->vgId = vg.vgId; + pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE; + STableMeta* pBackup = NULL; + cloneTableMeta(pStmt->pTableMeta, &pBackup); + taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES); + } + } + + pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (pStbRowsCxt->pCreateCtbReq == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (code == TSDB_CODE_SUCCESS) { + insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag, pStbRowsCxt->pStbMeta->uid, + pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames, getNumOfTags(pStbRowsCxt->pStbMeta), + TSDB_DEFAULT_TABLE_TTL); + pStbRowsCxt->pTag = NULL; + } + + STableDataCxt* pTableDataCxt = NULL; + code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), + pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false); + initTableColSubmitData(pTableDataCxt); + if (code == TSDB_CODE_SUCCESS) { + SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1); + code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow); + if (TSDB_CODE_SUCCESS == code) { + insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow)); + } + } + if (code == TSDB_CODE_SUCCESS) { + *pGotRow = true; + } + taosArrayClear(pStbRowsCxt->aTagNames); + for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) { + STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i); + if (IS_VAR_DATA_TYPE(p->type)) { + taosMemoryFreeClear(p->pData); + } + } + taosArrayClear(pStbRowsCxt->aTagVals); + taosArrayClear(pStbRowsCxt->aColVals); + tTagFree(pStbRowsCxt->pTag); + taosMemoryFree(pStbRowsCxt->pCtbMeta); + tdDestroySVCreateTbReq(pStmt->pCreateTblReq); + taosMemoryFreeClear(pStmt->pCreateTblReq); // child meta , vgroupid, check privilege // refer to parInsertSml.c // create or reuse table data context from tbName @@ -1637,11 +1724,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt // build create table request, refer to parseTagsClauseImpl // build row and check table data order, refer to parseOneRow - if (TSDB_CODE_SUCCESS == code) { - *pGotRow = true; - } - - return code; + // clear the values, etc return TSDB_CODE_SUCCESS; } @@ -1886,28 +1969,48 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); } +static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsContext) { + taosArrayDestroy(pStbRowsContext->aColVals); + taosArrayDestroy(pStbRowsContext->aTagVals); + taosArrayDestroy(pStbRowsContext->aTagNames); + insDestroyBoundColInfo(&pStbRowsContext->boundColsInfo); +} + static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { int32_t code = TSDB_CODE_SUCCESS; - STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta); - SBoundColInfo stbBoundColInfo; - insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &stbBoundColInfo); if (!pStmt->pBoundCols) { - insDestroyBoundColInfo(&stbBoundColInfo); - return buildSyntaxErrMsg(&pCxt->msg, "(...tbname...) bounded cols is expected", pStmt->pSql); + return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion", pStmt->pSql); } - SToken token; - int32_t index = 0; - code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &stbBoundColInfo); - pStmt->pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext)); + SStbRowsDataContext* pStbRowsCxt; + pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext)); + if (!pStbRowsCxt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName); + collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj); + collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj); + pStbRowsCxt->pTagCond = pStmt->pTagCond; + pStbRowsCxt->pStbMeta = pStmt->pTableMeta; + pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN); + pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal)); - // fill SStbRowsDataCxt; + // col values and bound cols info of STableDataContext is not used TODO: remove the construction when createing table data context + pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal)); + insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals); + + STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta); + insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo); + + code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &pStbRowsCxt->boundColsInfo); + + pStmt->pStbRowsCxt = pStbRowsCxt; SRowsDataContext rowsDataCxt; - rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt; + rowsDataCxt.pStbRowsCxt = pStbRowsCxt; if (TSDB_CODE_SUCCESS == code) { code = parseDataClause(pCxt, pStmt, rowsDataCxt); } - insDestroyBoundColInfo(&stbBoundColInfo); + // destroyStbRowsDataContext; TODO: move it to resetEnvPreTable return code; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 1c0b28f883..9791319503 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -170,6 +170,10 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) { } } +void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { + initColValues(pTableMeta, aColValues); +} + int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { pInfo->numOfCols = numOfBound; pInfo->numOfBound = numOfBound;