enh(stmt2): [TD-33660] interlace mode support auto create table (#30084)

* enh: remove interlace autocreate tb limit

* enh: autocreate tb with single ctb

* enh: multi-insert in one ctb

* enh: Function realization

* fix: some unit test error

fix: stmt1 unit test

* fix: review and autoCreateTb & interlace support insert into stb syntax

* fix: insert into tags fixed value auto create table

* fix: remove a wrong test

* fix: adapter segment fault

* fix: adaptor core

* fix: get fields make interlace error

* fix: adapter core problem 2

* fix: multi bind exec invalid message problem
This commit is contained in:
Mario Peng 2025-03-14 14:13:26 +08:00 committed by GitHub
parent 3d053e2c9b
commit f5a339f03e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 316 additions and 118 deletions

View File

@ -128,7 +128,7 @@ int32_t qInitKeywordsTable();
void qCleanupKeywordsTable();
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
SStbInterlaceInfo* pBuildInfo);
SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq);
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks);
// int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx,
// int32_t tbNum);
@ -166,7 +166,8 @@ int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* b
int32_t qBindStmt2RowValue(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
STSchema** pTSchema, SBindInfo2* pBindInfos, void* charsetCxt);
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt);
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt,
SVCreateTbReq* pCreateTbReq);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
@ -196,7 +197,7 @@ int32_t serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
void destoryCatalogReq(SCatalogReq* pCatalogReq);
bool isPrimaryKeyImpl(SNode* pExpr);
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
SStbInterlaceInfo* pBuildInfo);
SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq);
#ifdef __cplusplus
}

View File

@ -123,6 +123,7 @@ typedef struct SStmtStatInfo {
typedef struct SStmtQNode {
bool restoreTbCols;
STableColsData tblData;
SVCreateTbReq *pCreateTbReq;
struct SStmtQNode* next;
} SStmtQNode;

View File

@ -102,6 +102,8 @@ typedef struct {
SHashObj *pVgHash;
SBindInfo2 *pBindInfo;
bool bindRowFormat;
bool fixValueTags;
SVCreateTbReq *fixValueTbReq;
SStbInterlaceInfo siInfo;
} SStmtSQLInfo2;
@ -234,8 +236,9 @@ int stmtClose2(TAOS_STMT2 *stmt);
int stmtExec2(TAOS_STMT2 *stmt, int *affected_rows);
int stmtPrepare2(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
int stmtSetTbName2(TAOS_STMT2 *stmt, const char *tbName);
int stmtSetTbTags2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *tags);
int stmtBindBatch2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *bind, int32_t colIdx);
int stmtSetTbTags2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *tags, SVCreateTbReq **pCreateTbReq);
int stmtCheckTags2(TAOS_STMT2 *stmt, SVCreateTbReq **pCreateTbReq);
int stmtBindBatch2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *bind, int32_t colIdx, SVCreateTbReq *pCreateTbReq);
int stmtGetStbColFields2(TAOS_STMT2 *stmt, int *nums, TAOS_FIELD_ALL **fields);
int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums);
int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);

View File

@ -2220,16 +2220,17 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
}
}
SVCreateTbReq *pCreateTbReq = NULL;
if (bindv->tags && bindv->tags[i]) {
code = stmtSetTbTags2(stmt, bindv->tags[i]);
if (code) {
goto out;
}
} else if (pStmt->bInfo.tbType == TSDB_CHILD_TABLE && pStmt->sql.autoCreateTbl) {
code = stmtSetTbTags2(stmt, NULL);
if (code) {
return code;
}
code = stmtSetTbTags2(stmt, bindv->tags[i], &pCreateTbReq);
} else if (pStmt->sql.autoCreateTbl || pStmt->bInfo.needParse) {
code = stmtCheckTags2(stmt, &pCreateTbReq);
} else {
pStmt->sql.autoCreateTbl = false;
}
if (code) {
goto out;
}
if (bindv->bind_cols && bindv->bind_cols[i]) {
@ -2249,7 +2250,7 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
goto out;
}
code = stmtBindBatch2(stmt, bind, col_idx);
code = stmtBindBatch2(stmt, bind, col_idx, pCreateTbReq);
if (TSDB_CODE_SUCCESS != code) {
goto out;
}

