diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f0fa977608..640b4181cd 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "querynodes.h" +#include "query.h" typedef struct SStmtCallback { TAOS_STMT* pStmt; diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index cbfe380a03..d84bbaeba1 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -23,7 +23,7 @@ extern "C" { typedef enum { STMT_TYPE_INSERT = 1, STMT_TYPE_MULTI_INSERT, - STMT_TYPE_QUERY, + STMT_TYPE_QUERY } STMT_TYPE; typedef enum { @@ -84,21 +84,24 @@ typedef struct STscStmt { #define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define STMT_SWITCH_STATUS(_stmt, _newstatus, _errcode) do { - switch (_newstatus) { - case STMT_INIT: - if ((_stmt)->status != 0) return (_errcode); - break; - case STMT_PREPARE: - if ((_stmt)->status != STMT_INIT) STMT_ERR_RET(_errcode); - break; - case STMT_SETTBNAME: - break; - default: - STMT_ERR_RET(_errcode); - break; - } -} while (0) +#define STMT_SWITCH_STATUS(_stmt, _newstatus, _errcode) \ + do { \ + switch (_newstatus) { \ + case STMT_INIT: \ + if ((_stmt)->sql.status != 0) return (_errcode); \ + break; \ + case STMT_PREPARE: \ + if ((_stmt)->sql.status != STMT_INIT) STMT_ERR_RET(_errcode); \ + break; \ + case STMT_SETTBNAME: \ + break; \ + default: \ + STMT_ERR_RET(_errcode); \ + break; \ + } \ + \ + (_stmt)->sql.status = _newstatus; \ + } while (0) TAOS_STMT *stmtInit(TAOS *taos); @@ -106,7 +109,6 @@ int stmtClose(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt); char *stmtErrstr(TAOS_STMT *stmt); int stmtAffectedRows(TAOS_STMT *stmt); -int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind); int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length); int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); int stmtIsInsert(TAOS_STMT *stmt, int *insert); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 914f58c2e0..91607b5fef 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -615,7 +615,15 @@ int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { return terrno; } - return stmtBind(stmt, bind); + TAOS_MULTI_BIND mbind = {0}; + mbind.buffer_type = bind->buffer_type; + mbind.buffer = bind->buffer; + mbind.buffer_length = bind->buffer_length; + mbind.length = bind->length; + mbind.is_null = bind->is_null; + mbind.num = 1; + + return stmtBindBatch(stmt, &mbind); } int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { @@ -634,6 +642,10 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { return stmtBindBatch(stmt, bind); } +int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx) { + return stmtBindBatch(stmt, bind); /* TODO */ +} + int taos_stmt_add_batch(TAOS_STMT *stmt) { if (stmt == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 54abeccc6a..2c0085a600 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -75,6 +75,8 @@ int32_t stmtParseSql(STscStmt* pStmt) { STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); } + STMT_ERR_RET(stmtCacheBlock(pStmt)); + return TSDB_CODE_SUCCESS; } @@ -152,8 +154,20 @@ int32_t stmtCacheBlock(STscStmt *pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtCleanExecCtx(STscStmt* pStmt, bool keepTable) { - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pStmt->sql.pQuery->pRoot; +int32_t stmtCleanBindInfo(STscStmt* pStmt) { + pStmt->bind.tbUid = 0; + pStmt->bind.tbSuid = 0; + pStmt->bind.tbType = 0; + pStmt->bind.needParse = true; + + taosMemoryFreeClear(pStmt->bind.tbName); + destroyBoundColumnInfo(pStmt->bind.boundTags); + taosMemoryFreeClear(pStmt->bind.boundTags); +} + +int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable) { + taos_free_result(pStmt->exec.pRequest); + pStmt->exec.pRequest = NULL; void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { @@ -178,13 +192,33 @@ int32_t stmtCleanExecCtx(STscStmt* pStmt, bool keepTable) { taosHashCleanup(pStmt->exec.pBlockHash); pStmt->exec.pBlockHash = NULL; - - pStmt->bind.tbUid = 0; - pStmt->bind.tbSuid = 0; - pStmt->bind.tbType = 0; + + STMT_ERR_RET(stmtCleanBindInfo(pStmt)); + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtCleanSQLInfo(STscStmt* pStmt) { + taosMemoryFree(pStmt->sql.sqlStr); + qDestroyQuery(pStmt->sql.pQuery); + + void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); + while (pIter) { + SStmtTableCache* pCache = *(SStmtTableCache**)pIter; + + pCache->pDataBlock->cloned = false; + destroyDataBlock(pCache->pDataBlock); + destroyBoundColumnInfo(pCache->boundTags); - destroyBoundColumnInfo(pStmt->bind.boundTags); - taosMemoryFreeClear(pStmt->bind.boundTags); + pIter = taosHashIterate(pStmt->sql.pTableCache, pIter); + } + taosHashCleanup(pStmt->sql.pTableCache); + pStmt->sql.pTableCache = NULL; + + memset(&pStmt->sql, 0, sizeof(pStmt->sql)); + + STMT_ERR_RET(stmtCleanExecInfo(pStmt, false)); + STMT_ERR_RET(stmtCleanBindInfo(pStmt)); return TSDB_CODE_SUCCESS; } @@ -281,6 +315,10 @@ TAOS_STMT *stmtInit(TAOS *taos) { int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { STscStmt* pStmt = (STscStmt*)stmt; + if (pStmt->sql.status >= STMT_PREPARE) { + STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); + } + STMT_SWITCH_STATUS(stmt, STMT_PREPARE, TSDB_CODE_TSC_STMT_API_ERROR); pStmt->sql.sqlStr = strndup(sql, length); @@ -434,12 +472,7 @@ int stmtExec(TAOS_STMT *stmt) { _return: - stmtCleanExecCtx(pStmt, (code ? false : true)); - - taos_free_result(pStmt->exec.pRequest); - pStmt->exec.pRequest = NULL; - - pStmt->bind.needParse = true; + stmtCleanExecInfo(pStmt, (code ? false : true)); ++pStmt->sql.runTimes; @@ -466,6 +499,10 @@ int stmtAffectedRows(TAOS_STMT *stmt) { } int stmtIsInsert(TAOS_STMT *stmt, int *insert) { + STscStmt* pStmt = (STscStmt*)stmt; + + *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index b3440eff70..f0fffff38b 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -131,7 +131,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* int32_t schemaIdxCompar(const void *lhs, const void *rhs); int32_t boundIdxCompar(const void *lhs, const void *rhs); void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols); -void destroyBoundColumnInfo(SParsedDataColInfo* pColList); +void destroyBoundColumnInfo(void* pBoundInfo); void destroyBlockArrayList(SArray* pDataBlockList); void destroyBlockHashmap(SHashObj* pDataBlockHash); int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); diff --git a/source/libs/parser/inc/parInt.h b/source/libs/parser/inc/parInt.h index 8f88a05f77..6b9b90c513 100644 --- a/source/libs/parser/inc/parInt.h +++ b/source/libs/parser/inc/parInt.h @@ -21,13 +21,30 @@ extern "C" { #endif #include "parser.h" +#include "parToken.h" +#include "parUtil.h" + +typedef struct SKvParam { + SKVRowBuilder *builder; + SSchema *schema; + char buf[TSDB_MAX_TAGS_LEN]; +} SKvParam; + +#define CHECK_CODE(expr) \ + do { \ + int32_t code = expr; \ + if (TSDB_CODE_SUCCESS != code) { \ + return code; \ + } \ + } while (0) int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery); int32_t parse(SParseContext* pParseCxt, SQuery** pQuery); int32_t translate(SParseContext* pParseCxt, SQuery* pQuery); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); -int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf); +int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf); +int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index e16e19b78a..4d89f70a59 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -40,14 +40,6 @@ sToken = tStrGetToken(pSql, &index, false); \ } while (0) -#define CHECK_CODE(expr) \ - do { \ - int32_t code = expr; \ - if (TSDB_CODE_SUCCESS != code) { \ - return code; \ - } \ - } while (0) - typedef struct SInsertParseContext { SParseContext* pComCxt; // input char *pSql; // input @@ -163,7 +155,8 @@ static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullD return TSDB_CODE_SUCCESS; } -int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, char* dbName, SMsgBuf* pMsgBuf) { + +int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) { const char* msg1 = "name too long"; const char* msg2 = "invalid database name"; const char* msg3 = "db is not specified"; @@ -720,13 +713,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -typedef struct SKvParam { - SKVRowBuilder *builder; - SSchema *schema; - char buf[TSDB_MAX_TAGS_LEN]; -} SKvParam; - -static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) { +int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, void *param) { SKvParam* pa = (SKvParam*) param; int8_t type = pa->schema->type; @@ -1195,3 +1182,52 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { destroyInsertParseContext(&context); return code; } + + +int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) { + SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen}; + SToken sToken; + int32_t code = 0; + char *tbName = NULL; + + NEXT_TOKEN(pTableName, sToken); + + if (sToken.n == 0) { + return buildInvalidOperationMsg(&msg, "empty table name"); + } + + code = createSName(pName, &sToken, acctId, dbName, &msg); + if (code) { + return code; + } + + NEXT_TOKEN(pTableName, sToken); + + if (SToken.n > 0) { + return buildInvalidOperationMsg(&msg, "table name format is wrong"); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { + SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; + int32_t code = 0; + SInsertParseContext insertCtx = { + .pVgroupsHashObj = pVgHash, + .pTableBlockHashObj = pBlockHash, + .pOutput = pQuery->pRoot + }; + + // merge according to vgId + if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { + CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return); + } + + CHECK_CODE(buildOutput(&insertCtx)); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index c601acf7bd..9b31924c5e 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -18,6 +18,7 @@ #include "catalog.h" #include "parUtil.h" #include "querynodes.h" +#include "parInt.h" #define IS_RAW_PAYLOAD(t) \ (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert @@ -102,10 +103,12 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) { } } -void destroyBoundColumnInfo(SParsedDataColInfo* pColList) { - if (NULL == pColList) { +void destroyBoundColumnInfo(void* pBoundInfo) { + if (NULL == pBoundInfo) { return; } + + SParsedDataColInfo* pColList = (SParsedDataColInfo*)pBoundInfo; taosMemoryFreeClear(pColList->boundColumns); taosMemoryFreeClear(pColList->cols); @@ -567,3 +570,167 @@ int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo pColInfo->boundNullLen); return TSDB_CODE_SUCCESS; } + +int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){ + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SKVRowBuilder tagBuilder; + if (tdInitKVRowBuilder(&tagBuilder) < 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + SKvParam param = {.builder = &tagBuilder}; + + for (int c = 0; c < tags->numOfBound; ++c) { + if (bind[c].is_null && bind[c].is_null[0]) { + KvRowAppend(&pBuf, NULL, 0, ¶m); + continue; + } + + SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1 + param.schema = pTagSchema; + + int32_t colLen = pTagSchema->bytes; + if (IS_VAR_DATA_TYPE(pTagSchema->type)) { + colLen = bind[c].length[0]; + } + + CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, ¶m)); + } + + SKVRow row = tdGetKVRowFromBuilder(&tagBuilder); + if (NULL == row) { + tdDestroyKVRowBuilder(&tagBuilder); + return buildInvalidOperationMsg(&pBuf, "tag value expected"); + } + tdSortKVRowByColIdx(row); + + SVCreateTbReq tbReq = {0}; + CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid)); + CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq)); + + destroyCreateSubTbReq(&tbReq); + tdDestroyKVRowBuilder(&tagBuilder); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + int32_t extendedRowSize = getExtendedRowSize(pDataBlock); + SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; + SRowBuilder* pBuilder = &pDataBlock->rowBuilder; + SMemParam param = {.rb = pBuilder}; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + + CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num); + + for (int32_t r = 0; r < bind->num; ++r) { + STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header + tdSRowResetBuf(pBuilder, row); + + // 1. set the parsed value from sql string + for (int c = 0; c < spd->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; + + param.schema = pColSchema; + getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); + + if (bind[c].is_null && bind[c].is_null[r]) { + CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); + } else { + int32_t colLen = pColSchema->bytes; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + colLen = bind[c].length[r]; + } + + CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); + } + + if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { + TSKEY tsKey = TD_ROW_KEY(row); + checkTimestamp(pDataBlock, (const char *)&tsKey); + } + } + + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE + tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i, + spd->cols[i].toffset); + } + } + } + + pDataBlock->size += extendedRowSize; + } + + SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData); + if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { + return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) { + *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); + if (NULL == *fields) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { + SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1]; + strcpy((*fields)[i].name, pTagSchema->name); + (*fields)[i].type = pTagSchema->type; + (*fields)[i].bytes = pTagSchema->bytes; + } + + *fieldNum = boundInfo->numOfBound; + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) { + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + if (tags->numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) { + SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); + if (pDataBlock->boundColumnInfo.numOfBound <= 0) { + *fieldNum = 0; + *fields = NULL; + + return TSDB_CODE_SUCCESS; + } + + CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields)); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 5c57c873ee..1674ef0ace 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -51,212 +51,6 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) { return code; } -int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen) { - SMsgBuf msg = {.buf = msgBuf, .len =msgBufLen}; - SToken sToken; - int32_t code = 0; - char *tbName = NULL; - - NEXT_TOKEN(pTableName, sToken); - - if (sToken.n == 0) { - return buildInvalidOperationMsg(&msg, "empty table name"); - } - - code = createSName(pName, &sToken, acctId, dbName, &msg); - if (code) { - return code; - } - - NEXT_TOKEN(pTableName, sToken); - - if (SToken.n > 0) { - return buildInvalidOperationMsg(&msg, "table name format is wrong"); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){ - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; - if (NULL == tags) { - return TSDB_CODE_QRY_APP_ERROR; - } - - SKVRowBuilder tagBuilder; - if (tdInitKVRowBuilder(&tagBuilder) < 0) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - SKvParam param = {.builder = &tagBuilder}; - - for (int c = 0; c < tags->numOfBound; ++c) { - if (bind[c].is_null && bind[c].is_null[0]) { - KvRowAppend(&pBuf, NULL, 0, ¶m); - continue; - } - - SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1 - param.schema = pTagSchema; - - int32_t colLen = pTagSchema->bytes; - if (IS_VAR_DATA_TYPE(pTagSchema->type)) { - colLen = bind[c].length[0]; - } - - CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, ¶m)); - } - - SKVRow row = tdGetKVRowFromBuilder(&tagBuilder); - if (NULL == row) { - tdDestroyKVRowBuilder(&tagBuilder); - return buildInvalidOperationMsg(&pBuf, "tag value expected"); - } - tdSortKVRowByColIdx(row); - - SVCreateTbReq tbReq = {0}; - CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid)); - CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq)); - - destroyCreateSubTbReq(&tbReq); - tdDestroyKVRowBuilder(&tagBuilder); - - return TSDB_CODE_SUCCESS; -} - - -int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; - SRowBuilder* pBuilder = &pDataBlock->rowBuilder; - SMemParam param = {.rb = pBuilder}; - SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - - CHECK_CODE(allocateMemForSize(pDataBlock, extendedRowSize * bind->num); - - for (int32_t r = 0; r < bind->num; ++r) { - STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header - tdSRowResetBuf(pBuilder, row); - - // 1. set the parsed value from sql string - for (int c = 0; c < spd->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; - - param.schema = pColSchema; - getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - - if (bind[c].is_null && bind[c].is_null[r]) { - CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); - } else { - int32_t colLen = pColSchema->bytes; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - colLen = bind[c].length[r]; - } - - CHECK_CODE(MemRowAppend(&pBuf, (char *)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m)); - } - - if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) { - TSKEY tsKey = TD_ROW_KEY(row); - checkTimestamp(pDataBlock, (const char *)&tsKey); - } - } - - // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { - for (int32_t i = 0; i < spd->numOfCols; ++i) { - if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE - tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(pSchema[i].type), true, pSchema[i].type, i, - spd->cols[i].toffset); - } - } - } - - pDataBlock->size += extendedRowSize; - } - - SSubmitBlk *pBlocks = (SSubmitBlk *)(pDataBlock->pData); - if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { - return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { - SVnodeModifOpStmt *modifyNode = (SVnodeModifOpStmt *)pQuery->pRoot; - int32_t code = 0; - SInsertParseContext insertCtx = { - .pVgroupsHashObj = pVgHash, - .pTableBlockHashObj = pBlockHash, - .pOutput = pQuery->pRoot - }; - - // merge according to vgId - if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { - CHECK_CODE_GOTO(mergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks), _return); - } - - CHECK_CODE(buildOutput(&insertCtx)); - - return TSDB_CODE_SUCCESS; -} - -int32_t buildBoundFields(SParsedDataColInfo *boundInfo, SSchema *pSchema, int32_t *fieldNum, TAOS_FIELD** fields) { - *fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD)); - if (NULL == *fields) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - for (int32_t i = 0; i < boundInfo->numOfBound; ++i) { - SSchema* pTagSchema = &pSchema[boundInfo->boundColumns[i] - 1]; - strcpy((*fields)[i].name, pTagSchema->name); - (*fields)[i].type = pTagSchema->type; - (*fields)[i].bytes = pTagSchema->bytes; - } - - *fieldNum = boundInfo->numOfBound; - - return TSDB_CODE_SUCCESS; -} - - -int32_t qBuildStmtTagFields(STableDataBlocks *pDataBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields) { - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; - if (NULL == tags) { - return TSDB_CODE_QRY_APP_ERROR; - } - - SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); - if (tags->numOfBound <= 0) { - *fieldNum = 0; - *fields = NULL; - - return TSDB_CODE_SUCCESS; - } - - CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields)); - - return TSDB_CODE_SUCCESS; -} - -int32_t qBuildStmtColFields(STableDataBlocks *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields) { - SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); - if (pDataBlock->boundColumnInfo.numOfBound <= 0) { - *fieldNum = 0; - *fields = NULL; - - return TSDB_CODE_SUCCESS; - } - - CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields)); - - return TSDB_CODE_SUCCESS; -} - void qDestroyQuery(SQuery* pQueryNode) { if (NULL == pQueryNode) { return;