morefix: not allow duplicate table names in stmt2 bind with interlace mode

This commit is contained in:
Haolin Wang 2024-12-16 15:50:54 +08:00
parent 074c3e7eb2
commit 02bd68be22
5 changed files with 78 additions and 41 deletions

View File

@ -378,7 +378,7 @@ typedef struct {
TAOS_MULTI_BIND *bind;
} SBindInfo;
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray, bool *orderedDup);
SArray *rowArray, bool *pOrdered, bool *pDupTs);
// stmt2 binding
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
@ -392,7 +392,7 @@ typedef struct {
} SBindInfo2;
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray, bool *orderedDup);
SArray *rowArray, bool *pOrdered, bool *pDupTs);
#endif

View File

@ -2166,19 +2166,38 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
pStmt->semWaited = true;
}
int32_t code = 0;
SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR));
if (NULL == hashTbnames) {
tscError("stmt2 bind failed: %s", tstrerror(terrno));
return terrno;
}
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < bindv->count; ++i) {
if (bindv->tbnames && bindv->tbnames[i]) {
if (pStmt->sql.stbInterlaceMode) {
if (tSimpleHashGet(hashTbnames, bindv->tbnames[i], strlen(bindv->tbnames[i])) != NULL) {
code = terrno = TSDB_CODE_PAR_TBNAME_DUPLICATED;
tscError("stmt2 bind failed: %s %s", tstrerror(terrno), bindv->tbnames[i]);
goto out;
}
code = tSimpleHashPut(hashTbnames, bindv->tbnames[i], strlen(bindv->tbnames[i]), NULL, 0);
if (code) {
goto out;
}
}
code = stmtSetTbName2(stmt, bindv->tbnames[i]);
if (code) {
return code;
goto out;
}
}
if (bindv->tags && bindv->tags[i]) {
code = stmtSetTbTags2(stmt, bindv->tags[i]);
if (code) {
return code;
goto out;
}
}
@ -2187,26 +2206,29 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
if (bind->num <= 0 || bind->num > INT16_MAX) {
tscError("invalid bind num %d", bind->num);
terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
return terrno;
code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
goto out;
}
int32_t insert = 0;
(void)stmtIsInsert2(stmt, &insert);
if (0 == insert && bind->num > 1) {
tscError("only one row data allowed for query");
terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
return terrno;
code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
goto out;
}
code = stmtBindBatch2(stmt, bind, col_idx);
if (TSDB_CODE_SUCCESS != code) {
return code;
goto out;
}
}
}
return TSDB_CODE_SUCCESS;
out:
tSimpleHashCleanup(hashTbnames);
return code;
}
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {

View File

@ -449,10 +449,11 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
* `orderedDup` is an array to store ordered and duplicateTs
* `pOrdered` is the pointer to store ordered
* `pDupTs` is the pointer to store duplicateTs
*/
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray, bool *orderedDup) {
SArray *rowArray, bool *pOrdered, bool *pDupTs) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -510,19 +511,17 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
goto _exit;
}
if (orderedDup) {
if (pOrdered && pDupTs) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
// init to ordered by default
orderedDup[0] = true;
// init to non-duplicate by default
orderedDup[1] = false;
*pOrdered = true;
*pDupTs = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (orderedDup[0] && !orderedDup[1]) {
if (*pOrdered && !*pDupTs) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
orderedDup[0] = (code >= 0);
orderedDup[1] = (code == 0);
*pOrdered = (code >= 0);
*pDupTs = (code == 0);
}
}
lastRowKey = rowKey;
@ -3255,10 +3254,11 @@ _exit:
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
* `orderedDup` is an array to store ordered and duplicateTs
* `pOrdered` is the pointer to store ordered
* `pDupTs` is the pointer to store duplicateTs
*/
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray, bool *orderedDup) {
SArray *rowArray, bool *pOrdered, bool *pDupTs) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -3340,19 +3340,17 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
goto _exit;
}
if (orderedDup) {
if (pOrdered && pDupTs) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
// init to ordered by default
orderedDup[0] = true;
// init to non-duplicate by default
orderedDup[1] = false;
*pOrdered = true;
*pDupTs = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (orderedDup[0] && !orderedDup[1]) {
if (*pOrdered && !*pDupTs) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
orderedDup[0] = (code >= 0);
orderedDup[1] = (code == 0);
*pOrdered = (code >= 0);
*pDupTs = (code == 0);
}
}
lastRowKey = rowKey;

View File

@ -323,7 +323,6 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
int32_t code = 0;
int16_t lastColId = -1;
bool colInOrder = true;
bool orderedDup[2];
if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
@ -369,9 +368,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
// }
}
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
pDataBlock->ordered = orderedDup[0];
pDataBlock->duplicateTs = orderedDup[1];
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs);
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
@ -692,7 +689,6 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
int16_t lastColId = -1;
bool colInOrder = true;
int ncharColNums = 0;
bool orderedDup[2];
if (NULL == *pTSchema) {
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
@ -749,9 +745,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
pBindInfos[c].bytes = pColSchema->bytes;
}
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
pDataBlock->ordered = orderedDup[0];
pDataBlock->duplicateTs = orderedDup[1];
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs);
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

View File

@ -145,7 +145,7 @@ void insert_dist(TAOS* taos, const char *sql) {
UINIT(tbs, ts, ts_len, b, b_len, tags, paramv);
}
void insert_dup(TAOS* taos, const char *sql) {
void insert_dup_rows(TAOS* taos, const char *sql) {
char **tbs, **b;
int64_t **ts;
int *ts_len, *b_len;
@ -169,6 +169,27 @@ void insert_dup(TAOS* taos, const char *sql) {
UINIT(tbs, ts, ts_len, b, b_len, tags, paramv);
}
void insert_dup_tables(TAOS* taos, const char *sql) {
char **tbs, **b;
int64_t **ts;
int *ts_len, *b_len;
TAOS_STMT2_BIND **paramv, **tags;
INIT(tbs, ts, ts_len, b, b_len, tags, paramv);
for (int i = 0; i < CTB_NUMS; i++) {
sprintf(tbs[i], "ctb_%d", i % 2);
}
for (int i = 0; i < CTB_NUMS; i++) {
paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS};
paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS};
}
insert(taos, tbs, tags, paramv, sql);
UINIT(tbs, ts, ts_len, b, b_len, tags, paramv);
}
int main() {
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0);
if (!taos) {
@ -180,7 +201,9 @@ int main() {
// insert distinct rows
insert_dist(taos, "insert into db.? using db.stb tags(?,?)values(?,?)");
// insert duplicate rows
insert_dup(taos, "insert into db.? values(?,?)");
insert_dup_rows(taos, "insert into db.? values(?,?)");
// insert duplicate tables
insert_dup_tables(taos, "insert into db.? values(?,?)");
taos_close(taos);
taos_cleanup();