From c8fe5bc88b18278849e98ec46da4a205208e2280 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 18 Apr 2022 10:19:15 +0800 Subject: [PATCH] stmt --- include/client/taos.h | 36 +--- include/libs/parser/parser.h | 14 ++ source/client/inc/clientInt.h | 6 +- source/client/inc/clientStmt.h | 10 +- source/client/src/clientMain.c | 24 ++- source/client/src/clientStmt.c | 159 +++++++----------- source/client/src/tmq.c | 2 +- source/libs/parser/inc/parInsertData.h | 3 +- source/libs/parser/inc/parInt.h | 16 -- source/libs/parser/src/parInsert.c | 219 +++++++++++++++++++++++-- source/libs/parser/src/parInsertData.c | 189 ++++++--------------- 11 files changed, 354 insertions(+), 324 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index d3856d432e..ac7a671edc 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -92,38 +92,14 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); -typedef struct TAOS_BIND { - int buffer_type; - void *buffer; - uintptr_t buffer_length; // unused - uintptr_t *length; - int *is_null; - - int is_unsigned; // unused - int *error; // unused - union { - int64_t ts; - int8_t b; - int8_t v1; - int16_t v2; - int32_t v4; - int64_t v8; - float f4; - double f8; - unsigned char *bin; - char *nchar; - } u; - unsigned int allocated; -} TAOS_BIND; - -typedef struct TAOS_MULTI_BIND { +typedef struct TAOS_BIND_v2 { int buffer_type; void *buffer; uintptr_t buffer_length; int32_t *length; char *is_null; int num; -} TAOS_MULTI_BIND; +} TAOS_BIND_v2; typedef enum { SET_CONF_RET_SUCC = 0, @@ -154,16 +130,16 @@ const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx); DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 640b4181cd..30e9b199f1 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -80,6 +80,20 @@ void qDestroyQuery(SQuery* pQueryNode); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); +int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); +void qResetStmtDataBlock(void* pBlock, bool freeData); +int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); +void qFreeStmtDataBlock(void* pDataBlock); +int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc); +void qDestroyStmtDataBlock(void* pBlock); +int32_t qBindStmtColsValue(void *pDataBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen); +int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields); +int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields); +int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen); +void destroyBoundColumnInfo(void* pBoundInfo); +int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen); + + #ifdef __cplusplus } #endif diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b352b65bea..e2a89b8e42 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -280,7 +280,8 @@ void initMsgHandleFp(); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port); -int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); +int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); + int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); @@ -311,6 +312,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v // --- mq void hbMgrInitMqHbRspHandle(); +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery); + + #ifdef __cplusplus } #endif diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index d84bbaeba1..4d7d578f70 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -19,6 +19,9 @@ #ifdef __cplusplus extern "C" { #endif +#include "catalog.h" + +typedef void STableDataBlocks; typedef enum { STMT_TYPE_INSERT = 1, @@ -107,15 +110,16 @@ typedef struct STscStmt { TAOS_STMT *stmtInit(TAOS *taos); int stmtClose(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt); -char *stmtErrstr(TAOS_STMT *stmt); +const char *stmtErrstr(TAOS_STMT *stmt); int stmtAffectedRows(TAOS_STMT *stmt); int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +int stmtSetTbName(TAOS_STMT *stmt, const char *tbName); +int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags); int stmtIsInsert(TAOS_STMT *stmt, int *insert); int stmtGetParamNum(TAOS_STMT *stmt, int *nums); int stmtAddBatch(TAOS_STMT *stmt); TAOS_RES *stmtUseResult(TAOS_STMT *stmt); -int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); #ifdef __cplusplus diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 91607b5fef..5d1065b59b 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -579,7 +579,7 @@ int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { return stmtPrepare(stmt, sql, length); } -int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { +int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags) { if (stmt == NULL || name == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; @@ -608,25 +608,23 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { return stmtSetTbName(stmt, name); } -int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { +int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) { if (stmt == NULL || bind == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; return terrno; } - TAOS_MULTI_BIND mbind = {0}; - mbind.buffer_type = bind->buffer_type; - mbind.buffer = bind->buffer; - mbind.buffer_length = bind->buffer_length; - mbind.length = bind->length; - mbind.is_null = bind->is_null; - mbind.num = 1; + if (bind->num > 1) { + tscError("invalid bind number %d for %s", bind->num, __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } - return stmtBindBatch(stmt, &mbind); + return stmtBindBatch(stmt, bind); } -int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { +int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) { if (stmt == NULL || bind == NULL) { tscError("NULL parameter for %s", __FUNCTION__); terrno = TSDB_CODE_INVALID_PARA; @@ -642,7 +640,7 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { return stmtBindBatch(stmt, bind); } -int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx) { +int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx) { return stmtBindBatch(stmt, bind); /* TODO */ } @@ -703,7 +701,7 @@ char *taos_stmt_errstr(TAOS_STMT *stmt) { return NULL; } - return stmtErrstr(stmt); + return (char *)stmtErrstr(stmt); } int taos_stmt_affected_rows(TAOS_STMT *stmt) { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 2c0085a600..5a3f232da7 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -48,6 +48,42 @@ int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHa return TSDB_CODE_SUCCESS; } +int32_t stmtCacheBlock(STscStmt *pStmt) { + if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) { + return TSDB_CODE_SUCCESS; + } + + uint64_t uid; + if (TSDB_CHILD_TABLE == pStmt->bind.tbType) { + uid = pStmt->bind.tbSuid; + } else { + ASSERT(TSDB_NORMAL_TABLE == pStmt->bind.tbType); + uid = pStmt->bind.tbUid; + } + + if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) { + return TSDB_CODE_SUCCESS; + } + + STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid)); + STableDataBlocks* pDst = NULL; + + STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc)); + + SStmtTableCache cache = { + .pDataBlock = pDst, + .boundTags = pStmt->bind.boundTags, + }; + + if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pStmt->bind.boundTags = NULL; + + return TSDB_CODE_SUCCESS; +} + int32_t stmtParseSql(STscStmt* pStmt) { SStmtCallback stmtCb = { .pStmt = pStmt, @@ -79,81 +115,6 @@ int32_t stmtParseSql(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - -void stmtResetDataBlock(STableDataBlocks* pBlock) { - pBlock->pData = NULL; - pBlock->ordered = true; - pBlock->prevTS = INT64_MIN; - pBlock->size = sizeof(SSubmitBlk); - pBlock->tsSource = -1; - pBlock->numOfTables = 1; - pBlock->nAllocSize = TSDB_PAYLOAD_SIZE; - pBlock->headerSize = pBlock->size; - - memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); -} - - -int32_t stmtCloneDataBlock(STableDataBlocks** pDst, STableDataBlocks* pSrc) { - *pDst = (STableDataBlocks*)taosMemoryMalloc(sizeof(STableDataBlocks)); - if (NULL == *pDst) { - STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); - } - - memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); - (*pDst)->cloned = true; - - stmtResetDataBlock(*pDst); - - return TSDB_CODE_SUCCESS; -} - -void stmtFreeDataBlock(STableDataBlocks* pDataBlock) { - if (pDataBlock == NULL) { - return; - } - - taosMemoryFreeClear(pDataBlock->pData); - taosMemoryFreeClear(pDataBlock); -} - - -int32_t stmtCacheBlock(STscStmt *pStmt) { - if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) { - return TSDB_CODE_SUCCESS; - } - - uint64_t uid; - if (TSDB_CHILD_TABLE == pStmt->bind.tbType) { - uid = pStmt->bind.tbSuid; - } else { - ASSERT(TSDB_NORMAL_TABLE == pStmt->bind.tbType); - uid = pStmt->bind.tbUid; - } - - if (taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid))) { - return TSDB_CODE_SUCCESS; - } - - STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid)); - STableDataBlocks* pDst = NULL; - - STMT_ERR_RET(stmtCloneDataBlock(&pDst, *pSrc)); - - SStmtTableCache cache = { - .pDataBlock = pDst, - .boundTags = pStmt->bind.boundTags, - }; - - if (taosHashPut(pStmt->sql.pTableCache, &uid, sizeof(uid), &cache, sizeof(cache))) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pStmt->bind.boundTags = NULL; - - return TSDB_CODE_SUCCESS; -} - int32_t stmtCleanBindInfo(STscStmt* pStmt) { pStmt->bind.tbUid = 0; pStmt->bind.tbSuid = 0; @@ -163,6 +124,8 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { taosMemoryFreeClear(pStmt->bind.tbName); destroyBoundColumnInfo(pStmt->bind.boundTags); taosMemoryFreeClear(pStmt->bind.boundTags); + + return TSDB_CODE_SUCCESS; } int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) { @@ -172,16 +135,17 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) { void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter; - - if (keepTable && (*(uint64_t*)taosHashGetKey(pIter, NULL) == pStmt->bind.tbUid)) { - taosMemoryFreeClear(pBlocks->pData); - stmtResetDataBlock(pBlocks); + uint64_t *key = taosHashGetKey(pIter, NULL); + + if (keepTable && (*key == pStmt->bind.tbUid)) { + qResetStmtDataBlock(pBlocks, true); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); continue; } - stmtFreeDataBlock(pBlocks); + qFreeStmtDataBlock(pBlocks); + taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key)); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); } @@ -206,8 +170,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { while (pIter) { SStmtTableCache* pCache = *(SStmtTableCache**)pIter; - pCache->pDataBlock->cloned = false; - destroyDataBlock(pCache->pDataBlock); + qDestroyStmtDataBlock(pCache->pDataBlock); destroyBoundColumnInfo(pCache->boundTags); pIter = taosHashIterate(pStmt->sql.pTableCache, pIter); @@ -269,13 +232,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bind.boundTags = pCache->boundTags; STableDataBlocks* pNewBlock = NULL; - STMT_ERR_RET(stmtCloneDataBlock(&pNewBlock, pCache->pDataBlock)); - - pNewBlock->pData = taosMemoryMalloc(pNewBlock->nAllocSize); - if (NULL == pNewBlock->pData) { - stmtFreeDataBlock(pNewBlock); - STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); - } + STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock)); if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid), &pNewBlock, POINTER_BYTES)) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -319,7 +276,7 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); } - STMT_SWITCH_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_API_ERROR); pStmt->sql.sqlStr = strndup(sql, length); pStmt->sql.sqlLen = length; @@ -331,7 +288,7 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); taosMemoryFree(pStmt->bind.tbName); @@ -348,10 +305,10 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) { return TSDB_CODE_SUCCESS; } -int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND *tags) { +int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND_v2 *tags) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); if (pStmt->bind.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); @@ -372,7 +329,7 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND *tags) { int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_SWITCH_STATUS(stmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_FETCH_TAG_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR); if (pStmt->bind.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); @@ -394,10 +351,10 @@ int32_t stmtFetchTagFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fiel return TSDB_CODE_SUCCESS; } -int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* fields) { +int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD** fields) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_SWITCH_STATUS(stmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_FETCH_COL_FIELDS, TSDB_CODE_TSC_STMT_API_ERROR); if (pStmt->bind.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); @@ -419,10 +376,10 @@ int32_t stmtFetchColFields(TAOS_STMT *stmt, int32_t *fieldNum, TAOS_FIELD* field return TSDB_CODE_SUCCESS; } -int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { +int stmtBindBatch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_SWITCH_STATUS(stmt, STMT_BIND, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_BIND, TSDB_CODE_TSC_STMT_API_ERROR); if (pStmt->bind.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { pStmt->bind.needParse = false; @@ -451,7 +408,7 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { int stmtAddBatch(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_SWITCH_STATUS(stmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_ADD_BATCH, TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(stmtCacheBlock(pStmt)); @@ -462,7 +419,7 @@ int stmtExec(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; int32_t code = 0; - STMT_SWITCH_STATUS(stmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_API_ERROR); + STMT_SWITCH_STATUS(pStmt, STMT_EXECUTE, TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash)); @@ -484,7 +441,7 @@ int stmtClose(TAOS_STMT *stmt) { return TSDB_CODE_SUCCESS; } -char *stmtErrstr(TAOS_STMT *stmt) { +const char *stmtErrstr(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; if (stmt == NULL) { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index dbe78782f5..70b87f51c0 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -582,7 +582,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa int32_t code = 0; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return); + CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return); // todo check for invalid sql statement and return with error code diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index f0fffff38b..6b45d16d1a 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -131,7 +131,6 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* int32_t schemaIdxCompar(const void *lhs, const void *rhs); int32_t boundIdxCompar(const void *lhs, const void *rhs); void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols); -void destroyBoundColumnInfo(void* pBoundInfo); void destroyBlockArrayList(SArray* pDataBlockList); void destroyBlockHashmap(SHashObj* pDataBlockHash); int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); @@ -139,5 +138,7 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq); int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks); +int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq); +int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); #endif // TDENGINE_DATABLOCKMGT_H diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 6b9b90c513..87348e292e 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -24,27 +24,11 @@ extern "C" { #include "parToken.h" #include "parUtil.h" -typedef struct SKvParam { - SKVRowBuilder *builder; - SSchema *schema; - char buf[TSDB_MAX_TAGS_LEN]; -} SKvParam; - -#define CHECK_CODE(expr) \ - do { \ - int32_t code = expr; \ - if (TSDB_CODE_SUCCESS != code) { \ - return code; \ - } \ - } while (0) - int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); -int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf); -int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 4d89f70a59..1917b8372e 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -62,6 +62,29 @@ typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void *value, int32_t static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; +typedef struct SKvParam { + SKVRowBuilder *builder; + SSchema *schema; + char buf[TSDB_MAX_TAGS_LEN]; +} SKvParam; + +typedef struct SMemParam { + SRowBuilder* rb; + SSchema* schema; + int32_t toffset; + col_id_t colIdx; +} SMemParam; + + +#define CHECK_CODE(expr) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + return code; \ + } \ + } while (0) + + static int32_t skipInsertInto(SInsertParseContext* pCxt) { SToken sToken; NEXT_TOKEN(pCxt->pSql, sToken); @@ -156,7 +179,7 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD } -int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) { +static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) { const char* msg1 = "name too long"; const char* msg2 = "invalid database name"; const char* msg3 = "db is not specified"; @@ -294,7 +317,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) { return TSDB_CODE_SUCCESS; } -static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) { +int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start) { // once the data block is disordered, we do NOT keep previous timestamp any more if (!pDataBlocks->ordered) { return TSDB_CODE_SUCCESS; @@ -600,13 +623,6 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int return TSDB_CODE_FAILED; } -typedef struct SMemParam { - SRowBuilder* rb; - SSchema* schema; - int32_t toffset; - col_id_t colIdx; -} SMemParam; - static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) { SMemParam* pa = (SMemParam*)param; SRowBuilder* rb = pa->rb; @@ -713,7 +729,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) { +static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) { SKvParam* pa = (SKvParam*) param; int8_t type = pa->schema->type; @@ -979,7 +995,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da return TSDB_CODE_SUCCESS; } -static void destroyCreateSubTbReq(SVCreateTbReq* pReq) { +void destroyCreateSubTbReq(SVCreateTbReq* pReq) { taosMemoryFreeClear(pReq->dbFName); taosMemoryFreeClear(pReq->name); taosMemoryFreeClear(pReq->ctbCfg.pTag); @@ -1111,7 +1127,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); - (*pCxt->pStmtCb->setBindInfoFn)(pCxt->pTableMeta, tags); + (*pCxt->pStmtCb->setBindInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); (*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); @@ -1149,8 +1165,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (pContext->pStmtCb && *pQuery) { (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj); } else { - context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false), - context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), + context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); } if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || @@ -1184,7 +1200,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { } -int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) { +int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) { SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen}; SToken sToken; int32_t code = 0; @@ -1203,7 +1219,7 @@ int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbNam NEXT_TOKEN(pTableName, sToken); - if (SToken.n > 0) { + if (sToken.n > 0) { return buildInvalidOperationMsg(&msg, "table name format is wrong"); } @@ -1217,12 +1233,12 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash SInsertParseContext insertCtx = { .pVgroupsHashObj = pVgHash, .pTableBlockHashObj = pBlockHash, - .pOutput = pQuery->pRoot + .pOutput = (SVnodeModifOpStmt*)pQuery->pRoot, }; // merge according to vgId if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { - CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return); + CHECK_CODE(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks)); } CHECK_CODE(buildOutput(&insertCtx)); @@ -1230,4 +1246,171 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash return TSDB_CODE_SUCCESS; } +int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen){ + STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SKVRowBuilder tagBuilder; + if (tdInitKVRowBuilder(&tagBuilder) < 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + SKvParam param = {.builder = &tagBuilder}; + + for (int c = 0; c < tags->numOfBound; ++c) { + if (bind[c].is_null && bind[c].is_null[0]) { + KvRowAppend(&pBuf, NULL, 0, ¶m); + continue; + } + + SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1 + param.schema = pTagSchema; + + int32_t colLen = pTagSchema->bytes; + if (IS_VAR_DATA_TYPE(pTagSchema->type)) { + colLen = bind[c].length[0]; + } + + CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, ¶m)); + } + + SKVRow row = tdGetKVRowFromBuilder(&tagBuilder); + if (NULL == row) { + tdDestroyKVRowBuilder(&tagBuilder); + return buildInvalidOperationMsg(&pBuf, "tag value expected"); + } + tdSortKVRowByColIdx(row); + + SVCreateTbReq tbReq = {0}; + CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid)); + CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq)); + + destroyCreateSubTbReq(&tbReq); + tdDestroyKVRowBuilder(&tagBuilder); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen) { + STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + int32_t extendedRowSize = getExtendedRowSize(pDataBlock); + SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; + SRowBuilder* pBuilder = &pDataBlock->rowBuilder; + SMemParam param = {.rb = pBuilder}; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + + CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num)); + + for (int32_t r = 0; r < bind->num; ++r) { + STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header + tdSRowResetBuf(pBuilder, row); + + // 1. set the parsed value from sql string + for (int c = 0; c < spd->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; + + param.schema = pColSchema; + getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); + + if (bind[c].is_null && bind[c].is_null[r]) { + CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); + } else { + int32_t colLen = pColSchema->bytes; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + colLen = bind[c].length[r]; + } + + CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); + } + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { + TSKEY tsKey = TD_ROW_KEY(row); + checkTimestamp(pDataBlock, (const char *)&tsKey); + } + } + + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE + tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i, + spd->cols[i].toffset); + } + } + } + + pDataBlock->size += extendedRowSize; + } + + SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData); + if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) { + *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); + if (NULL == *fields) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { + SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1]; + strcpy((*fields)[i].name, pTagSchema->name); + (*fields)[i].type = pTagSchema->type; + (*fields)[i].bytes = pTagSchema->bytes; + } + + *fieldNum = boundInfo->numOfBound; + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) { + STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + if (tags->numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qBuildStmtColFields(void *pBlock, int32_t *fieldNum, TAOS_FIELD** fields) { + STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + if (pDataBlock->boundColumnInfo.numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index 9b31924c5e..a7481109fa 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -156,7 +156,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star return TSDB_CODE_SUCCESS; } -static int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { +int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) { int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq); if (pBlocks->nAllocSize - pBlocks->size < len) { pBlocks->nAllocSize += len + pBlocks->rowSize; @@ -571,166 +571,75 @@ int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo return TSDB_CODE_SUCCESS; } -int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){ - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; - if (NULL == tags) { - return TSDB_CODE_QRY_APP_ERROR; + +void qResetStmtDataBlock(void* block, bool freeData) { + STableDataBlocks* pBlock = (STableDataBlocks*)block; + + if (freeData) { + taosMemoryFree(pBlock->pData); } - SKVRowBuilder tagBuilder; - if (tdInitKVRowBuilder(&tagBuilder) < 0) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + pBlock->pData = NULL; + pBlock->ordered = true; + pBlock->prevTS = INT64_MIN; + pBlock->size = sizeof(SSubmitBlk); + pBlock->tsSource = -1; + pBlock->numOfTables = 1; + pBlock->nAllocSize = TSDB_PAYLOAD_SIZE; + pBlock->headerSize = pBlock->size; + + memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); +} + + +int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) { + *pDst = taosMemoryMalloc(sizeof(STableDataBlocks)); + if (NULL == *pDst) { + return TSDB_CODE_OUT_OF_MEMORY; } - - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - SKvParam param = {.builder = &tagBuilder}; - - for (int c = 0; c < tags->numOfBound; ++c) { - if (bind[c].is_null && bind[c].is_null[0]) { - KvRowAppend(&pBuf, NULL, 0, ¶m); - continue; - } - - SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1 - param.schema = pTagSchema; - - int32_t colLen = pTagSchema->bytes; - if (IS_VAR_DATA_TYPE(pTagSchema->type)) { - colLen = bind[c].length[0]; - } - - CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, ¶m)); - } - - SKVRow row = tdGetKVRowFromBuilder(&tagBuilder); - if (NULL == row) { - tdDestroyKVRowBuilder(&tagBuilder); - return buildInvalidOperationMsg(&pBuf, "tag value expected"); - } - tdSortKVRowByColIdx(row); - - SVCreateTbReq tbReq = {0}; - CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid)); - CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq)); - - destroyCreateSubTbReq(&tbReq); - tdDestroyKVRowBuilder(&tagBuilder); + + memcpy(*pDst, pSrc, sizeof(STableDataBlocks)); + ((STableDataBlocks*)(*pDst))->cloned = true; + + qResetStmtDataBlock(*pDst, false); return TSDB_CODE_SUCCESS; } - -int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - - CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num); - - for (int32_t r = 0; r < bind->num; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header - tdSRowResetBuf(pBuilder, row); - - // 1. set the parsed value from sql string - for (int c = 0; c < spd->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; - - param.schema = pColSchema; - getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - - if (bind[c].is_null && bind[c].is_null[r]) { - CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); - } else { - int32_t colLen = pColSchema->bytes; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - colLen = bind[c].length[r]; - } - - CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - checkTimestamp(pDataBlock, (const char *)&tsKey); - } - } - - // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - for (int32_t i = 0; i < spd->numOfCols; ++i) { - if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE - tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i, - spd->cols[i].toffset); - } - } - } - - pDataBlock->size += extendedRowSize; - } - - SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); +int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) { + int32_t code = qCloneStmtDataBlock(pDst, pSrc); + if (code) { + return code; } - return TSDB_CODE_SUCCESS; -} - - -int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) { - *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); - if (NULL == *fields) { + STableDataBlocks *pBlock = (STableDataBlocks*)*pDst; + pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize); + if (NULL == pBlock->pData) { + qFreeStmtDataBlock(pBlock); return TSDB_CODE_OUT_OF_MEMORY; } - for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { - SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1]; - strcpy((*fields)[i].name, pTagSchema->name); - (*fields)[i].type = pTagSchema->type; - (*fields)[i].bytes = pTagSchema->bytes; - } - - *fieldNum = boundInfo->numOfBound; - return TSDB_CODE_SUCCESS; } -int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) { - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; - if (NULL == tags) { - return TSDB_CODE_QRY_APP_ERROR; - } - - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - if (tags->numOfBound <= 0) { - *fieldNum = 0; - *fields = NULL; - - return TSDB_CODE_SUCCESS; +void qFreeStmtDataBlock(void* pDataBlock) { + if (pDataBlock == NULL) { + return; } - CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields)); - - return TSDB_CODE_SUCCESS; + taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData); + taosMemoryFreeClear(pDataBlock); } -int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) { - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - if (pDataBlock->boundColumnInfo.numOfBound <= 0) { - *fieldNum = 0; - *fields = NULL; - - return TSDB_CODE_SUCCESS; +void qDestroyStmtDataBlock(void* pBlock) { + if (pBlock == NULL) { + return; } - CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields)); - - return TSDB_CODE_SUCCESS; + STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; + + pDataBlock->cloned = false; + destroyDataBlock(pDataBlock); } -