From 1c499c94a71d6ee678fae31c5cbe1ca0d04125a4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 16 Apr 2022 17:47:42 +0800 Subject: [PATCH] stmt --- source/client/inc/clientStmt.h | 6 ---- source/client/src/clientStmt.c | 27 ++++++++++------ source/libs/parser/src/parInsert.c | 33 +++++++++++++++---- source/libs/parser/src/parser.c | 51 +++++++++++++++++++++++++++++- 4 files changed, 94 insertions(+), 23 deletions(-) diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 7fb23840fe..cbfe380a03 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -51,7 +51,6 @@ typedef struct SStmtBindInfo { void* boundTags; char* tbName; SName sname; - TAOS_BIND* bindTags; } SStmtBindInfo; typedef struct SStmtExecInfo { @@ -78,11 +77,6 @@ typedef struct STscStmt { SStmtSQLInfo sql; SStmtExecInfo exec; SStmtBindInfo bind; - - //SMultiTbStmt mtb; - //SNormalStmt normal; - - //int numOfRows; } STscStmt; diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index f2797adcc1..54abeccc6a 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -85,7 +85,7 @@ void stmtResetDataBlock(STableDataBlocks* pBlock) { pBlock->size = sizeof(SSubmitBlk); pBlock->tsSource = -1; pBlock->numOfTables = 1; - pBlock->nAllocSize = 0; + pBlock->nAllocSize = TSDB_PAYLOAD_SIZE; pBlock->headerSize = pBlock->size; memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder)); @@ -237,6 +237,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STableDataBlocks* pNewBlock = NULL; STMT_ERR_RET(stmtCloneDataBlock(&pNewBlock, pCache->pDataBlock)); + pNewBlock->pData = taosMemoryMalloc(pNewBlock->nAllocSize); + if (NULL == pNewBlock->pData) { + stmtFreeDataBlock(pNewBlock); + STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid), &pNewBlock, POINTER_BYTES)) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -284,7 +290,7 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { } -int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { +int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) { STscStmt* pStmt = (STscStmt*)stmt; STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); @@ -304,20 +310,23 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { return TSDB_CODE_SUCCESS; } -int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) { +int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND *tags) { STscStmt* pStmt = (STscStmt*)stmt; STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR); if (pStmt->bind.needParse) { - taosMemoryFree(pStmt->bind.bindTags); - pStmt->bind.bindTags = tags; - STMT_ERR_RET(stmtParseSql(pStmt)); - } else { - //TODO BIND TAG DATA } + STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid)); + if (NULL == pDataBlock) { + tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid); + STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + STMT_ERR_RET(qBindStmtTagsValue(pDataBlock, pStmt->bind.boundTags, pStmt->bind.tbSuid, &pStmt->bind.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); + return TSDB_CODE_SUCCESS; } @@ -395,7 +404,7 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - qBindStmtData(pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); + qBindStmtColsValue(pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 6b7109fea3..e16e19b78a 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -753,14 +753,14 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi return TSDB_CODE_SUCCESS; } -static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, SKVRow row) { +static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const SName* pName, SKVRow row, int64_t suid) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pName, dbFName); - pCxt->createTblReq.type = TD_CHILD_TABLE; - pCxt->createTblReq.dbFName = strdup(dbFName); - pCxt->createTblReq.name = strdup(pName->tname); - pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid; - pCxt->createTblReq.ctbCfg.pTag = row; + pTbReq->type = TD_CHILD_TABLE; + pTbReq->dbFName = strdup(dbFName); + pTbReq->name = strdup(pName->tname); + pTbReq->ctbCfg.suid = suid; + pTbReq->ctbCfg.pTag = row; return TSDB_CODE_SUCCESS; } @@ -773,21 +773,40 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint SKvParam param = {.builder = &pCxt->tagsBuilder}; SToken sToken; + bool isParseBindParam = false; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" for (int i = 0; i < pCxt->tags.numOfBound; ++i) { NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); + + if (sToken.type == TK_NK_QUESTION) { + isParseBindParam = true; + if (NULL == pCxt->pStmtCb) { + return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z); + } + + continue; + } + + if (isParseBindParam) { + return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values"); + } + SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1 param.schema = pTagSchema; CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg)); } + if (isParseBindParam) { + return TSDB_CODE_SUCCESS; + } + SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder); if (NULL == row) { return buildInvalidOperationMsg(&pCxt->msg, "tag value expected"); } tdSortKVRowByColIdx(row); - return buildCreateTbReq(pCxt, pName, row); + return buildCreateTbReq(&pCxt->createTblReq, pName, row, pCxt->pTableMeta->suid); } static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 68e9961b6d..5c57c873ee 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -77,8 +77,57 @@ int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbNam return TSDB_CODE_SUCCESS; } +int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){ + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + if (NULL == tags) { + return TSDB_CODE_QRY_APP_ERROR; + } -int32_t qBindStmtData(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { + SKVRowBuilder tagBuilder; + if (tdInitKVRowBuilder(&tagBuilder) < 0) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta); + SKvParam param = {.builder = &tagBuilder}; + + for (int c = 0; c < tags->numOfBound; ++c) { + if (bind[c].is_null && bind[c].is_null[0]) { + KvRowAppend(&pBuf, NULL, 0, ¶m); + continue; + } + + SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1 + param.schema = pTagSchema; + + int32_t colLen = pTagSchema->bytes; + if (IS_VAR_DATA_TYPE(pTagSchema->type)) { + colLen = bind[c].length[0]; + } + + CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, ¶m)); + } + + SKVRow row = tdGetKVRowFromBuilder(&tagBuilder); + if (NULL == row) { + tdDestroyKVRowBuilder(&tagBuilder); + return buildInvalidOperationMsg(&pBuf, "tag value expected"); + } + tdSortKVRowByColIdx(row); + + SVCreateTbReq tbReq = {0}; + CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid)); + CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq)); + + destroyCreateSubTbReq(&tbReq); + tdDestroyKVRowBuilder(&tagBuilder); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock); SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;