diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 4c13f76e24..62ba1b6fb9 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -85,11 +85,12 @@ int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid); void qCleanupKeywordsTable(); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); -int32_t qResetStmtDataBlock(void* block, bool keepBuf); -int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset); -int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId); -void qDestroyStmtDataBlock(void* pBlock); -STableMeta* qGetTableMetaInDataBlock(void* pDataBlock); +int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf); +int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset); +int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb); +void qDestroyStmtDataBlock(STableDataCxt* pBlock); +STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock); +int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData); int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx); int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 6319281212..1eb2a009dd 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -163,6 +163,25 @@ typedef struct STargetInfo { int32_t vgId; } STargetInfo; + +typedef struct SBoundColInfo { + int16_t *pColIndex; // bound index => schema index + int32_t numOfCols; + int32_t numOfBound; +} SBoundColInfo; + + +typedef struct STableDataCxt { + STableMeta *pMeta; + STSchema *pSchema; + SBoundColInfo boundColsInfo; + SArray *pValues; + SSubmitTbData *pData; + TSKEY lastTs; + bool ordered; + bool duplicateTs; +} STableDataCxt; + typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); @@ -238,6 +257,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t char* parseTagDatatoJson(void* p); int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); +int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen, void* (*mallocFp)(int64_t)); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 1bda1684c4..2b42de93e3 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -21,8 +21,6 @@ extern "C" { #endif #include "catalog.h" -typedef void STableDataCxt; - typedef enum { STMT_TYPE_INSERT = 1, STMT_TYPE_MULTI_INSERT, @@ -71,10 +69,11 @@ typedef struct SStmtBindInfo { } SStmtBindInfo; typedef struct SStmtExecInfo { - int32_t affectedRows; - SRequestObj *pRequest; - SHashObj *pBlockHash; - bool autoCreateTbl; + int32_t affectedRows; + SRequestObj *pRequest; + SHashObj *pBlockHash; + STableDataCxt *pCurrBlock; + SSubmitTbData *pCurrTbData; } SStmtExecInfo; typedef struct SStmtSQLInfo { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index b9887f28d4..1ef70b96e5 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -171,12 +171,11 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, return TSDB_CODE_SUCCESS; } -int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) { +int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) { STscStmt* pStmt = (STscStmt*)stmt; pStmt->sql.pVgHash = pVgHash; pStmt->exec.pBlockHash = pBlockHash; - pStmt->exec.autoCreateTbl = autoCreateTbl; return TSDB_CODE_SUCCESS; } @@ -186,7 +185,7 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SNam STscStmt* pStmt = (STscStmt*)stmt; STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl)); - STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl)); + STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash)); pStmt->sql.autoCreateTbl = autoCreateTbl; @@ -293,14 +292,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { char* key = taosHashGetKey(pIter, &keyLen); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); -/* - if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) { + if (keepTable && pBlocks == pStmt->exec.pCurrBlock) { + ASSERT(NULL == pBlocks->pData); + TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData); STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); continue; } -*/ qDestroyStmtDataBlock(pBlocks); taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); @@ -308,13 +307,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); } - pStmt->exec.autoCreateTbl = false; - if (!keepTable) { taosHashCleanup(pStmt->exec.pBlockHash); pStmt->exec.pBlockHash = NULL; } + tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_DECODE); + pStmt->exec.pCurrTbData = NULL; + STMT_ERR_RET(stmtCleanBindInfo(pStmt)); return TSDB_CODE_SUCCESS; @@ -362,7 +362,7 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableD STMT_ERR_RET( taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); - STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId)); + STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId, pStmt->sql.autoCreateTbl)); return TSDB_CODE_SUCCESS; } @@ -371,11 +371,13 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.needParse = true; pStmt->bInfo.inExecCache = false; - STableDataCxt* pCxtInExec = + STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (pCxtInExec) { pStmt->bInfo.needParse = false; pStmt->bInfo.inExecCache = true; + + pStmt->exec.pCurrBlock = *pCxtInExec; if (pStmt->sql.autoCreateTbl) { tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName); @@ -403,8 +405,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid)); if (pCache) { pStmt->bInfo.needParse = false; - pStmt->exec.autoCreateTbl = true; - pStmt->bInfo.tbUid = 0; STableDataCxt* pNewBlock = NULL; @@ -415,6 +415,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + pStmt->exec.pCurrBlock = pNewBlock; + tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid); return TSDB_CODE_SUCCESS; @@ -493,6 +495,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + pStmt->exec.pCurrBlock = pNewBlock; + tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName); return TSDB_CODE_SUCCESS; @@ -590,6 +594,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STMT_ERR_RET(stmtGetFromCache(pStmt)); if (pStmt->bInfo.needParse) { + pStmt->exec.pCurrBlock = NULL; + strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; @@ -622,8 +628,6 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); - pStmt->exec.autoCreateTbl = true; - return TSDB_CODE_SUCCESS; } @@ -725,11 +729,17 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { return TSDB_CODE_SUCCESS; } - STableDataCxt** pDataBlock = - (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); - if (NULL == pDataBlock) { - tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); - STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + STableDataCxt** pDataBlock = NULL; + + if (pStmt->exec.pCurrBlock) { + pDataBlock = &pStmt->exec.pCurrBlock; + } else { + pDataBlock= (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (NULL == pDataBlock) { + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); + STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + pStmt->exec.pCurrBlock = *pDataBlock; } if (colIdx < 0) { @@ -857,7 +867,6 @@ int stmtExec(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; int32_t code = 0; SSubmitRsp* pRsp = NULL; - bool autoCreateTbl = pStmt->exec.autoCreateTbl; STMT_DLOG_E("start to exec"); @@ -866,8 +875,10 @@ int stmtExec(TAOS_STMT* stmt) { if (STMT_TYPE_QUERY == pStmt->sql.type) { launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { + STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData)); + STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, (autoCreateTbl ? (void**)&pRsp : NULL)); + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { @@ -890,15 +901,6 @@ _return: stmtCleanExecInfo(pStmt, (code ? false : true), false); - if (TSDB_CODE_SUCCESS == code && autoCreateTbl) { - if (NULL == pRsp) { - tscError("no submit resp got for auto create table"); - code = TSDB_CODE_TSC_APP_ERROR; - } else { - code = stmtUpdateTableUid(pStmt, pRsp); - } - } - tFreeSSubmitRsp(pRsp); ++pStmt->sql.runTimes; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8dc4633443..24cf6fbf5a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6867,6 +6867,10 @@ _exit: } void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { + if (NULL == pTbData) { + return; + } + if (pTbData->pCreateTbReq) { taosMemoryFree(pTbData->pCreateTbReq); } diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index d64d1ce0c0..4e098a096f 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -141,23 +141,6 @@ int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start); int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); void insDestroyDataBlock(STableDataBlocks *pDataBlock); -typedef struct SBoundColInfo { - int16_t *pColIndex; // bound index => schema index - int32_t numOfCols; - int32_t numOfBound; -} SBoundColInfo; - -typedef struct STableDataCxt { - STableMeta *pMeta; - STSchema *pSchema; - SBoundColInfo boundColsInfo; - SArray *pValues; - SSubmitTbData *pData; - TSKEY lastTs; - bool ordered; - bool duplicateTs; -} STableDataCxt; - typedef struct SVgroupDataCxt { int32_t vgId; SSubmitReq2 *pData; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index bd74789d9a..badb3803cc 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -29,6 +29,29 @@ typedef struct SKvParam { char buf[TSDB_MAX_TAGS_LEN]; } SKvParam; +int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData) { + *pData = taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (NULL == *pData) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSubmitTbData *pNew = *pData; + + *pNew = *pDataBlock->pData; + + cloneSVreateTbReq(pDataBlock->pData->pCreateTbReq, &pNew->pCreateTbReq); + pNew->aCol = taosArrayDup(pDataBlock->pData->aCol, NULL); + + int32_t colNum = taosArrayGetSize(pNew->aCol); + for (int32_t i = 0; i < colNum; ++i) { + SColData *pCol = (SColData*)taosArrayGet(pNew->aCol, i); + tColDataDeepClear(pCol); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { int32_t code = TSDB_CODE_SUCCESS; SArray* pVgDataBlocks = NULL; @@ -357,7 +380,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel return TSDB_CODE_SUCCESS; } -int32_t qResetStmtDataBlock(void* block, bool deepClear) { +int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) { STableDataCxt* pBlock = (STableDataCxt*)block; int32_t colNum = taosArrayGetSize(pBlock->pData->aCol); @@ -373,7 +396,7 @@ int32_t qResetStmtDataBlock(void* block, bool deepClear) { return TSDB_CODE_SUCCESS; } -int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) { +int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset) { int32_t code = 0; *pDst = taosMemoryCalloc(1, sizeof(STableDataCxt)); @@ -436,7 +459,7 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) { return code; } -int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) { +int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb) { int32_t code = qCloneStmtDataBlock(pDst, pSrc, false); if (code) { return code; @@ -448,12 +471,19 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgI pBlock->pMeta->vgId = vgId; } + if (rebuildCreateTb && pBlock->pData->pCreateTbReq) { + pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (NULL == pBlock->pData->pCreateTbReq) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return TSDB_CODE_SUCCESS; } -STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; } +STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; } -void qDestroyStmtDataBlock(void* pBlock) { +void qDestroyStmtDataBlock(STableDataCxt* pBlock) { if (pBlock == NULL) { return; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 55c47f9e55..190d6869dc 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -486,3 +486,43 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { return TSDB_CODE_SUCCESS; } + +int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) { + if (NULL == pSrc) { + *pDst = NULL; + return TSDB_CODE_SUCCESS; + } + + *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (NULL == *pDst) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + (*pDst)->flags = pSrc->flags; + (*pDst)->name = strdup(pSrc->name); + (*pDst)->uid = pSrc->uid; + (*pDst)->ctime = pSrc->ctime; + (*pDst)->ttl = pSrc->ttl; + (*pDst)->commentLen = pSrc->commentLen; + (*pDst)->comment = strdup(pSrc->comment); + (*pDst)->type = pSrc->type; + + if (pSrc->type == TSDB_CHILD_TABLE) { + (*pDst)->ctb.stbName = strdup(pSrc->ctb.stbName); + (*pDst)->ctb.tagNum = pSrc->ctb.tagNum; + (*pDst)->ctb.suid = pSrc->ctb.suid; + (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL); + STag* pTag = (STag *)pSrc->ctb.pTag; + (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len); + memcpy((*pDst)->ctb.pTag, pTag, pTag->len); + } else { + (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols; + (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols; + (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema)); + memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema)); + } + + return TSDB_CODE_SUCCESS; +} + + diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index f48f438d9f..7e543abdd6 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -244,7 +244,7 @@ CaseCtrl gCaseCtrl = { .funcIdxList = NULL, .checkParamNum = false, .runTimes = 0, - .caseIdx = 0, + .caseIdx = 9, .caseNum = 1, .caseRunIdx = -1, .caseRunNum = -1,