Merge pull request #29054 from taosdata/enh/TD-32937

enh:[TD-32937]stmt2 support tbname bind in cols
This commit is contained in:
Shengliang Guan 2024-12-07 18:54:08 +08:00 committed by GitHub
commit 50087851f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 111 additions and 34 deletions

View File

@ -194,6 +194,7 @@ typedef struct SBoundColInfo {
int32_t numOfCols; int32_t numOfCols;
int32_t numOfBound; int32_t numOfBound;
bool hasBoundCols; bool hasBoundCols;
bool mixTagsCols;
} SBoundColInfo; } SBoundColInfo;
typedef struct STableColsData { typedef struct STableColsData {

View File

@ -1094,6 +1094,13 @@ static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIEL
} }
STMT_ERR_RET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields)); STMT_ERR_RET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
pStmt->bInfo.needParse = true;
if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -247,8 +247,8 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
return code; return code;
} }
static int32_t parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts, int64_t* interval, static int32_t parseTimestampOrInterval(const char** end, SToken* pToken, int16_t timePrec, int64_t* ts,
SMsgBuf* pMsgBuf, bool* isTs) { int64_t* interval, SMsgBuf* pMsgBuf, bool* isTs) {
if (pToken->type == TK_NOW) { if (pToken->type == TK_NOW) {
*isTs = true; *isTs = true;
*ts = taosGetTimestamp(timePrec); *ts = taosGetTimestamp(timePrec);
@ -1387,7 +1387,16 @@ static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
} }
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(&pStmt->targetTableName, tbFName); int32_t code = 0;
if (pCxt->preCtbname) {
tstrncpy(pStmt->targetTableName.tname, pStmt->usingTableName.tname, sizeof(pStmt->targetTableName.tname));
tstrncpy(pStmt->targetTableName.dbname, pStmt->usingTableName.dbname, sizeof(pStmt->targetTableName.dbname));
pStmt->targetTableName.type = TSDB_SUPER_TABLE;
pStmt->pTableMeta->tableType = TSDB_SUPER_TABLE;
}
code = tNameExtractFullName(&pStmt->targetTableName, tbFName);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
@ -1812,8 +1821,10 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
SArray* pTagVals = pStbRowsCxt->aTagVals; SArray* pTagVals = pStbRowsCxt->aTagVals;
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag; bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta); int32_t numOfCols = getNumOfColumns(pStbRowsCxt->pStbMeta);
int32_t numOfTags = getNumOfTags(pStbRowsCxt->pStbMeta);
int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta); int32_t tbnameIdx = getTbnameSchemaIndex(pStbRowsCxt->pStbMeta);
uint8_t precision = getTableInfo(pStbRowsCxt->pStbMeta).precision; uint8_t precision = getTableInfo(pStbRowsCxt->pStbMeta).precision;
int idx = 0;
for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) { for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
const char* pTmpSql = *ppSql; const char* pTmpSql = *ppSql;
bool ignoreComma = false; bool ignoreComma = false;
@ -1831,13 +1842,37 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
if (TK_NK_QUESTION == pToken->type) { if (TK_NK_QUESTION == pToken->type) {
pCxt->isStmtBind = true; pCxt->isStmtBind = true;
pStmt->usingTableProcessing = true;
if (pCols->pColIndex[i] == tbnameIdx) { if (pCols->pColIndex[i] == tbnameIdx) {
pCxt->preCtbname = false; char* tbName = NULL;
if ((*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName) == TSDB_CODE_SUCCESS) {
tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname));
tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname));
tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname));
tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname));
pStmt->usingTableName.type = 1;
*bFoundTbName = true; *bFoundTbName = true;
} }
if (NULL == pCxt->pComCxt->pStmtCb) { } else if (pCols->pColIndex[i] < numOfCols) {
code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", pToken->z); // bind column
break; } else if (pCols->pColIndex[i] < tbnameIdx) {
if (pCxt->tags.pColIndex == NULL) {
pCxt->tags.pColIndex = taosMemoryCalloc(numOfTags, sizeof(int16_t));
if (NULL == pCxt->tags.pColIndex) {
return terrno;
}
}
if (!(idx < numOfTags)) {
return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
}
pCxt->tags.pColIndex[idx++] = pCols->pColIndex[i] - numOfCols;
pCxt->tags.mixTagsCols = true;
pCxt->tags.numOfBound++;
pCxt->tags.numOfCols++;
} else {
return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfBound");
} }
} else { } else {
if (pCols->pColIndex[i] < numOfCols) { if (pCols->pColIndex[i] < numOfCols) {
@ -1882,6 +1917,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
} }
} }
} }
return code; return code;
} }
@ -1901,14 +1937,18 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, tagTokens, tagSchemas, code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, tagTokens, tagSchemas,
&numOfTagTokens, &bFoundTbName); &numOfTagTokens, &bFoundTbName);
if (code == TSDB_CODE_SUCCESS && !bFoundTbName) { if (code != TSDB_CODE_SUCCESS) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql); return code;
} }
if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->ctbName.tname[0] == '\0') { if (!bFoundTbName) {
if (!pCxt->isStmtBind) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
} else {
*pGotRow = true; *pGotRow = true;
return TSDB_CODE_TSC_STMT_TBNAME_ERROR; return TSDB_CODE_TSC_STMT_TBNAME_ERROR;
} }
}
bool ctbFirst = true; bool ctbFirst = true;
char ctbFName[TSDB_TABLE_FNAME_LEN]; char ctbFName[TSDB_TABLE_FNAME_LEN];
@ -2050,14 +2090,24 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (pCxt->isStmtBind) {
char ctbFName[TSDB_TABLE_FNAME_LEN];
code = tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, ctbFName, strlen(ctbFName), pStbRowsCxt->pCtbMeta,
&pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, true, true);
} else {
code = code =
insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true); pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
} }
}
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = initTableColSubmitData(*ppTableDataCxt); code = initTableColSubmitData(*ppTableDataCxt);
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS && !pCxt->isStmtBind) {
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1); SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow); code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -941,7 +941,7 @@ int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSc
int32_t buildStbBoundFields(SBoundColInfo boundColsInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_STB** fields, int32_t buildStbBoundFields(SBoundColInfo boundColsInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_STB** fields,
STableMeta* pMeta, void* boundTags, bool preCtbname) { STableMeta* pMeta, void* boundTags, bool preCtbname) {
SBoundColInfo* tags = (SBoundColInfo*)boundTags; SBoundColInfo* tags = (SBoundColInfo*)boundTags;
int32_t numOfBound = boundColsInfo.numOfBound + tags->numOfBound + (preCtbname ? 1 : 0); int32_t numOfBound = boundColsInfo.numOfBound + (tags->mixTagsCols ? 0 : tags->numOfBound) + (preCtbname ? 1 : 0);
int32_t idx = 0; int32_t idx = 0;
if (fields != NULL) { if (fields != NULL) {
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_STB)); *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_STB));
@ -957,13 +957,13 @@ int32_t buildStbBoundFields(SBoundColInfo boundColsInfo, SSchema* pSchema, int32
idx++; idx++;
} }
if (tags->numOfBound > 0) { if (tags->numOfBound > 0 && !tags->mixTagsCols) {
SSchema* pSchema = getTableTagSchema(pMeta); SSchema* tagSchema = getTableTagSchema(pMeta);
for (int32_t i = 0; i < tags->numOfBound; ++i) { for (int32_t i = 0; i < tags->numOfBound; ++i) {
(*fields)[idx].field_type = TAOS_FIELD_TAG; (*fields)[idx].field_type = TAOS_FIELD_TAG;
SSchema* schema = &pSchema[tags->pColIndex[i]]; SSchema* schema = &tagSchema[tags->pColIndex[i]];
tstrncpy((*fields)[idx].name, schema->name, sizeof((*fields)[i].name)); tstrncpy((*fields)[idx].name, schema->name, sizeof((*fields)[i].name));
(*fields)[idx].type = schema->type; (*fields)[idx].type = schema->type;
(*fields)[idx].bytes = schema->bytes; (*fields)[idx].bytes = schema->bytes;

View File

@ -191,6 +191,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
pInfo->numOfCols = numOfBound; pInfo->numOfCols = numOfBound;
pInfo->numOfBound = numOfBound; pInfo->numOfBound = numOfBound;
pInfo->hasBoundCols = false; pInfo->hasBoundCols = false;
pInfo->mixTagsCols = false;
pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t)); pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
if (NULL == pInfo->pColIndex) { if (NULL == pInfo->pColIndex) {
return terrno; return terrno;
@ -204,6 +205,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
void insResetBoundColsInfo(SBoundColInfo* pInfo) { void insResetBoundColsInfo(SBoundColInfo* pInfo) {
pInfo->numOfBound = pInfo->numOfCols; pInfo->numOfBound = pInfo->numOfCols;
pInfo->hasBoundCols = false; pInfo->hasBoundCols = false;
pInfo->mixTagsCols = false;
for (int32_t i = 0; i < pInfo->numOfCols; ++i) { for (int32_t i = 0; i < pInfo->numOfCols; ++i) {
pInfo->pColIndex[i] = i; pInfo->pColIndex[i] = i;
} }
@ -739,7 +741,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool
STableDataCxt* pTableCxt = *(STableDataCxt**)p; STableDataCxt* pTableCxt = *(STableDataCxt**)p;
if (colFormat) { if (colFormat) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
if (pCol->nVal <= 0) { if (pCol && pCol->nVal <= 0) {
p = taosHashIterate(pTableHash, p); p = taosHashIterate(pTableHash, p);
continue; continue;
} }

View File

@ -33,18 +33,23 @@ void do_stmt(TAOS* taos) {
char* tbs[2] = {"tb", "tb2"}; char* tbs[2] = {"tb", "tb2"};
int t1_val[2] = {0, 1}; int t1_val[2] = {0, 1};
int t2_len[2] = {3, 3}; int t2_len[2] = {10, 10};
TAOS_STMT2_BIND tags[2][2] = {{{0, &t1_val[0], NULL, NULL, 0}, {0, "a1", &t2_len[0], NULL, 0}}, int t3_len[2] = {sizeof(int), sizeof(int)};
{{0, &t1_val[1], NULL, NULL, 0}, {0, "a2", &t2_len[1], NULL, 0}}}; TAOS_STMT2_BIND tags[2][2] = {{{0, &t1_val[0], &t3_len[0], NULL, 0}, {0, "after1", &t2_len[0], NULL, 0}},
{{0, &t1_val[1], &t3_len[1], NULL, 0}, {0, "after2", &t2_len[1], NULL, 0}}};
TAOS_STMT2_BIND params[2][2] = { TAOS_STMT2_BIND params[2][2] = {
{{TSDB_DATA_TYPE_TIMESTAMP, v.ts, NULL, is_null, 2}, {TSDB_DATA_TYPE_BINARY, v.b, b_len, is_null2, 2}}, {{TSDB_DATA_TYPE_TIMESTAMP, v.ts, t64_len, is_null, 2}, {TSDB_DATA_TYPE_BINARY, v.b, b_len, NULL, 2}},
{{TSDB_DATA_TYPE_TIMESTAMP, v.ts, NULL, is_null, 2}, {TSDB_DATA_TYPE_BINARY, v.b, b_len, is_null2, 2}}}; {{TSDB_DATA_TYPE_TIMESTAMP, v.ts, t64_len, is_null, 2}, {TSDB_DATA_TYPE_BINARY, v.b, b_len, NULL, 2}}};
TAOS_STMT2_BIND* tagv[2] = {&tags[0][0], &tags[1][0]}; TAOS_STMT2_BIND* tagv[2] = {&tags[0][0], &tags[1][0]};
TAOS_STMT2_BIND* paramv[2] = {&params[0][0], &params[1][0]}; TAOS_STMT2_BIND* paramv[2] = {&params[0][0], &params[1][0]};
TAOS_STMT2_BINDV bindv = {2, &tbs[0], &tagv[0], &paramv[0]}; TAOS_STMT2_BINDV bindv = {2, &tbs[0], &tagv[0], &paramv[0]};
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option); TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
const char* sql = "insert into db.? using db.stb tags(?, ?) values(?,?)";
// Equivalent to :
// const char* sql = "insert into db.? using db.stb tags(?, ?) values(?,?)";
const char* sql = "insert into db.stb(tbname,ts,b,t1,t2) values(?,?,?,?,?)";
int code = taos_stmt2_prepare(stmt, sql, 0); int code = taos_stmt2_prepare(stmt, sql, 0);
if (code != 0) { if (code != 0) {
printf("failed to execute taos_stmt2_prepare. error:%s\n", taos_stmt2_error(stmt)); printf("failed to execute taos_stmt2_prepare. error:%s\n", taos_stmt2_error(stmt));
@ -52,11 +57,23 @@ void do_stmt(TAOS* taos) {
return; return;
} }
int fieldNum = 0;
TAOS_FIELD_STB* pFields = NULL;
code = taos_stmt2_get_stb_fields(stmt, &fieldNum, &pFields);
if (code != 0) {
printf("failed get col,ErrCode: 0x%x, ErrMessage: %s.\n", code, taos_stmt2_error(stmt));
} else {
printf("col nums:%d\n", fieldNum);
for (int i = 0; i < fieldNum; i++) {
printf("field[%d]: %s, data_type:%d, field_type:%d\n", i, pFields[i].name, pFields[i].type,
pFields[i].field_type);
}
}
int64_t ts = 1591060628000; int64_t ts = 1591060628000;
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
// v.ts[i] = ts++; v.ts[i] = ts++;
v.ts[i] = ts; t64_len[i] = sizeof(int64_t);
// t64_len[i] = sizeof(int64_t);
} }
strcpy(v.b, "abcdefg"); strcpy(v.b, "abcdefg");
b_len[0] = (int)strlen(v.b); b_len[0] = (int)strlen(v.b);