fix(stmt2):sort stmt2 bind rowkey (#30309)

This commit is contained in:
Mario Peng 2025-03-24 14:43:13 +08:00 committed by GitHub
parent c5841939d8
commit ce5e0d6c0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 61 additions and 14 deletions

View File

@ -252,6 +252,8 @@ typedef struct STableColsData {
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
SArray* aCol; SArray* aCol;
bool getFromHash; bool getFromHash;
bool isOrdered;
bool isDuplicateTs;
} STableColsData; } STableColsData;
typedef struct STableVgUid { typedef struct STableVgUid {

View File

@ -86,6 +86,7 @@ static int32_t stmtCreateRequest(STscStmt2* pStmt) {
if (pStmt->reqid != 0) { if (pStmt->reqid != 0) {
pStmt->reqid++; pStmt->reqid++;
} }
pStmt->exec.pRequest->type = RES_TYPE__QUERY;
if (pStmt->db != NULL) { if (pStmt->db != NULL) {
taosMemoryFreeClear(pStmt->exec.pRequest->pDb); taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db); pStmt->exec.pRequest->pDb = taosStrdup(pStmt->db);
@ -1560,6 +1561,8 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCr
// param->tblData.aCol = taosArrayInit(20, POINTER_BYTES); // param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
param->restoreTbCols = false; param->restoreTbCols = false;
param->tblData.isOrdered = true;
param->tblData.isDuplicateTs = false;
tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN); tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
param->pCreateTbReq = pCreateTbReq; param->pCreateTbReq = pCreateTbReq;
@ -1577,6 +1580,8 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCr
code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo, pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
pStmt->taos->optionInfo.charsetCxt); pStmt->taos->optionInfo.charsetCxt);
param->tblData.isOrdered = (*pDataBlock)->ordered;
param->tblData.isDuplicateTs = (*pDataBlock)->duplicateTs;
} else { } else {
if (colIdx == -1) { if (colIdx == -1) {
if (pStmt->sql.bindRowFormat) { if (pStmt->sql.bindRowFormat) {

View File

@ -1212,6 +1212,45 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
taos_stmt2_close(stmt); taos_stmt2_close(stmt);
} }
// TD-34123 disorder pk ts
{
do_query(taos, "create stable stmt2_testdb_6.stb2 (ts timestamp, int_col int PRIMARY KEY) tags(int_tag int);");
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
ASSERT_NE(stmt, nullptr);
const char* sql =
"INSERT INTO stmt2_testdb_6.? using stmt2_testdb_6.stb2 (int_tag)tags(1) (ts,int_col)VALUES (?,?)";
printf("stmt2 [%s] : %s\n", "disorder pk ts", sql);
int code = taos_stmt2_prepare(stmt, sql, 0);
checkError(stmt, code);
int tag_i = 0;
int tag_l = sizeof(int);
int tag_bl = 3;
int64_t ts[5] = {1591060628003, 1591060628002, 1591060628002, 1591060628002, 1591060628001};
int t64_len[5] = {sizeof(int64_t), sizeof(int64_t), sizeof(int64_t), sizeof(int64_t), sizeof(int64_t)};
int coli[5] = {1, 4, 4, 3, 2};
int ilen[5] = {sizeof(int), sizeof(int), sizeof(int), sizeof(int), sizeof(int)};
int total_affect_rows = 0;
char is_null[2] = {1, 1};
TAOS_STMT2_BIND params1[2] = {
{TSDB_DATA_TYPE_TIMESTAMP, &ts, &t64_len[0], NULL, 5},
{TSDB_DATA_TYPE_INT, &coli, &ilen[0], NULL, 5},
};
TAOS_STMT2_BIND* paramv = &params1[0];
char* tbname = "tb3";
TAOS_STMT2_BINDV bindv = {1, &tbname, NULL, &paramv};
code = taos_stmt2_bind_param(stmt, &bindv, -1);
checkError(stmt, code);
int affected_rows;
taos_stmt2_exec(stmt, &affected_rows);
checkError(stmt, code);
ASSERT_EQ(affected_rows, 4);
taos_stmt2_close(stmt);
}
// get fields insert into ? valuse // get fields insert into ? valuse
{ {

View File

@ -520,11 +520,12 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
*pOrdered = true; *pOrdered = true;
*pDupTs = false; *pDupTs = false;
} else { } else {
// no more compare if we already get disordered or duplicate rows if (*pOrdered) {
if (*pOrdered && !*pDupTs) { int32_t res = tRowKeyCompare(&rowKey, &lastRowKey);
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey); *pOrdered = (res >= 0);
*pOrdered = (code >= 0); if (!*pDupTs) {
*pDupTs = (code == 0); *pDupTs = (res == 0);
}
} }
} }
lastRowKey = rowKey; lastRowKey = rowKey;
@ -3349,17 +3350,17 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
*pOrdered = true; *pOrdered = true;
*pDupTs = false; *pDupTs = false;
} else { } else {
// no more compare if we already get disordered or duplicate rows if (*pOrdered) {
if (*pOrdered && !*pDupTs) { int32_t res = tRowKeyCompare(&rowKey, &lastRowKey);
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey); *pOrdered = (res >= 0);
*pOrdered = (code >= 0); if (!*pDupTs) {
*pDupTs = (code == 0); *pDupTs = (res == 0);
}
} }
lastRowKey = rowKey;
} }
lastRowKey = rowKey;
} }
} }
_exit: _exit:
taosArrayDestroy(colValArray); taosArrayDestroy(colValArray);
taosArrayDestroy(bufArray); taosArrayDestroy(bufArray);

View File

@ -639,10 +639,10 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
} }
} }
if (!pTbCtx->ordered) { if (!pTbData->isOrdered) {
code = tRowSort(pTbCtx->pData->aRowP); code = tRowSort(pTbCtx->pData->aRowP);
} }
if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) { if (code == TSDB_CODE_SUCCESS && (!pTbData->isOrdered || pTbData->isDuplicateTs)) {
code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0); code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
} }