stmt
This commit is contained in:
parent
9547c0a09b
commit
1c499c94a7
|
@ -51,7 +51,6 @@ typedef struct SStmtBindInfo {
|
||||||
void* boundTags;
|
void* boundTags;
|
||||||
char* tbName;
|
char* tbName;
|
||||||
SName sname;
|
SName sname;
|
||||||
TAOS_BIND* bindTags;
|
|
||||||
} SStmtBindInfo;
|
} SStmtBindInfo;
|
||||||
|
|
||||||
typedef struct SStmtExecInfo {
|
typedef struct SStmtExecInfo {
|
||||||
|
@ -78,11 +77,6 @@ typedef struct STscStmt {
|
||||||
SStmtSQLInfo sql;
|
SStmtSQLInfo sql;
|
||||||
SStmtExecInfo exec;
|
SStmtExecInfo exec;
|
||||||
SStmtBindInfo bind;
|
SStmtBindInfo bind;
|
||||||
|
|
||||||
//SMultiTbStmt mtb;
|
|
||||||
//SNormalStmt normal;
|
|
||||||
|
|
||||||
//int numOfRows;
|
|
||||||
} STscStmt;
|
} STscStmt;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -85,7 +85,7 @@ void stmtResetDataBlock(STableDataBlocks* pBlock) {
|
||||||
pBlock->size = sizeof(SSubmitBlk);
|
pBlock->size = sizeof(SSubmitBlk);
|
||||||
pBlock->tsSource = -1;
|
pBlock->tsSource = -1;
|
||||||
pBlock->numOfTables = 1;
|
pBlock->numOfTables = 1;
|
||||||
pBlock->nAllocSize = 0;
|
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
|
||||||
pBlock->headerSize = pBlock->size;
|
pBlock->headerSize = pBlock->size;
|
||||||
|
|
||||||
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
|
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
|
||||||
|
@ -237,6 +237,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
STableDataBlocks* pNewBlock = NULL;
|
STableDataBlocks* pNewBlock = NULL;
|
||||||
STMT_ERR_RET(stmtCloneDataBlock(&pNewBlock, pCache->pDataBlock));
|
STMT_ERR_RET(stmtCloneDataBlock(&pNewBlock, pCache->pDataBlock));
|
||||||
|
|
||||||
|
pNewBlock->pData = taosMemoryMalloc(pNewBlock->nAllocSize);
|
||||||
|
if (NULL == pNewBlock->pData) {
|
||||||
|
stmtFreeDataBlock(pNewBlock);
|
||||||
|
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid), &pNewBlock, POINTER_BYTES)) {
|
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid), &pNewBlock, POINTER_BYTES)) {
|
||||||
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
@ -284,7 +290,7 @@ int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
|
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR);
|
STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR);
|
||||||
|
@ -304,20 +310,23 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int stmtSetTbTags(TAOS_STMT *stmt, const char *tbName, TAOS_BIND *tags) {
|
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_BIND *tags) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR);
|
STMT_SWITCH_STATUS(stmt, STMT_SETTBNAME, TSDB_CODE_TSC_STMT_API_ERROR);
|
||||||
|
|
||||||
if (pStmt->bind.needParse) {
|
if (pStmt->bind.needParse) {
|
||||||
taosMemoryFree(pStmt->bind.bindTags);
|
|
||||||
pStmt->bind.bindTags = tags;
|
|
||||||
|
|
||||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||||
} else {
|
|
||||||
//TODO BIND TAG DATA
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STableDataBlocks *pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bind.tbUid, sizeof(pStmt->bind.tbUid));
|
||||||
|
if (NULL == pDataBlock) {
|
||||||
|
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bind.tbUid);
|
||||||
|
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
STMT_ERR_RET(qBindStmtTagsValue(pDataBlock, pStmt->bind.boundTags, pStmt->bind.tbSuid, &pStmt->bind.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,7 +404,7 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
|
||||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
qBindStmtData(pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
|
qBindStmtColsValue(pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -753,14 +753,14 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void *value, int32_t len, voi
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, SKVRow row) {
|
static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const SName* pName, SKVRow row, int64_t suid) {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(pName, dbFName);
|
tNameGetFullDbName(pName, dbFName);
|
||||||
pCxt->createTblReq.type = TD_CHILD_TABLE;
|
pTbReq->type = TD_CHILD_TABLE;
|
||||||
pCxt->createTblReq.dbFName = strdup(dbFName);
|
pTbReq->dbFName = strdup(dbFName);
|
||||||
pCxt->createTblReq.name = strdup(pName->tname);
|
pTbReq->name = strdup(pName->tname);
|
||||||
pCxt->createTblReq.ctbCfg.suid = pCxt->pTableMeta->suid;
|
pTbReq->ctbCfg.suid = suid;
|
||||||
pCxt->createTblReq.ctbCfg.pTag = row;
|
pTbReq->ctbCfg.pTag = row;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -773,21 +773,40 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
||||||
|
|
||||||
SKvParam param = {.builder = &pCxt->tagsBuilder};
|
SKvParam param = {.builder = &pCxt->tagsBuilder};
|
||||||
SToken sToken;
|
SToken sToken;
|
||||||
|
bool isParseBindParam = false;
|
||||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||||
for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
|
for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
|
||||||
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
||||||
|
|
||||||
|
if (sToken.type == TK_NK_QUESTION) {
|
||||||
|
isParseBindParam = true;
|
||||||
|
if (NULL == pCxt->pStmtCb) {
|
||||||
|
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isParseBindParam) {
|
||||||
|
return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
|
||||||
|
}
|
||||||
|
|
||||||
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
|
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i] - 1]; // colId starts with 1
|
||||||
param.schema = pTagSchema;
|
param.schema = pTagSchema;
|
||||||
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, ¶m, &pCxt->msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isParseBindParam) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||||
if (NULL == row) {
|
if (NULL == row) {
|
||||||
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
|
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
|
||||||
}
|
}
|
||||||
tdSortKVRowByColIdx(row);
|
tdSortKVRowByColIdx(row);
|
||||||
|
|
||||||
return buildCreateTbReq(pCxt, pName, row);
|
return buildCreateTbReq(&pCxt->createTblReq, pName, row, pCxt->pTableMeta->suid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
||||||
|
|
|
@ -77,8 +77,57 @@ int32_t qCreateSName(SName* pName, char* pTableName, int32_t acctId, char* dbNam
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qBindStmtTagsValue(STableDataBlocks *pDataBlock, void *boundTags, int64_t suid, SName *pName, TAOS_BIND *bind, char *msgBuf, int32_t msgBufLen){
|
||||||
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
|
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
|
||||||
|
if (NULL == tags) {
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qBindStmtData(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
|
SKVRowBuilder tagBuilder;
|
||||||
|
if (tdInitKVRowBuilder(&tagBuilder) < 0) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
|
||||||
|
SKvParam param = {.builder = &tagBuilder};
|
||||||
|
|
||||||
|
for (int c = 0; c < tags->numOfBound; ++c) {
|
||||||
|
if (bind[c].is_null && bind[c].is_null[0]) {
|
||||||
|
KvRowAppend(&pBuf, NULL, 0, ¶m);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchema* pTagSchema = &pSchema[tags->boundColumns[c] - 1]; // colId starts with 1
|
||||||
|
param.schema = pTagSchema;
|
||||||
|
|
||||||
|
int32_t colLen = pTagSchema->bytes;
|
||||||
|
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
||||||
|
colLen = bind[c].length[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
CHECK_CODE(KvRowAppend(&pBuf, (char *)bind[c].buffer, colLen, ¶m));
|
||||||
|
}
|
||||||
|
|
||||||
|
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
|
||||||
|
if (NULL == row) {
|
||||||
|
tdDestroyKVRowBuilder(&tagBuilder);
|
||||||
|
return buildInvalidOperationMsg(&pBuf, "tag value expected");
|
||||||
|
}
|
||||||
|
tdSortKVRowByColIdx(row);
|
||||||
|
|
||||||
|
SVCreateTbReq tbReq = {0};
|
||||||
|
CHECK_CODE(buildCreateTbReq(&tbReq, pName, row, suid));
|
||||||
|
CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));
|
||||||
|
|
||||||
|
destroyCreateSubTbReq(&tbReq);
|
||||||
|
tdDestroyKVRowBuilder(&tagBuilder);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t qBindStmtColsValue(STableDataBlocks *pDataBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) {
|
||||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||||
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
||||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
||||||
|
|
Loading…
Reference in New Issue