View File

@ -762,7 +762,7 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
} else {
STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
&pStmt->sql.siInfo));
&pStmt->sql.siInfo, NULL));
// taosMemoryFree(pParam->pTbData);

View File

@ -183,7 +183,7 @@ static int32_t stmtGetTbName(TAOS_STMT2* stmt, char** tbName) {
pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
if ('\0' == pStmt->bInfo.tbName[0]) {
tscError("no table name set");
tscWarn("no table name set, OK if it is a stmt get fields");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
}
@ -240,9 +240,6 @@ static int32_t stmtUpdateInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void* ta
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
pStmt->sql.autoCreateTbl = autoCreateTbl;
if (pStmt->sql.autoCreateTbl) {
pStmt->sql.stbInterlaceMode = false;
}
return TSDB_CODE_SUCCESS;
}
@ -294,18 +291,18 @@ static int32_t stmtParseSql(STscStmt2* pStmt) {
}
STableDataCxt* pTableCtx = *pSrc;
if (pStmt->sql.stbInterlaceMode) {
int16_t lastIdx = -1;
// if (pStmt->sql.stbInterlaceMode) {
// int16_t lastIdx = -1;
for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
pStmt->sql.stbInterlaceMode = false;
break;
}
// for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
// if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
// pStmt->sql.stbInterlaceMode = false;
// break;
// }
lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
}
}
// lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
// }
// }
if (NULL == pStmt->sql.pBindInfo) {
pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
@ -319,7 +316,6 @@ static int32_t stmtParseSql(STscStmt2* pStmt) {
static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
pStmt->bInfo.tbUid = 0;
pStmt->bInfo.tbSuid = 0;
pStmt->bInfo.tbVgId = -1;
pStmt->bInfo.tbType = 0;
pStmt->bInfo.needParse = true;
@ -331,7 +327,10 @@ static int32_t stmtCleanBindInfo(STscStmt2* pStmt) {
qDestroyBoundColInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
}
pStmt->bInfo.stbFName[0] = 0;
if (!pStmt->sql.autoCreateTbl) {
pStmt->bInfo.stbFName[0] = 0;
pStmt->bInfo.tbSuid = 0;
}
return TSDB_CODE_SUCCESS;
}
@ -439,6 +438,9 @@ static int32_t stmtCleanSQLInfo(STscStmt2* pStmt) {
taosArrayDestroy(pStmt->sql.nodeList);
taosHashCleanup(pStmt->sql.pVgHash);
pStmt->sql.pVgHash = NULL;
if (pStmt->sql.fixValueTags) {
tdDestroySVCreateTbReq(pStmt->sql.fixValueTbReq);
}
void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
while (pIter) {
@ -691,12 +693,11 @@ static int32_t stmtAsyncOutput(STscStmt2* pStmt, void* param) {
atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true);
} else {
STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
&pStmt->sql.siInfo));
int code = qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock,
&pStmt->sql.siInfo, pParam->pCreateTbReq);
// taosMemoryFree(pParam->pTbData);
(void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
STMT_ERR_RET(code);
}
return TSDB_CODE_SUCCESS;
}
@ -1019,7 +1020,7 @@ int stmtSetTbName2(TAOS_STMT2* stmt, const char* tbName) {
return TSDB_CODE_SUCCESS;
}
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) {
int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags, SVCreateTbReq** pCreateTbReq) {
STscStmt2* pStmt = (STscStmt2*)stmt;
STMT_DLOG_E("start to set tbTags");
@ -1048,22 +1049,125 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) {
// tscWarn("no tags or cols bound in sql, will not bound tags");
// return TSDB_CODE_SUCCESS;
// }
STableDataCxt** pDataBlock =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
if (pStmt->sql.autoCreateTbl && pStmt->sql.stbInterlaceMode) {
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
}
STableDataCxt** pDataBlock = NULL;
if (pStmt->exec.pCurrBlock) {
pDataBlock = &pStmt->exec.pCurrBlock;
} else {
pDataBlock =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
}
// pStmt->exec.pCurrBlock = *pDataBlock;
// if (pStmt->sql.stbInterlaceMode) {
// taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
// (*pDataBlock)->pData->aCol = NULL;
// }
}
if (pStmt->bInfo.inExecCache && !pStmt->sql.autoCreateTbl) {
return TSDB_CODE_SUCCESS;
}
tscDebug("start to bind stmt tag values");
STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
void* boundTags = NULL;
if (pStmt->sql.stbInterlaceMode) {
boundTags = pStmt->sql.siInfo.boundTags;
*pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pCreateTbReq) {
return terrno;
}
int32_t vgId = -1;
STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
(*pCreateTbReq)->uid = vgId;
} else {
boundTags = pStmt->bInfo.boundTags;
}
STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt));
pStmt->exec.pRequest->msgBufLen, pStmt->taos->optionInfo.charsetCxt, *pCreateTbReq));
return TSDB_CODE_SUCCESS;
}
int stmtCheckTags2(TAOS_STMT2* stmt, SVCreateTbReq** pCreateTbReq) {
STscStmt2* pStmt = (STscStmt2*)stmt;
STMT_DLOG_E("start to set tbTags");
if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode;
}
if (!pStmt->sql.stbInterlaceMode) {
return TSDB_CODE_SUCCESS;
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
if (!pStmt->sql.autoCreateTbl) {
return TSDB_CODE_SUCCESS;
}
}
if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
}
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, pStmt->bInfo.tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName));
STableDataCxt** pDataBlock = NULL;
if (pStmt->exec.pCurrBlock) {
pDataBlock = &pStmt->exec.pCurrBlock;
} else {
pDataBlock =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
}
}
if (!((*pDataBlock)->pData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE)) {
return TSDB_CODE_SUCCESS;
}
if (pStmt->sql.fixValueTags) {
STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
if ((*pCreateTbReq)->name) {
taosMemoryFree((*pCreateTbReq)->name);
}
(*pCreateTbReq)->name = taosStrdup(pStmt->bInfo.tbName);
int32_t vgId = -1;
STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
(*pCreateTbReq)->uid = vgId;
return TSDB_CODE_SUCCESS;
}
if ((*pDataBlock)->pData->pCreateTbReq) {
pStmt->sql.fixValueTags = true;
STMT_ERR_RET(cloneSVreateTbReq((*pDataBlock)->pData->pCreateTbReq, &pStmt->sql.fixValueTbReq));
STMT_ERR_RET(cloneSVreateTbReq(pStmt->sql.fixValueTbReq, pCreateTbReq));
(*pCreateTbReq)->uid = (*pDataBlock)->pMeta->vgId;
}
return TSDB_CODE_SUCCESS;
}
@ -1110,22 +1214,27 @@ static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIEL
}
STableDataCxt** pDataBlock = NULL;
bool cleanStb = false;
if (pStmt->sql.stbInterlaceMode) {
if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx != NULL) {
pDataBlock = &pStmt->sql.siInfo.pDataCtx;
} else {
cleanStb = true;
pDataBlock =
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
}
}
STMT_ERRI_JRET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
if (NULL == pDataBlock || NULL == *pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
}
STMT_ERRI_JRET(
qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE && cleanStb) {
pStmt->bInfo.needParse = true;
qDestroyStmtDataBlock(*pDataBlock);
*pDataBlock = NULL;
if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
@ -1281,6 +1390,10 @@ static int stmtAddBatch2(TAOS_STMT2* stmt) {
param->restoreTbCols = true;
param->next = NULL;
if (pStmt->sql.autoCreateTbl) {
pStmt->bInfo.tagsCached = true;
}
stmtEnqueue(pStmt, param);
return TSDB_CODE_SUCCESS;
@ -1334,7 +1447,7 @@ static int32_t stmtRestoreQueryFields(STscStmt2* pStmt) {
return TSDB_CODE_SUCCESS;
}
*/
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx, SVCreateTbReq* pCreateTbReq) {
STscStmt2* pStmt = (STscStmt2*)stmt;
int32_t code = 0;
@ -1443,6 +1556,8 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
param->restoreTbCols = false;
tstrncpy(param->tblData.tbName, pStmt->bInfo.tbName, TSDB_TABLE_NAME_LEN);
param->pCreateTbReq = pCreateTbReq;
}
int64_t startUs3 = taosGetTimestampUs();
@ -1452,7 +1567,8 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
if (colIdx < 0) {
if (pStmt->sql.stbInterlaceMode) {
(*pDataBlock)->pData->flags = 0;
// (*pDataBlock)->pData->flags = 0;
(*pDataBlock)->pData->flags &= ~SUBMIT_REQ_COLUMN_DATA_FORMAT;
code = qBindStmtStbColsValue2(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo,
pStmt->taos->optionInfo.charsetCxt);

View File

@ -43,8 +43,7 @@ void checkError(TAOS_STMT2* stmt, int code) {
if (pStmt == nullptr || pStmt->sql.sqlStr == nullptr || pStmt->exec.pRequest == nullptr) {
printf("stmt api error\n stats : %d\n errstr : %s\n", pStmt->sql.status, taos_stmt_errstr(stmt));
} else {
printf("stmt api error\n sql : %s\n stats : %d\n errstr : %s\n", pStmt->sql.sqlStr, pStmt->sql.status,
taos_stmt_errstr(stmt));
printf("stmt api error\n sql : %s\n stats : %d\n", pStmt->sql.sqlStr, pStmt->sql.status);
}
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
}
@ -133,9 +132,9 @@ void do_query(TAOS* taos, const char* sql) {
taos_free_result(result);
}
void do_stmt(TAOS* taos, TAOS_STMT2_OPTION* option, const char* sql, int CTB_NUMS, int ROW_NUMS, int CYC_NUMS,
bool hastags, bool createTable) {
printf("test sql : %s\n", sql);
void do_stmt(const char* msg, TAOS* taos, TAOS_STMT2_OPTION* option, const char* sql, int CTB_NUMS, int ROW_NUMS,
int CYC_NUMS, bool hastags, bool createTable) {
printf("stmt2 [%s] : %s\n", msg, sql);
do_query(taos, "drop database if exists stmt2_testdb_1");
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_1");
do_query(taos, "create stable stmt2_testdb_1.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))");
@ -903,14 +902,14 @@ TEST(stmt2Case, stmt2_stb_insert) {
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0);
ASSERT_NE(taos, nullptr);
// normal
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
TAOS_STMT2_OPTION option = {0, false, true, NULL, NULL};
{
do_stmt(taos, &option, "insert into `stmt2_testdb_1`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)", 3, 3, 3, true,
true);
do_stmt("no-interlcace", taos, &option, "insert into `stmt2_testdb_1`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)",
3, 3, 3, true, true);
}
{
do_stmt(taos, &option, "insert into `stmt2_testdb_1`.? using `stmt2_testdb_1`.`stb` tags(?,?) values(?,?)", 3, 3, 3,
true, true);
do_stmt("no-interlcace", taos, &option,
"insert into `stmt2_testdb_1`.? using `stmt2_testdb_1`.`stb` tags(?,?) values(?,?)", 3, 3, 3, true, true);
}
// async
@ -918,28 +917,61 @@ TEST(stmt2Case, stmt2_stb_insert) {
aa->async_affected_rows = 0;
ASSERT_EQ(tsem_init(&aa->sem, 0, 0), TSDB_CODE_SUCCESS);
void* param = aa;
option = {0, true, true, stmtAsyncQueryCb, param};
option = {0, false, true, stmtAsyncQueryCb, param};
{
do_stmt(taos, &option, "insert into stmt2_testdb_1.stb (ts,b,tbname,t1,t2) values(?,?,?,?,?)", 3, 3, 3, true, true);
do_stmt("no-interlcace & aync exec", taos, &option,
"insert into stmt2_testdb_1.stb (ts,b,tbname,t1,t2) values(?,?,?,?,?)", 3, 3, 3, true, true);
}
{
do_stmt(taos, &option, "insert into stmt2_testdb_1.? using stmt2_testdb_1.stb (t1,t2)tags(?,?) (ts,b)values(?,?)",
3, 3, 3, true, true);
do_stmt("no-interlcace & aync exec", taos, &option,
"insert into stmt2_testdb_1.? using stmt2_testdb_1.stb (t1,t2)tags(?,?) (ts,b)values(?,?)", 3, 3, 3, true,
true);
}
// { do_stmt(taos, &option, "insert into db.? values(?,?)", 3, 3, 3, false, true); }
// interlace = 0 & use db]
do_query(taos, "use stmt2_testdb_1");
option = {0, false, false, NULL, NULL};
{ do_stmt(taos, &option, "insert into stb (tbname,ts,b) values(?,?,?)", 3, 3, 3, false, true); }
{ do_stmt(taos, &option, "insert into ? using stb (t1,t2)tags(?,?) (ts,b)values(?,?)", 3, 3, 3, true, true); }
{ do_stmt(taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); }
{
do_stmt("no-interlcace & no-db", taos, &option, "insert into stb (tbname,ts,b) values(?,?,?)", 3, 3, 3, false,
true);
}
{
do_stmt("no-interlcace & no-db", taos, &option, "insert into ? using stb (t1,t2)tags(?,?) (ts,b)values(?,?)", 3, 3,
3, true, true);
}
{ do_stmt("no-interlcace & no-db", taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); }
// interlace = 1
option = {0, true, true, stmtAsyncQueryCb, param};
{ do_stmt(taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); }
{ do_stmt("interlcace & preCreateTB", taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); }
option = {0, true, true, NULL, NULL};
{ do_stmt(taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); }
{ do_stmt("interlcace & preCreateTB", taos, &option, "insert into ? values(?,?)", 3, 3, 3, false, true); }
// auto create table
// interlace = 1
option = {0, true, true, NULL, NULL};
{
do_stmt("interlcace & no-preCreateTB", taos, &option, "insert into ? using stb (t1,t2)tags(?,?) (ts,b)values(?,?)",
3, 3, 3, true, false);
}
{
do_stmt("interlcace & no-preCreateTB", taos, &option,
"insert into stmt2_testdb_1.? using stb (t1,t2)tags(1,'abc') (ts,b)values(?,?)", 3, 3, 3, false, false);
}
{
do_stmt("interlcace & no-preCreateTB", taos, &option,
"insert into stmt2_testdb_1.stb (ts,b,tbname,t1,t2) values(?,?,?,?,?)", 3, 3, 3, true, false);
}
// interlace = 0
option = {0, false, false, NULL, NULL};
{
do_stmt("no-interlcace & no-preCreateTB", taos, &option,
"insert into ? using stb (t1,t2)tags(?,?) (ts,b)values(?,?)", 3, 3, 3, true, false);
}
{
do_stmt("no-interlcace & no-preCreateTB", taos, &option,
"insert into stmt2_testdb_1.stb (ts,b,tbname,t1,t2) values(?,?,?,?,?)", 3, 3, 3, true, false);
}
do_query(taos, "drop database if exists stmt2_testdb_1");
(void)tsem_destroy(&aa->sem);
@ -967,7 +999,8 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
ASSERT_NE(stmt, nullptr);
const char* sql = "INSERT INTO stmt2_testdb_6.? using stmt2_testdb_6.stb1 (int_tag)tags(1) (ts) VALUES (?)";
int code = taos_stmt2_prepare(stmt, sql, 0);
printf("stmt2 [%s] : %s\n", "less params", sql);
int code = taos_stmt2_prepare(stmt, sql, 0);
checkError(stmt, code);
int total_affect_rows = 0;
@ -987,7 +1020,7 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
TAOS_STMT2_BIND* tagv[2] = {&tags1, &tags2};
TAOS_STMT2_BIND* paramv[2] = {&params1, &params2};
char* tbname[2] = {"tb1", "tb2"};
TAOS_STMT2_BINDV bindv = {2, &tbname[0], NULL, &paramv[0]};
TAOS_STMT2_BINDV bindv = {2, &tbname[0], tagv, &paramv[0]};
code = taos_stmt2_bind_param(stmt, &bindv, -1);
checkError(stmt, code);
@ -1004,10 +1037,12 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
// less cols and tags
{
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
TAOS_STMT2_OPTION option = {0, false, false, NULL, NULL};
ASSERT_NE(stmt, nullptr);
const char* sql = "INSERT INTO stmt2_testdb_6.stb1 (ts,int_tag,tbname) VALUES (?,?,?)";
int code = taos_stmt2_prepare(stmt, sql, 0);
printf("stmt2 [%s] : %s\n", "less params", sql);
int code = taos_stmt2_prepare(stmt, sql, 0);
checkError(stmt, code);
int total_affect_rows = 0;
@ -1026,7 +1061,7 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
TAOS_STMT2_BIND* tagv[2] = {&tags1, &tags2};
TAOS_STMT2_BIND* paramv[2] = {&params1, &params2};
char* tbname[2] = {"tb3", "tb4"};
char* tbname[2] = {"tb1", "tb2"};
TAOS_STMT2_BINDV bindv = {2, &tbname[0], &tagv[0], &paramv[0]};
code = taos_stmt2_bind_param(stmt, &bindv, -1);
checkError(stmt, code);
@ -1044,10 +1079,15 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
// disorder cols and tags
{
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
TAOS_STMT2_OPTION option = {0, false, false, NULL, NULL};
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
ASSERT_NE(stmt, nullptr);
do_query(taos,
"INSERT INTO stmt2_testdb_6.stb1 (ts, int_tag, tbname) VALUES (1591060627000, 5, 'tb5')(1591060627000, "
"6,'tb6')");
const char* sql = "INSERT INTO stmt2_testdb_6.stb1 (binary_tag,int_col,tbname,ts,int_tag) VALUES (?,?,?,?,?)";
int code = taos_stmt2_prepare(stmt, sql, 0);
printf("stmt2 [%s] : %s\n", "disorder params", sql);
int code = taos_stmt2_prepare(stmt, sql, 0);
checkError(stmt, code);
int tag_i = 0;
@ -1096,6 +1136,7 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
ASSERT_NE(stmt, nullptr);
const char* sql =
"INSERT INTO stmt2_testdb_6.? using stmt2_testdb_6.stb1 (int_tag)tags(1) (int_col,ts)VALUES (?,?)";
printf("stmt2 [%s] : %s\n", "PK error", sql);
int code = taos_stmt2_prepare(stmt, sql, 0);
checkError(stmt, code);
@ -1131,11 +1172,10 @@ TEST(stmt2Case, stmt2_insert_db) {
ASSERT_NE(taos, nullptr);
do_query(taos, "drop database if exists stmt2_testdb_12");
do_query(taos, "create database IF NOT EXISTS stmt2_testdb_12");
do_query(taos, "create stable `stmt2_testdb_12`.`stb1`(ts timestamp, int_col int) tags(int_tag int)");
do_query(taos,
"create stable `stmt2_testdb_12`.`stb1` (ts timestamp, int_col int,long_col bigint,double_col "
"double,bool_col bool,binary_col binary(20),nchar_col nchar(20),varbinary_col varbinary(20),geometry_col "
"geometry(200)) tags(int_tag int,long_tag bigint,double_tag double,bool_tag bool,binary_tag "
"binary(20),nchar_tag nchar(20),varbinary_tag varbinary(20),geometry_tag geometry(200));");
"INSERT INTO `stmt2_testdb_12`.`stb1` (ts,int_tag,tbname) VALUES "
"(1591060627000,1,'tb1')(1591060627000,2,'tb2')");
TAOS_STMT2_OPTION option = {0, false, false, NULL, NULL};

View File

@ -11866,6 +11866,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
// auto create table
if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if (!(pSubmitTbData->pCreateTbReq)) {
uError("auto create table but request is NULL");
return TSDB_CODE_INVALID_MSG;
}
TAOS_CHECK_EXIT(tEncodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq));
@ -12005,7 +12006,7 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pReq->aSubmitTbData, i);
if ((pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) && pSubmitTbData->pCreateTbReq == NULL) {
pSubmitTbData->flags = 0;
pSubmitTbData->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
}
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, pSubmitTbData));
}

View File

@ -1937,7 +1937,6 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
return buildInvalidOperationMsg(&pCxt->msg, "not support mixed bind and non-bind values");
}
pCxt->isStmtBind = true;
pStmt->usingTableProcessing = true;
if (pCols->pColIndex[i] == tbnameIdx) {
*bFoundTbName = true;
char* tbName = NULL;
@ -1972,6 +1971,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
if (!(tag_index < numOfTags)) {
return buildInvalidOperationMsg(&pCxt->msg, "not expected numOfTags");
}
pStmt->usingTableProcessing = true;
pCxt->tags.pColIndex[tag_index++] = pCols->pColIndex[i] - numOfCols;
pCxt->tags.mixTagsCols = true;
pCxt->tags.numOfBound++;

View File

@ -66,9 +66,9 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) {
}
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
SStbInterlaceInfo* pBuildInfo) {
SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
// merge according to vgId
return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo);
return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo, ctbReq);
}
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
@ -487,7 +487,8 @@ _return:
}
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt) {
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen, void* charsetCxt,
SVCreateTbReq* pCreateTbReq) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t code = TSDB_CODE_SUCCESS;
@ -607,6 +608,13 @@ int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const c
goto end;
}
if (pCreateTbReq){
code = insBuildCreateTbReq(pCreateTbReq, tName, pTag, suid, sTableName, tagName,
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
pTag = NULL;
goto end;
}
if (NULL == pDataBlock->pData->pCreateTbReq) {
pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pDataBlock->pData->pCreateTbReq) {

View File

@ -610,7 +610,7 @@ int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDa
}
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
SStbInterlaceInfo* pBuildInfo) {
SStbInterlaceInfo* pBuildInfo, SVCreateTbReq* ctbReq) {
int32_t code = TSDB_CODE_SUCCESS;
uint64_t uid;
int32_t vgId;
@ -618,14 +618,27 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
pTbCtx->pData->aRowP = pTbData->aCol;
code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
if (TSDB_CODE_SUCCESS != code) {
return code;
if (ctbReq !=NULL && code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
vgId = (int32_t)ctbReq->uid;
uid = 0;
pTbCtx->pMeta->vgId=(int32_t)ctbReq->uid;
ctbReq->uid=0;
pTbCtx->pData->pCreateTbReq = ctbReq;
code = TSDB_CODE_SUCCESS;
} else {
if (TSDB_CODE_SUCCESS != code) {
return code;
}
pTbCtx->pMeta->vgId = vgId;
pTbCtx->pMeta->uid = uid;
pTbCtx->pData->uid = uid;
pTbCtx->pData->pCreateTbReq = NULL;
if (ctbReq != NULL) {
tdDestroySVCreateTbReq(ctbReq);
}
}
pTbCtx->pMeta->vgId = vgId;
pTbCtx->pMeta->uid = uid;
pTbCtx->pData->uid = uid;
if (!pTbCtx->ordered) {
code = tRowSort(pTbCtx->pData->aRowP);
}

View File

@ -127,7 +127,7 @@ class TDTestCase(TBase):
def bugsTD(self, benchmark):
self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TD-31490.json", checkStep = False)
self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TD-31575.json")
self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TD-32846.json")
# self.testBenchmarkJson(benchmark, "./tools/benchmark/basic/json/TD-32846.json")
# no drop
db = "td32913db"

View File

@ -5,9 +5,9 @@
#include <unistd.h>
#include "taos.h"
int CTB_NUMS = 1;
int ROW_NUMS = 1;
int CYC_NUMS = 2;
int CTB_NUMS = 10000;
int ROW_NUMS = 10;
int CYC_NUMS = 1;
void do_query(TAOS* taos, const char* sql) {
TAOS_RES* result = taos_query(taos, sql);
@ -44,10 +44,10 @@ void initEnv(TAOS* taos) {
do_query(taos, "use db");
}
void do_stmt(TAOS* taos, const char* sql) {
void do_stmt(TAOS* taos, const char* sql, bool hasTag) {
initEnv(taos);
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
TAOS_STMT2_OPTION option = {0, false, true, NULL, NULL};
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
int code = taos_stmt2_prepare(stmt, sql, 0);
@ -74,8 +74,11 @@ void do_stmt(TAOS* taos, const char* sql) {
for (int i = 0; i < CTB_NUMS; i++) {
tbs[i] = (char*)malloc(sizeof(char) * 20);
sprintf(tbs[i], "ctb_%d", i);
// createCtb(taos, tbs[i]);
createCtb(taos, tbs[i]);
}
double bind_time = 0;
double exec_time = 0;
for (int r = 0; r < CYC_NUMS; r++) {
// col params
int64_t** ts = (int64_t**)malloc(CTB_NUMS * sizeof(int64_t*));
@ -100,9 +103,6 @@ void do_stmt(TAOS* taos, const char* sql) {
int t2len = 3;
// TAOS_STMT2_BIND* tagv[2] = {&tags[0][0], &tags[1][0]};
clock_t start, end;
double cpu_time_used;
// bind params
TAOS_STMT2_BIND** paramv = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*));
TAOS_STMT2_BIND** tags = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*));
@ -118,26 +118,32 @@ void do_stmt(TAOS* taos, const char* sql) {
paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS};
}
// bind
start = clock();
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, tags, paramv};
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start); // 获取开始时间
TAOS_STMT2_BINDV bindv;
if (hasTag) {
bindv = (TAOS_STMT2_BINDV){CTB_NUMS, tbs, tags, paramv};
} else {
bindv = (TAOS_STMT2_BINDV){CTB_NUMS, tbs, NULL, paramv};
}
if (taos_stmt2_bind_param(stmt, &bindv, -1)) {
printf("failed to execute taos_stmt2_bind_param statement.error:%s\n", taos_stmt2_error(stmt));
taos_stmt2_close(stmt);
return;
}
end = clock();
cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC;
printf("stmt2-bind [%s] insert Time used: %f seconds\n", sql, cpu_time_used);
start = clock();
clock_gettime(CLOCK_MONOTONIC, &end); // 获取开始时间 TAOS_STMT2_BINDV bindv;
bind_time += ((double)(end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9);
clock_gettime(CLOCK_MONOTONIC, &start); // 获取开始时间
// exec
if (taos_stmt2_exec(stmt, NULL)) {
printf("failed to execute taos_stmt2_exec statement.error:%s\n", taos_stmt2_error(stmt));
taos_stmt2_close(stmt);
return;
}
end = clock();
cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC;
printf("stmt2-exec [%s] insert Time used: %f seconds\n", sql, cpu_time_used);
clock_gettime(CLOCK_MONOTONIC, &end); // 获取开始时间
exec_time += ((double)(end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9);
for (int i = 0; i < CTB_NUMS; i++) {
free(tags[i]);
@ -152,6 +158,10 @@ void do_stmt(TAOS* taos, const char* sql) {
free(paramv);
free(tags);
}
printf("stmt2-bind [%s] insert Time used: %f seconds\n", sql, bind_time);
printf("stmt2-exec [%s] insert Time used: %f seconds\n", sql, exec_time);
for (int i = 0; i < CTB_NUMS; i++) {
free(tbs[i]);
}
@ -217,8 +227,12 @@ int main() {
exit(1);
}
do_stmt(taos, "insert into `db`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)");
// do_stmt(taos, "insert into db.? using db.stb tags(?,?)values(?,?)");
// do_stmt(taos, "insert into `db`.`stb` (tbname,ts,b,t1,t2) values(?,?,?,?,?)", true);
// printf("no interlace\n");
do_stmt(taos, "insert into db.? using db.stb tags(?,?)values(?,?)", true);
do_stmt(taos, "insert into db.? values(?,?)", false);
// do_taosc(taos);
taos_close(taos);
taos_cleanup();