enhance: finished skeleton

This commit is contained in:
shenglian zhou 2023-10-24 16:08:08 +08:00
parent ae8d5a53e2
commit ae1bac251a
4 changed files with 150 additions and 42 deletions

View File

@ -432,8 +432,8 @@ typedef struct SVnodeModifyOpStmt {
SHashObj* pVgroupsHashObj; // SHashObj<vgId, SVgInfo>
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
SHashObj* pSubTableHashObj; // SHashObj<table_name, STableMeta*>
SHashObj* pTableNameHashObj; // set of table names for refreshing meta
SHashObj* pDbFNameHashObj; // set of db names for refreshing meta
SHashObj* pTableNameHashObj; // set of table names for refreshing meta, sync mode
SHashObj* pDbFNameHashObj; // set of db names for refreshing meta, sync mode
SArray* pVgDataBlocks; // SArray<SVgroupDataCxt*>
SVCreateTbReq* pCreateTblReq;
TdFilePtr fp;

View File

@ -45,6 +45,7 @@ int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
SArray *tagName, uint8_t tagNum, int32_t ttl);
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues);
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode);

View File

@ -1553,20 +1553,20 @@ static void clearColValArray(SArray* pCols) {
}
typedef struct SStbRowsDataContext {
SVnodeModifyOpStmt* pStmt;
STableMeta* pStbMeta;
SName stbName;
STableMeta* pStbMeta;
SNode* pTagCond;
SBoundColInfo boundColsInfo;
int32_t numOfBoundCols;
int32_t numOfBoundTags;
SArray* aChildTableNames;
SArray* aTableDataCxts;
SArray* aCreateTbReqs;
// the following fields are for each stb row
SArray* aTagVals;
SArray* aColVals;
SArray* aTagNames;
SName ctbName;
STag* pTag;
STableMeta* pCtbMeta;
SVCreateTbReq* pCreateCtbReq;
} SStbRowsDataContext;
typedef union SRowsDataContext{
@ -1574,23 +1574,25 @@ typedef union SRowsDataContext{
SStbRowsDataContext* pStbRowsCxt;
} SRowsDataContext;
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
SToken* pToken) {
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
bool isJsonTag = false;
SArray* pTagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
SArray* pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal));
STag* pTag = NULL;
SArray* pTagName = pStbRowsCxt->aTagNames;
SArray* pTagVals = pStbRowsCxt->aTagVals;
bool bFoundTbName = false;
const char* pOrigSql = *ppSql;
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
const char* pOrigSql = *ppSql;
for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) {
const char* pTmpSql = *ppSql;
bool ignoreComma = false;
NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
if (ignoreComma) {
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pOrigSql);
code = buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pTmpSql);
break;
}
@ -1607,29 +1609,114 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code) {
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pTag);
if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag);
}
}
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
SColVal tbnameVal;
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)tGetTbnameColumnSchema(),
getTableInfo(pStbRowsCxt->pStbMeta).precision, &tbnameVal);
if (code == TSDB_CODE_SUCCESS) {
char tbName[TSDB_TABLE_NAME_LEN];
memcpy(tbName, tbnameVal.value.pData, tbnameVal.value.nData);
tbName[tbnameVal.value.nData] = '\0';
if (code == TSDB_CODE_SUCCESS && COL_VAL_IS_VALUE(&tbnameVal)) {
tNameAssign(&pStbRowsCxt->ctbName, &pStbRowsCxt->stbName);
tNameAddTbName(&pStbRowsCxt->ctbName, tbnameVal.value.pData, tbnameVal.value.nData);
bFoundTbName = true;
}
}
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
NEXT_VALID_TOKEN(*ppSql, *pToken);
if (TK_NK_COMMA != pToken->type) {
code = buildSyntaxErrMsg(&pCxt->msg, ", expected", pToken->z);
}
}
}
if (!bFoundTbName) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
}
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag);
}
return code;
}
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken);
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
return code;
}
if (pStbRowsCxt->pTagCond) {
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
}
if (code == TSDB_CODE_SUCCESS) {
collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
char ctbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
if (NULL != pMeta) {
cloneTableMeta(*pMeta, &pStbRowsCxt->pCtbMeta);
} else {
SVgroupInfo vg;
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
.requestId = pCxt->pComCxt->requestId,
.requestObjRefId = pCxt->pComCxt->requestRid,
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
cloneTableMeta(pStbRowsCxt->pStbMeta, &pStbRowsCxt->pCtbMeta);
pStbRowsCxt->pCtbMeta->suid = pStbRowsCxt->pStbMeta->uid;
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
pStbRowsCxt->pCtbMeta->tableType = TSDB_CHILD_TABLE;
STableMeta* pBackup = NULL;
cloneTableMeta(pStmt->pTableMeta, &pBackup);
taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
}
}
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pStbRowsCxt->pCreateCtbReq == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag, pStbRowsCxt->pStbMeta->uid,
pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames, getNumOfTags(pStbRowsCxt->pStbMeta),
TSDB_DEFAULT_TABLE_TTL);
pStbRowsCxt->pTag = NULL;
}
STableDataCxt* pTableDataCxt = NULL;
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false);
initTableColSubmitData(pTableDataCxt);
if (code == TSDB_CODE_SUCCESS) {
SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1);
code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) {
insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
}
}
if (code == TSDB_CODE_SUCCESS) {
*pGotRow = true;
}
taosArrayClear(pStbRowsCxt->aTagNames);
for (int i = 0; i < taosArrayGetSize(pStbRowsCxt->aTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pStbRowsCxt->aTagVals, i);
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFreeClear(p->pData);
}
}
taosArrayClear(pStbRowsCxt->aTagVals);
taosArrayClear(pStbRowsCxt->aColVals);
tTagFree(pStbRowsCxt->pTag);
taosMemoryFree(pStbRowsCxt->pCtbMeta);
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
taosMemoryFreeClear(pStmt->pCreateTblReq);
// child meta , vgroupid, check privilege
// refer to parInsertSml.c
// create or reuse table data context from tbName
@ -1637,11 +1724,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
// build create table request, refer to parseTagsClauseImpl
// build row and check table data order, refer to parseOneRow
if (TSDB_CODE_SUCCESS == code) {
*pGotRow = true;
}
return code;
// clear the values, etc
return TSDB_CODE_SUCCESS;
}
@ -1886,28 +1969,48 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
static void destroyStbRowsDataContext(SStbRowsDataContext* pStbRowsContext) {
taosArrayDestroy(pStbRowsContext->aColVals);
taosArrayDestroy(pStbRowsContext->aTagVals);
taosArrayDestroy(pStbRowsContext->aTagNames);
insDestroyBoundColInfo(&pStbRowsContext->boundColsInfo);
}
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
SBoundColInfo stbBoundColInfo;
insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &stbBoundColInfo);
if (!pStmt->pBoundCols) {
insDestroyBoundColInfo(&stbBoundColInfo);
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname...) bounded cols is expected", pStmt->pSql);
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion", pStmt->pSql);
}
SToken token;
int32_t index = 0;
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &stbBoundColInfo);
pStmt->pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
SStbRowsDataContext* pStbRowsCxt;
pStbRowsCxt = taosMemoryCalloc(1, sizeof(SStbRowsDataContext));
if (!pStbRowsCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tNameAssign(&pStbRowsCxt->stbName, &pStmt->targetTableName);
collectUseTable(&pStbRowsCxt->stbName, pStmt->pTableNameHashObj);
collectUseDatabase(&pStbRowsCxt->stbName, pStmt->pDbFNameHashObj);
pStbRowsCxt->pTagCond = pStmt->pTagCond;
pStbRowsCxt->pStbMeta = pStmt->pTableMeta;
pStbRowsCxt->aTagNames = taosArrayInit(8, TSDB_COL_NAME_LEN);
pStbRowsCxt->aTagVals = taosArrayInit(8, sizeof(STagVal));
// fill SStbRowsDataCxt;
// col values and bound cols info of STableDataContext is not used TODO: remove the construction when createing table data context
pStbRowsCxt->aColVals = taosArrayInit(getNumOfColumns(pStbRowsCxt->pStbMeta), sizeof(SColVal));
insInitColValues(pStbRowsCxt->pStbMeta, pStbRowsCxt->aColVals);
STableComInfo tblInfo = getTableInfo(pStmt->pTableMeta);
insInitBoundColsInfo(tblInfo.numOfColumns + tblInfo.numOfTags + 1, &pStbRowsCxt->boundColsInfo);
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, &pStbRowsCxt->boundColsInfo);
pStmt->pStbRowsCxt = pStbRowsCxt;
SRowsDataContext rowsDataCxt;
rowsDataCxt.pStbRowsCxt = pStmt->pStbRowsCxt;
rowsDataCxt.pStbRowsCxt = pStbRowsCxt;
if (TSDB_CODE_SUCCESS == code) {
code = parseDataClause(pCxt, pStmt, rowsDataCxt);
}
insDestroyBoundColInfo(&stbBoundColInfo);
// destroyStbRowsDataContext; TODO: move it to resetEnvPreTable
return code;
}

View File

@ -170,6 +170,10 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
}
}
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) {
initColValues(pTableMeta, aColValues);
}
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
pInfo->numOfCols = numOfBound;
pInfo->numOfBound = numOfBound;