feat(stmt2): initial commit for stmt2
This commit is contained in:
parent
084592dc34
commit
298c4b4812
|
@ -51,7 +51,7 @@ typedef void TAOS_SUB;
|
|||
#define TSDB_DATA_TYPE_BLOB 18 // binary
|
||||
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
|
||||
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
|
||||
#define TSDB_DATA_TYPE_GEOMETRY 20 // geometry
|
||||
#define TSDB_DATA_TYPE_GEOMETRY 20 // geometry
|
||||
#define TSDB_DATA_TYPE_MAX 21
|
||||
|
||||
typedef enum {
|
||||
|
@ -168,7 +168,7 @@ DLL_EXPORT const char *taos_data_type(int type);
|
|||
|
||||
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
||||
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid);
|
||||
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS* options);
|
||||
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options);
|
||||
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
||||
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
||||
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
||||
|
@ -193,6 +193,49 @@ DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
|
|||
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
|
||||
|
||||
typedef void TAOS_STMT2;
|
||||
|
||||
typedef enum {
|
||||
TAOS_FIELD_COL = 1,
|
||||
TAOS_FIELD_TAG,
|
||||
TAOS_FIELD_QUERY,
|
||||
} TAOS_FIELD_T;
|
||||
|
||||
typedef struct {
|
||||
int64_t reqid;
|
||||
bool singleStbInsert;
|
||||
bool singleTableBindOnce;
|
||||
__taos_async_fn_t asyncExecFn;
|
||||
void *userdata;
|
||||
} TAOS_STMT2_OPTION;
|
||||
|
||||
typedef struct {
|
||||
int buffer_type;
|
||||
void *buffer;
|
||||
uintptr_t buffer_length;
|
||||
int32_t *length;
|
||||
char *is_null;
|
||||
int num;
|
||||
} TAOS_STMT2_BIND;
|
||||
|
||||
typedef struct {
|
||||
int count;
|
||||
char **tbnames;
|
||||
TAOS_STMT2_BIND **tags;
|
||||
TAOS_STMT2_BIND **bind_cols;
|
||||
} TAOS_STMT2_BINDV;
|
||||
|
||||
DLL_EXPORT TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option);
|
||||
DLL_EXPORT int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
|
||||
DLL_EXPORT int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx);
|
||||
DLL_EXPORT int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows);
|
||||
DLL_EXPORT int taos_stmt2_close(TAOS_STMT2 *stmt);
|
||||
DLL_EXPORT int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert);
|
||||
DLL_EXPORT int taos_stmt2_get_fields(TAOS_STMT2 *stmt, TAOS_FIELD_T field_type, int *count, TAOS_FIELD_E **fields);
|
||||
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_E *fields);
|
||||
DLL_EXPORT TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt);
|
||||
DLL_EXPORT char *taos_stmt2_error(TAOS_STMT2 *stmt);
|
||||
|
||||
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
|
||||
DLL_EXPORT TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqId);
|
||||
|
||||
|
@ -245,14 +288,15 @@ DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
|
|||
|
||||
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
|
||||
|
||||
typedef void (*__taos_async_whitelist_fn_t)(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteLists);
|
||||
typedef void (*__taos_async_whitelist_fn_t)(void *param, int code, TAOS *taos, int numOfWhiteLists,
|
||||
uint64_t *pWhiteLists);
|
||||
DLL_EXPORT void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param);
|
||||
|
||||
typedef enum {
|
||||
TAOS_CONN_MODE_BI = 0,
|
||||
} TAOS_CONN_MODE;
|
||||
|
||||
DLL_EXPORT int taos_set_conn_mode(TAOS* taos, int mode, int value);
|
||||
DLL_EXPORT int taos_set_conn_mode(TAOS *taos, int mode, int value);
|
||||
/* --------------------------schemaless INTERFACE------------------------------- */
|
||||
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
|
||||
|
@ -270,10 +314,13 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int
|
|||
int precision, int32_t ttl);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
|
||||
int protocol, int precision, int32_t ttl, int64_t reqid);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len, int32_t *totalRows,
|
||||
int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl_with_reqid_tbname_key(TAOS *taos, char *lines[], int numLines, int protocol,
|
||||
int precision, int32_t ttl, int64_t reqid, char *tbnameKey);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid_tbname_key(TAOS *taos, char *lines, int len,
|
||||
int32_t *totalRows, int protocol,
|
||||
int precision, int32_t ttl, int64_t reqid,
|
||||
char *tbnameKey);
|
||||
DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl_with_reqid_tbname_key(TAOS *taos, char *lines[], int numLines,
|
||||
int protocol, int precision, int32_t ttl,
|
||||
int64_t reqid, char *tbnameKey);
|
||||
/* --------------------------TMQ INTERFACE------------------------------- */
|
||||
|
||||
typedef struct tmq_t tmq_t;
|
||||
|
@ -319,14 +366,17 @@ DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
|
|||
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
|
||||
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
||||
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
||||
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); //Commit the msg’s offset + 1
|
||||
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); // Commit the msg’s offset + 1
|
||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
||||
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
|
||||
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
|
||||
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
|
||||
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
|
||||
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset,
|
||||
tmq_commit_cb *cb, void *param);
|
||||
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
|
||||
int32_t *numOfAssignment);
|
||||
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment *pAssignment);
|
||||
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
|
||||
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); // The current offset is the offset of the last consumed message + 1
|
||||
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName,
|
||||
int32_t vgId); // The current offset is the offset of the last consumed message + 1
|
||||
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
|
||||
|
||||
DLL_EXPORT TAOS *tmq_get_connect(tmq_t *tmq);
|
||||
|
@ -335,7 +385,7 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
|||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
|
||||
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_err2str(int32_t code);
|
||||
|
||||
/* ------------------------------ TAOSX -----------------------------------*/
|
||||
|
@ -345,15 +395,16 @@ typedef struct tmq_raw_data {
|
|||
uint16_t raw_type;
|
||||
} tmq_raw_data;
|
||||
|
||||
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
|
||||
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
|
||||
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
|
||||
DLL_EXPORT int taos_write_raw_block_with_reqid(TAOS *taos, int numOfRows, char *pData, const char *tbname, int64_t reqid);
|
||||
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS *taos, int rows, char *pData, const char *tbname,
|
||||
TAOS_FIELD *fields, int numFields);
|
||||
DLL_EXPORT int taos_write_raw_block_with_fields_with_reqid(TAOS *taos, int rows, char *pData, const char *tbname,
|
||||
TAOS_FIELD *fields, int numFields, int64_t reqid);
|
||||
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
|
||||
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
|
||||
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
|
||||
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
|
||||
DLL_EXPORT int taos_write_raw_block_with_reqid(TAOS *taos, int numOfRows, char *pData, const char *tbname,
|
||||
int64_t reqid);
|
||||
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS *taos, int rows, char *pData, const char *tbname,
|
||||
TAOS_FIELD *fields, int numFields);
|
||||
DLL_EXPORT int taos_write_raw_block_with_fields_with_reqid(TAOS *taos, int rows, char *pData, const char *tbname,
|
||||
TAOS_FIELD *fields, int numFields, int64_t reqid);
|
||||
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
|
||||
|
||||
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
|
||||
|
@ -369,7 +420,7 @@ typedef enum {
|
|||
} TSDB_SERVER_STATUS;
|
||||
|
||||
DLL_EXPORT TSDB_SERVER_STATUS taos_check_server_status(const char *fqdn, int port, char *details, int maxlen);
|
||||
DLL_EXPORT char* getBuildInfo();
|
||||
DLL_EXPORT char *getBuildInfo();
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -378,6 +378,18 @@ typedef struct {
|
|||
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||
SArray *rowArray);
|
||||
|
||||
// stmt2 binding
|
||||
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen);
|
||||
|
||||
typedef struct {
|
||||
int32_t columnId;
|
||||
int32_t type;
|
||||
TAOS_STMT2_BIND *bind;
|
||||
} SBindInfo2;
|
||||
|
||||
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||
SArray *rowArray);
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -20,9 +20,9 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "catalog.h"
|
||||
#include "query.h"
|
||||
#include "querynodes.h"
|
||||
#include "catalog.h"
|
||||
|
||||
typedef struct SStmtCallback {
|
||||
TAOS_STMT* pStmt;
|
||||
|
@ -37,15 +37,15 @@ typedef enum {
|
|||
} SParseResType;
|
||||
|
||||
typedef struct SParseSchemaRes {
|
||||
int8_t precision;
|
||||
int32_t numOfCols;
|
||||
SSchema* pSchema;
|
||||
int8_t precision;
|
||||
int32_t numOfCols;
|
||||
SSchema* pSchema;
|
||||
} SParseSchemaRes;
|
||||
|
||||
typedef struct SParseQueryRes {
|
||||
SNode* pQuery;
|
||||
SCatalogReq* pCatalogReq;
|
||||
SMetaData meta;
|
||||
SNode* pQuery;
|
||||
SCatalogReq* pCatalogReq;
|
||||
SMetaData meta;
|
||||
} SParseQueryRes;
|
||||
|
||||
typedef struct SParseSqlRes {
|
||||
|
@ -121,11 +121,13 @@ int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid);
|
|||
int32_t qInitKeywordsTable();
|
||||
void qCleanupKeywordsTable();
|
||||
|
||||
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo);
|
||||
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks);
|
||||
//int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum);
|
||||
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
|
||||
SStbInterlaceInfo* pBuildInfo);
|
||||
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks);
|
||||
// int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx,
|
||||
// int32_t tbNum);
|
||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
|
||||
int32_t qResetStmtColumns(SArray* pCols, bool deepClear);
|
||||
int32_t qResetStmtColumns(SArray* pCols, bool deepClear);
|
||||
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
|
||||
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId,
|
||||
|
@ -136,39 +138,51 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData
|
|||
|
||||
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
|
||||
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
|
||||
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo* pBindInfos);
|
||||
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
STSchema** pTSchema, SBindInfo* pBindInfos);
|
||||
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||
int32_t rowNum);
|
||||
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
int32_t colIdx, int32_t rowNum);
|
||||
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
|
||||
int32_t qStmtBindParams2(SQuery* pQuery, TAOS_STMT2_BIND* pParams, int32_t colIdx);
|
||||
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
STSchema** pTSchema, SBindInfo2* pBindInfos);
|
||||
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
int32_t colIdx, int32_t rowNum);
|
||||
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
|
||||
void destroyBoundColumnInfo(void* pBoundInfo);
|
||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||
int32_t msgBufLen);
|
||||
|
||||
void qDestroyBoundColInfo(void* pInfo);
|
||||
|
||||
int32_t smlInitHandle(SQuery** query);
|
||||
int32_t smlBuildRow(STableDataCxt* pTableCxt);
|
||||
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* kv, int32_t index);
|
||||
int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt);
|
||||
int32_t smlInitHandle(SQuery** query);
|
||||
int32_t smlBuildRow(STableDataCxt* pTableCxt);
|
||||
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* kv, int32_t index);
|
||||
int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt);
|
||||
|
||||
void clearColValArraySml(SArray* pCols);
|
||||
int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols,
|
||||
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
|
||||
char* msgBuf, int32_t msgBufLen);
|
||||
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
||||
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields,
|
||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen);
|
||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* fields,
|
||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen);
|
||||
|
||||
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
||||
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
|
||||
int32_t serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
|
||||
void destoryCatalogReq(SCatalogReq *pCatalogReq);
|
||||
void destoryCatalogReq(SCatalogReq* pCatalogReq);
|
||||
bool isPrimaryKeyImpl(SNode* pExpr);
|
||||
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo);
|
||||
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx,
|
||||
SStbInterlaceInfo* pBuildInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -0,0 +1,233 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_CLIENTSTMT2_H
|
||||
#define TDENGINE_CLIENTSTMT2_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "catalog.h"
|
||||
/*
|
||||
typedef enum {
|
||||
STMT_TYPE_INSERT = 1,
|
||||
STMT_TYPE_MULTI_INSERT,
|
||||
STMT_TYPE_QUERY,
|
||||
} STMT_TYPE;
|
||||
|
||||
typedef enum {
|
||||
STMT_INIT = 1,
|
||||
STMT_PREPARE,
|
||||
STMT_SETTBNAME,
|
||||
STMT_SETTAGS,
|
||||
STMT_FETCH_FIELDS,
|
||||
STMT_BIND,
|
||||
STMT_BIND_COL,
|
||||
STMT_ADD_BATCH,
|
||||
STMT_EXECUTE,
|
||||
STMT_MAX,
|
||||
} STMT_STATUS;
|
||||
|
||||
#define STMT_TABLE_COLS_NUM 1000
|
||||
|
||||
typedef struct SStmtTableCache {
|
||||
STableDataCxt *pDataCtx;
|
||||
void *boundTags;
|
||||
} SStmtTableCache;
|
||||
|
||||
typedef struct SStmtQueryResInfo {
|
||||
TAOS_FIELD *fields;
|
||||
TAOS_FIELD *userFields;
|
||||
uint32_t numOfCols;
|
||||
int32_t precision;
|
||||
} SStmtQueryResInfo;
|
||||
|
||||
typedef struct SStmtBindInfo {
|
||||
bool needParse;
|
||||
bool inExecCache;
|
||||
uint64_t tbUid;
|
||||
uint64_t tbSuid;
|
||||
int32_t tbVgId;
|
||||
int32_t sBindRowNum;
|
||||
int32_t sBindLastIdx;
|
||||
int8_t tbType;
|
||||
bool tagsCached;
|
||||
void *boundTags;
|
||||
char tbName[TSDB_TABLE_FNAME_LEN];
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
char stbFName[TSDB_TABLE_FNAME_LEN];
|
||||
SName sname;
|
||||
|
||||
char statbName[TSDB_TABLE_FNAME_LEN];
|
||||
} SStmtBindInfo;
|
||||
|
||||
typedef struct SStmtAsyncParam {
|
||||
STableColsData *pTbData;
|
||||
void* pStmt;
|
||||
} SStmtAsyncParam;
|
||||
|
||||
typedef struct SStmtExecInfo {
|
||||
int32_t affectedRows;
|
||||
SRequestObj *pRequest;
|
||||
SHashObj *pBlockHash;
|
||||
STableDataCxt *pCurrBlock;
|
||||
SSubmitTbData *pCurrTbData;
|
||||
} SStmtExecInfo;
|
||||
*/
|
||||
typedef struct {
|
||||
bool stbInterlaceMode;
|
||||
STMT_TYPE type;
|
||||
STMT_STATUS status;
|
||||
uint64_t suid;
|
||||
uint64_t runTimes;
|
||||
SHashObj *pTableCache; // SHash<SStmtTableCache>
|
||||
SQuery *pQuery;
|
||||
char *sqlStr;
|
||||
int32_t sqlLen;
|
||||
SArray *nodeList;
|
||||
SStmtQueryResInfo queryRes;
|
||||
bool autoCreateTbl;
|
||||
SHashObj *pVgHash;
|
||||
SBindInfo2 *pBindInfo;
|
||||
|
||||
SStbInterlaceInfo siInfo;
|
||||
} SStmtSQLInfo2;
|
||||
/*
|
||||
typedef struct SStmtStatInfo {
|
||||
int64_t ctgGetTbMetaNum;
|
||||
int64_t getCacheTbInfo;
|
||||
int64_t parseSqlNum;
|
||||
int64_t bindDataNum;
|
||||
int64_t setTbNameUs;
|
||||
int64_t bindDataUs1;
|
||||
int64_t bindDataUs2;
|
||||
int64_t bindDataUs3;
|
||||
int64_t bindDataUs4;
|
||||
int64_t addBatchUs;
|
||||
int64_t execWaitUs;
|
||||
int64_t execUseUs;
|
||||
} SStmtStatInfo;
|
||||
|
||||
typedef struct SStmtQNode {
|
||||
bool restoreTbCols;
|
||||
STableColsData tblData;
|
||||
struct SStmtQNode* next;
|
||||
} SStmtQNode;
|
||||
|
||||
typedef struct SStmtQueue {
|
||||
bool stopQueue;
|
||||
SStmtQNode* head;
|
||||
SStmtQNode* tail;
|
||||
uint64_t qRemainNum;
|
||||
} SStmtQueue;
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
STscObj *taos;
|
||||
SCatalog *pCatalog;
|
||||
int32_t affectedRows;
|
||||
uint32_t seqId;
|
||||
uint32_t seqIds[STMT_MAX];
|
||||
bool bindThreadInUse;
|
||||
TdThread bindThread;
|
||||
TAOS_STMT2_OPTION options;
|
||||
bool stbInterlaceMode;
|
||||
SStmtQueue queue;
|
||||
|
||||
SStmtSQLInfo2 sql;
|
||||
SStmtExecInfo exec;
|
||||
SStmtBindInfo bInfo;
|
||||
|
||||
int64_t reqid;
|
||||
int32_t errCode;
|
||||
|
||||
SStmtStatInfo stat;
|
||||
} STscStmt2;
|
||||
/*
|
||||
extern char *gStmtStatusStr[];
|
||||
|
||||
#define STMT_LOG_SEQ(n) \
|
||||
do { \
|
||||
(pStmt)->seqId++; \
|
||||
(pStmt)->seqIds[n]++; \
|
||||
STMT_DLOG("the %dth:%d %s", (pStmt)->seqIds[n], (pStmt)->seqId, gStmtStatusStr[n]); \
|
||||
} while (0)
|
||||
|
||||
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
|
||||
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
|
||||
|
||||
#define STMT_ERR_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
pStmt->errCode = _code; \
|
||||
return _code; \
|
||||
} \
|
||||
} while (0)
|
||||
#define STMT_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
pStmt->errCode = _code; \
|
||||
} \
|
||||
return _code; \
|
||||
} while (0)
|
||||
#define STMT_ERR_JRET(c) \
|
||||
do { \
|
||||
code = c; \
|
||||
if (code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = code; \
|
||||
pStmt->errCode = code; \
|
||||
goto _return; \
|
||||
} \
|
||||
} while (0)
|
||||
#define STMT_ERRI_JRET(c) \
|
||||
do { \
|
||||
code = c; \
|
||||
if (code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = code; \
|
||||
goto _return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
#define STMT_FLOG(param, ...) qFatal("stmt:%p " param, pStmt, __VA_ARGS__)
|
||||
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
|
||||
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
|
||||
|
||||
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
|
||||
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
|
||||
*/
|
||||
TAOS_STMT2 *stmtInit2(STscObj *taos, TAOS_STMT2_OPTION *pOptions);
|
||||
int stmtClose2(TAOS_STMT2 *stmt);
|
||||
int stmtExec2(TAOS_STMT2 *stmt, int *affected_rows);
|
||||
int stmtPrepare2(TAOS_STMT2 *stmt, const char *sql, unsigned long length);
|
||||
int stmtSetTbName2(TAOS_STMT2 *stmt, const char *tbName);
|
||||
int stmtSetTbTags2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *tags);
|
||||
int stmtGetTagFields2(TAOS_STMT2 *stmt, int *nums, TAOS_FIELD_E **fields);
|
||||
int stmtGetColFields2(TAOS_STMT2 *stmt, int *nums, TAOS_FIELD_E **fields);
|
||||
int stmtIsInsert2(TAOS_STMT2 *stmt, int *insert);
|
||||
int stmtGetParamNum2(TAOS_STMT2 *stmt, int *nums);
|
||||
TAOS_RES *stmtUseResult2(TAOS_STMT2 *stmt);
|
||||
int stmtBindBatch2(TAOS_STMT2 *stmt, TAOS_STMT2_BIND *bind, int32_t colIdx);
|
||||
const char *stmtErrstr2(TAOS_STMT2 *stmt);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_CLIENTSTMT2_H
|
|
@ -16,19 +16,20 @@
|
|||
#include "catalog.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "clientStmt.h"
|
||||
#include "clientMonitor.h"
|
||||
#include "clientStmt.h"
|
||||
#include "clientStmt2.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "scheduler.h"
|
||||
#include "tcompare.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "version.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
#define TSC_VAR_NOT_RELEASE 1
|
||||
#define TSC_VAR_RELEASED 0
|
||||
|
@ -124,7 +125,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
|||
}
|
||||
|
||||
STscObj *pObj = NULL;
|
||||
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
|
||||
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
|
||||
if (NULL == rid) {
|
||||
|
@ -187,15 +188,15 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
|
|||
return 0;
|
||||
}
|
||||
|
||||
typedef struct SFetchWhiteListInfo{
|
||||
int64_t connId;
|
||||
typedef struct SFetchWhiteListInfo {
|
||||
int64_t connId;
|
||||
__taos_async_whitelist_fn_t userCbFn;
|
||||
void* userParam;
|
||||
void *userParam;
|
||||
} SFetchWhiteListInfo;
|
||||
|
||||
int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
SFetchWhiteListInfo* pInfo = (SFetchWhiteListInfo*)param;
|
||||
TAOS* taos = &pInfo->connId;
|
||||
int32_t fetchWhiteListCallbackFn(void *param, SDataBuf *pMsg, int32_t code) {
|
||||
SFetchWhiteListInfo *pInfo = (SFetchWhiteListInfo *)param;
|
||||
TAOS *taos = &pInfo->connId;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
@ -213,7 +214,7 @@ int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
uint64_t* pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
|
||||
uint64_t *pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
|
||||
if (pWhiteLists == NULL) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
|
@ -242,7 +243,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
|
|||
return;
|
||||
}
|
||||
|
||||
int64_t connId = *(int64_t*)taos;
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
|
||||
STscObj *pTsc = acquireTscObj(connId);
|
||||
if (NULL == pTsc) {
|
||||
|
@ -259,7 +260,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
|
|||
return;
|
||||
}
|
||||
|
||||
void* pReq = taosMemoryMalloc(msgLen);
|
||||
void *pReq = taosMemoryMalloc(msgLen);
|
||||
if (pReq == NULL) {
|
||||
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
|
||||
releaseTscObj(connId);
|
||||
|
@ -273,7 +274,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
|
|||
return;
|
||||
}
|
||||
|
||||
SFetchWhiteListInfo* pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
|
||||
SFetchWhiteListInfo *pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
|
||||
if (pParam == NULL) {
|
||||
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
|
||||
taosMemoryFree(pReq);
|
||||
|
@ -284,9 +285,9 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
|
|||
pParam->connId = connId;
|
||||
pParam->userCbFn = fp;
|
||||
pParam->userParam = param;
|
||||
SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
SMsgSendInfo *pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (pSendInfo == NULL) {
|
||||
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
|
||||
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
|
||||
taosMemoryFree(pParam);
|
||||
taosMemoryFree(pReq);
|
||||
releaseTscObj(connId);
|
||||
|
@ -301,7 +302,7 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
|
|||
pSendInfo->msgType = TDMT_MND_GET_USER_WHITELIST;
|
||||
|
||||
int64_t transportId = 0;
|
||||
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
|
||||
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
|
||||
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) {
|
||||
tscWarn("failed to async send msg to server");
|
||||
}
|
||||
|
@ -447,7 +448,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if(pRequest->inCallback) {
|
||||
if (pRequest->inCallback) {
|
||||
tscError("can not call taos_fetch_row before query callback ends.");
|
||||
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
return NULL;
|
||||
|
@ -458,7 +459,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||
SReqResultInfo *pResultInfo = NULL;
|
||||
if (msg->common.resIter == -1) {
|
||||
if(tmqGetNextResInfo(res, true, &pResultInfo) != 0){
|
||||
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) {
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
|
@ -470,7 +471,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
pResultInfo->current += 1;
|
||||
return pResultInfo->row;
|
||||
} else {
|
||||
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0){
|
||||
if (tmqGetNextResInfo(res, true, &pResultInfo) != 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -545,22 +546,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
|
|||
len += sprintf(str + len, "%lf", dv);
|
||||
} break;
|
||||
|
||||
case TSDB_DATA_TYPE_VARBINARY:{
|
||||
void* data = NULL;
|
||||
case TSDB_DATA_TYPE_VARBINARY: {
|
||||
void *data = NULL;
|
||||
uint32_t size = 0;
|
||||
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
|
||||
if(taosAscii2Hex(row[i], charLen, &data, &size) < 0){
|
||||
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
|
||||
if (taosAscii2Hex(row[i], charLen, &data, &size) < 0) {
|
||||
break;
|
||||
}
|
||||
(void)memcpy(str + len, data, size);
|
||||
len += size;
|
||||
taosMemoryFree(data);
|
||||
}break;
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_GEOMETRY: {
|
||||
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
|
||||
if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY || fields[i].type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY ||
|
||||
fields[i].type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
if (ASSERT(charLen <= fields[i].bytes && charLen >= 0)) {
|
||||
tscError("taos_print_row error binary. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
|
||||
}
|
||||
|
@ -667,7 +669,8 @@ const char *taos_get_client_info() { return version; }
|
|||
|
||||
// return int32_t
|
||||
int taos_affected_rows(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
|
||||
TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -678,7 +681,8 @@ int taos_affected_rows(TAOS_RES *res) {
|
|||
|
||||
// return int64_t
|
||||
int64_t taos_affected_rows64(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
|
||||
TD_RES_TMQ_BATCH_META(res)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -728,7 +732,8 @@ int taos_select_db(TAOS *taos, const char *db) {
|
|||
}
|
||||
|
||||
void taos_stop_query(TAOS_RES *res) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res)) {
|
||||
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
|
||||
TD_RES_TMQ_BATCH_META(res)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -787,7 +792,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
|||
return pRequest->code;
|
||||
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SReqResultInfo *pResultInfo = NULL;
|
||||
int32_t code = tmqGetNextResInfo(res, true, &pResultInfo);
|
||||
int32_t code = tmqGetNextResInfo(res, true, &pResultInfo);
|
||||
if (code != 0) return code;
|
||||
|
||||
pResultInfo->current = pResultInfo->numOfRows;
|
||||
|
@ -810,7 +815,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
|||
|
||||
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SReqResultInfo *pResultInfo = NULL;
|
||||
int32_t code = tmqGetNextResInfo(res, false, &pResultInfo);
|
||||
int32_t code = tmqGetNextResInfo(res, false, &pResultInfo);
|
||||
if (code != 0) {
|
||||
(*numOfRows) = 0;
|
||||
return 0;
|
||||
|
@ -956,7 +961,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
|||
(void)memcpy(&pRequest->parseMeta, pResultMeta, sizeof(*pResultMeta));
|
||||
(void)memset(pResultMeta, 0, sizeof(*pResultMeta));
|
||||
}
|
||||
|
||||
|
||||
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
|
||||
}
|
||||
|
||||
|
@ -1002,7 +1007,7 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
|
|||
}
|
||||
|
||||
pNewRequest->pQuery = NULL;
|
||||
code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pNewRequest->pQuery);
|
||||
code = nodesMakeNode(QUERY_NODE_QUERY, (SNode **)&pNewRequest->pQuery);
|
||||
if (pNewRequest->pQuery) {
|
||||
pNewRequest->pQuery->pRoot = pRoot;
|
||||
pRoot = NULL;
|
||||
|
@ -1274,7 +1279,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
(void)refreshMeta(pRequest->pTscObj, pRequest); //ignore return code,try again
|
||||
(void)refreshMeta(pRequest->pTscObj, pRequest); // ignore return code,try again
|
||||
pRequest->prevCode = code;
|
||||
doAsyncQuery(pRequest, true);
|
||||
return;
|
||||
|
@ -1288,7 +1293,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
|
||||
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
|
||||
tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest);
|
||||
SRequestObj* pUserReq = pRequest;
|
||||
SRequestObj *pUserReq = pRequest;
|
||||
(void)acquireRequest(pRequest->self);
|
||||
while (pUserReq) {
|
||||
if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
|
||||
|
@ -1634,7 +1639,6 @@ TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) {
|
|||
return pStmt;
|
||||
}
|
||||
|
||||
|
||||
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
|
||||
if (stmt == NULL || sql == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
|
@ -1877,7 +1881,161 @@ int taos_stmt_close(TAOS_STMT *stmt) {
|
|||
return stmtClose(stmt);
|
||||
}
|
||||
|
||||
int taos_set_conn_mode(TAOS* taos, int mode, int value) {
|
||||
TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option) {
|
||||
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
||||
if (NULL == pObj) {
|
||||
tscError("invalid parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TAOS_STMT2 *pStmt = stmtInit2(pObj, option);
|
||||
|
||||
releaseTscObj(*(int64_t *)taos);
|
||||
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length) {
|
||||
if (stmt == NULL || sql == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return stmtPrepare2(stmt, sql, length);
|
||||
}
|
||||
|
||||
int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx) {
|
||||
if (stmt == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
for (int i = 0; i < bindv->count; ++i) {
|
||||
char *tbname = bindv->tbnames[i];
|
||||
TAOS_STMT2_BIND *tags = bindv->tags[i];
|
||||
TAOS_STMT2_BIND *bind = bindv->bind_cols[i];
|
||||
|
||||
if (tbname) {
|
||||
code = stmtSetTbName2(stmt, tbname);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (tags) {
|
||||
code = stmtSetTbTags2(stmt, tags);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (bind) {
|
||||
if (bind->num <= 0 || bind->num > INT16_MAX) {
|
||||
tscError("invalid bind num %d", bind->num);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t insert = 0;
|
||||
(void)stmtIsInsert(stmt, &insert);
|
||||
if (0 == insert && bind->num > 1) {
|
||||
tscError("only one row data allowed for query");
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
code = stmtBindBatch2(stmt, bind, col_idx);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
|
||||
if (stmt == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return stmtExec2(stmt, affected_rows);
|
||||
}
|
||||
|
||||
int taos_stmt2_close(TAOS_STMT2 *stmt) {
|
||||
if (stmt == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return stmtClose2(stmt);
|
||||
}
|
||||
/*
|
||||
int taos_stmt2_param_count(TAOS_STMT2 *stmt, int *nums) {
|
||||
if (stmt == NULL || nums == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
return stmtGetParamNum2(stmt, nums);
|
||||
}
|
||||
*/
|
||||
int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert) {
|
||||
if (stmt == NULL || insert == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return stmtIsInsert2(stmt, insert);
|
||||
}
|
||||
|
||||
int taos_stmt2_get_fields(TAOS_STMT2 *stmt, TAOS_FIELD_T field_type, int *count, TAOS_FIELD_E **fields) {
|
||||
if (stmt == NULL || NULL == count) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (field_type == TAOS_FIELD_COL) {
|
||||
return stmtGetColFields2(stmt, count, fields);
|
||||
} else if (field_type == TAOS_FIELD_TAG) {
|
||||
return stmtGetTagFields2(stmt, count, fields);
|
||||
} else if (field_type == TAOS_FIELD_QUERY) {
|
||||
return stmtGetParamNum2(stmt, count);
|
||||
} else {
|
||||
tscError("invalid parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_E *fields) {
|
||||
(void)stmt;
|
||||
if (!fields) return;
|
||||
taosMemoryFree(fields);
|
||||
}
|
||||
|
||||
TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt) {
|
||||
if (stmt == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return stmtUseResult2(stmt);
|
||||
}
|
||||
|
||||
char *taos_stmt2_error(TAOS_STMT2 *stmt) { return (char *)stmtErrstr2(stmt); }
|
||||
|
||||
int taos_set_conn_mode(TAOS *taos, int mode, int value) {
|
||||
if (taos == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
|
@ -1900,6 +2058,4 @@ int taos_set_conn_mode(TAOS* taos, int mode, int value) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
char* getBuildInfo(){
|
||||
return buildinfo;
|
||||
}
|
||||
char *getBuildInfo() { return buildinfo; }
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -3119,6 +3119,143 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) {
|
||||
ASSERT(pColData->type == pBind->buffer_type);
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type
|
||||
for (int32_t i = 0; i < pBind->num; ++i) {
|
||||
if (pBind->is_null && pBind->is_null[i]) {
|
||||
if (pColData->cflag & COL_IS_KEY) {
|
||||
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||
goto _exit;
|
||||
}
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||
if (code) goto _exit;
|
||||
} else if (pBind->length[i] > buffMaxLen) {
|
||||
uError("var data length too big, len:%d, max:%d", pBind->length[i], buffMaxLen);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
} else {
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
|
||||
pColData, (uint8_t *)pBind->buffer + pBind->buffer_length * i, pBind->length[i]);
|
||||
}
|
||||
}
|
||||
} else { // fixed-length data type
|
||||
bool allValue;
|
||||
bool allNull;
|
||||
if (pBind->is_null) {
|
||||
bool same = (memcmp(pBind->is_null, pBind->is_null + 1, pBind->num - 1) == 0);
|
||||
allNull = (same && pBind->is_null[0] != 0);
|
||||
allValue = (same && pBind->is_null[0] == 0);
|
||||
} else {
|
||||
allNull = false;
|
||||
allValue = true;
|
||||
}
|
||||
|
||||
if ((pColData->cflag & COL_IS_KEY) && !allValue) {
|
||||
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (allValue) {
|
||||
// optimize (todo)
|
||||
for (int32_t i = 0; i < pBind->num; ++i) {
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
|
||||
pColData, (uint8_t *)pBind->buffer + TYPE_BYTES[pColData->type] * i, pBind->buffer_length);
|
||||
}
|
||||
} else if (allNull) {
|
||||
// optimize (todo)
|
||||
for (int32_t i = 0; i < pBind->num; ++i) {
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < pBind->num; ++i) {
|
||||
if (pBind->is_null[i]) {
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
|
||||
pColData, (uint8_t *)pBind->buffer + TYPE_BYTES[pColData->type] * i, pBind->buffer_length);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
/* build rows to `rowArray` from bind
|
||||
* `infos` is the bind information array
|
||||
* `numOfInfos` is the number of bind information
|
||||
* `infoSorted` is whether the bind information is sorted by column id
|
||||
* `pTSchema` is the schema of the table
|
||||
* `rowArray` is the array to store the rows
|
||||
*/
|
||||
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||
SArray *rowArray) {
|
||||
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (!infoSorted) {
|
||||
taosqsort_r(infos, numOfInfos, sizeof(SBindInfo), NULL, tBindInfoCompare);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t numOfRows = infos[0].bind->num;
|
||||
SArray *colValArray;
|
||||
SColVal colVal;
|
||||
|
||||
if ((colValArray = taosArrayInit(numOfInfos, sizeof(SColVal))) == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
|
||||
taosArrayClear(colValArray);
|
||||
|
||||
for (int32_t iInfo = 0; iInfo < numOfInfos; iInfo++) {
|
||||
if (infos[iInfo].bind->is_null && infos[iInfo].bind->is_null[iRow]) {
|
||||
colVal = COL_VAL_NULL(infos[iInfo].columnId, infos[iInfo].type);
|
||||
} else {
|
||||
SValue value = {
|
||||
.type = infos[iInfo].type,
|
||||
};
|
||||
if (IS_VAR_DATA_TYPE(infos[iInfo].type)) {
|
||||
value.nData = infos[iInfo].bind->length[iRow];
|
||||
value.pData = (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow;
|
||||
} else {
|
||||
(void)memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow,
|
||||
infos[iInfo].bind->buffer_length);
|
||||
}
|
||||
colVal = COL_VAL_VALUE(infos[iInfo].columnId, value);
|
||||
}
|
||||
if (taosArrayPush(colValArray, &colVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
SRow *row;
|
||||
if ((code = tRowBuild(colValArray, pTSchema, &row))) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if ((taosArrayPush(rowArray, &row)) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(colValArray);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -448,7 +448,348 @@ int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bi
|
|||
pBind = bind;
|
||||
}
|
||||
|
||||
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
|
||||
code = tColDataAddValueByBind(pCol, pBind,
|
||||
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
|
||||
|
||||
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(ncharBind.buffer);
|
||||
taosMemoryFree(ncharBind.length);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t convertStmtNcharCol2(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_STMT2_BIND* src, TAOS_STMT2_BIND* dst) {
|
||||
int32_t output = 0;
|
||||
int32_t newBuflen = (pSchema->bytes - VARSTR_HEADER_SIZE) * src->num;
|
||||
if (dst->buffer_length < newBuflen) {
|
||||
dst->buffer = taosMemoryRealloc(dst->buffer, newBuflen);
|
||||
if (NULL == dst->buffer) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == dst->length) {
|
||||
dst->length = taosMemoryRealloc(dst->length, sizeof(int32_t) * src->num);
|
||||
if (NULL == dst->length) {
|
||||
taosMemoryFreeClear(dst->buffer);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
dst->buffer_length = pSchema->bytes - VARSTR_HEADER_SIZE;
|
||||
|
||||
for (int32_t i = 0; i < src->num; ++i) {
|
||||
if (src->is_null && src->is_null[i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!taosMbsToUcs4(((char*)src->buffer) + src->buffer_length * i, src->length[i],
|
||||
(TdUcs4*)(((char*)dst->buffer) + dst->buffer_length * i), dst->buffer_length, &output)) {
|
||||
if (errno == E2BIG) {
|
||||
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
|
||||
}
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||
return buildSyntaxErrMsg(pMsgBuf, buf, NULL);
|
||||
}
|
||||
|
||||
dst->length[i] = output;
|
||||
}
|
||||
|
||||
dst->buffer_type = src->buffer_type;
|
||||
dst->is_null = src->is_null;
|
||||
dst->num = src->num;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
STSchema** pTSchema, SBindInfo2* pBindInfos) {
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
int32_t rowNum = bind->num;
|
||||
TAOS_STMT2_BIND ncharBind = {0};
|
||||
TAOS_STMT2_BIND* pBind = NULL;
|
||||
int32_t code = 0;
|
||||
int16_t lastColId = -1;
|
||||
bool colInOrder = true;
|
||||
|
||||
if (NULL == *pTSchema) {
|
||||
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
||||
}
|
||||
|
||||
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
|
||||
if (pColSchema->colId <= lastColId) {
|
||||
colInOrder = false;
|
||||
} else {
|
||||
lastColId = pColSchema->colId;
|
||||
}
|
||||
// SColData* pCol = taosArrayGet(pCols, c);
|
||||
|
||||
if (bind[c].num != rowNum) {
|
||||
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
|
||||
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
|
||||
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
|
||||
code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind);
|
||||
if (code) {
|
||||
goto _return;
|
||||
}
|
||||
pBind = &ncharBind;
|
||||
} else {
|
||||
pBind = bind + c;
|
||||
}
|
||||
|
||||
pBindInfos[c].columnId = pColSchema->colId;
|
||||
pBindInfos[c].bind = pBind;
|
||||
pBindInfos[c].type = pColSchema->type;
|
||||
|
||||
// code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes -
|
||||
// VARSTR_HEADER_SIZE: -1); if (code) {
|
||||
// goto _return;
|
||||
// }
|
||||
}
|
||||
|
||||
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
|
||||
|
||||
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(ncharBind.buffer);
|
||||
taosMemoryFree(ncharBind.length);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SBoundColInfo* tags = (SBoundColInfo*)boundTags;
|
||||
if (NULL == tags) {
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||
if (!pTagArray) {
|
||||
return buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||
}
|
||||
|
||||
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||
if (!tagName) {
|
||||
code = buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||
goto end;
|
||||
}
|
||||
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
|
||||
|
||||
bool isJson = false;
|
||||
STag* pTag = NULL;
|
||||
|
||||
for (int c = 0; c < tags->numOfBound; ++c) {
|
||||
if (bind[c].is_null && bind[c].is_null[0]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSchema* pTagSchema = &pSchema[tags->pColIndex[c]];
|
||||
int32_t colLen = pTagSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
||||
colLen = bind[c].length[0];
|
||||
if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
|
||||
code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
if (NULL == taosArrayPush(tagName, pTagSchema->name)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
|
||||
if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||
code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
|
||||
goto end;
|
||||
}
|
||||
|
||||
isJson = true;
|
||||
char* tmp = taosMemoryCalloc(1, colLen + 1);
|
||||
memcpy(tmp, bind[c].buffer, colLen);
|
||||
code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
|
||||
taosMemoryFree(tmp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
} else {
|
||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||
// strcpy(val.colName, pTagSchema->name);
|
||||
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||
pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||
val.pData = (uint8_t*)bind[c].buffer;
|
||||
val.nData = colLen;
|
||||
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
int32_t output = 0;
|
||||
void* p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
|
||||
if (p == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
|
||||
if (errno == E2BIG) {
|
||||
taosMemoryFree(p);
|
||||
code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
|
||||
goto end;
|
||||
}
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
|
||||
taosMemoryFree(p);
|
||||
code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
|
||||
goto end;
|
||||
}
|
||||
val.pData = p;
|
||||
val.nData = output;
|
||||
} else {
|
||||
memcpy(&val.i64, bind[c].buffer, colLen);
|
||||
}
|
||||
if (NULL == taosArrayPush(pTagArray, &val)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (NULL == pDataBlock->pData->pCreateTbReq) {
|
||||
pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == pDataBlock->pData->pCreateTbReq) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
|
||||
pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
|
||||
pTag = NULL;
|
||||
|
||||
end:
|
||||
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
|
||||
if (p->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
taosMemoryFreeClear(p->pData);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosArrayDestroy(tagName);
|
||||
taosMemoryFree(pTag);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qBindStmtColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
int32_t rowNum = bind->num;
|
||||
TAOS_STMT2_BIND ncharBind = {0};
|
||||
TAOS_STMT2_BIND* pBind = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
|
||||
SColData* pCol = taosArrayGet(pCols, c);
|
||||
|
||||
if (bind[c].num != rowNum) {
|
||||
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) &&
|
||||
bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
|
||||
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
|
||||
code = convertStmtNcharCol2(&pBuf, pColSchema, bind + c, &ncharBind);
|
||||
if (code) {
|
||||
goto _return;
|
||||
}
|
||||
pBind = &ncharBind;
|
||||
} else {
|
||||
pBind = bind + c;
|
||||
}
|
||||
|
||||
code = tColDataAddValueByBind2(pCol, pBind,
|
||||
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
|
||||
if (code) {
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
||||
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(ncharBind.buffer);
|
||||
taosMemoryFree(ncharBind.length);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qBindStmtSingleColValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bind, char* msgBuf, int32_t msgBufLen,
|
||||
int32_t colIdx, int32_t rowNum) {
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
|
||||
SColData* pCol = taosArrayGet(pCols, colIdx);
|
||||
TAOS_STMT2_BIND ncharBind = {0};
|
||||
TAOS_STMT2_BIND* pBind = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
if (bind->num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
||||
// Column index exceeds the number of columns
|
||||
if (colIdx >= pCols->size && pCol == NULL) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
|
||||
}
|
||||
|
||||
if (bind->buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
|
||||
code = convertStmtNcharCol2(&pBuf, pColSchema, bind, &ncharBind);
|
||||
if (code) {
|
||||
goto _return;
|
||||
}
|
||||
pBind = &ncharBind;
|
||||
} else {
|
||||
pBind = bind;
|
||||
}
|
||||
|
||||
code = tColDataAddValueByBind2(pCol, pBind,
|
||||
IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
|
||||
|
||||
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);
|
||||
|
||||
|
@ -533,7 +874,7 @@ int32_t qResetStmtColumns(SArray* pCols, bool deepClear) {
|
|||
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
SColData* pCol = (SColData*)taosArrayGet(pCols, i);
|
||||
if (pCol == NULL){
|
||||
if (pCol == NULL) {
|
||||
qError("qResetStmtColumns column is NULL");
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -553,7 +894,7 @@ int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
|
|||
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
|
||||
if (pCol == NULL){
|
||||
if (pCol == NULL) {
|
||||
qError("qResetStmtDataBlock column is NULL");
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -76,33 +76,33 @@ bool qIsCreateTbFromFileSql(const char* pStr, size_t length) {
|
|||
}
|
||||
|
||||
bool qParseDbName(const char* pStr, size_t length, char** pDbName) {
|
||||
(void) length;
|
||||
(void)length;
|
||||
int32_t index = 0;
|
||||
SToken t;
|
||||
SToken t;
|
||||
|
||||
if (NULL == pStr) {
|
||||
*pDbName = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
t = tStrGetToken((char *) pStr, &index, false, NULL);
|
||||
t = tStrGetToken((char*)pStr, &index, false, NULL);
|
||||
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
|
||||
*pDbName = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
t = tStrGetToken((char *) pStr, &index, false, NULL);
|
||||
t = tStrGetToken((char*)pStr, &index, false, NULL);
|
||||
if (TK_INTO != t.type) {
|
||||
*pDbName = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
t = tStrGetToken((char *) pStr, &index, false, NULL);
|
||||
t = tStrGetToken((char*)pStr, &index, false, NULL);
|
||||
if (t.n == 0 || t.z == NULL) {
|
||||
*pDbName = NULL;
|
||||
return false;
|
||||
}
|
||||
char *dotPos = strnchr(t.z, '.', t.n, true);
|
||||
char* dotPos = strnchr(t.z, '.', t.n, true);
|
||||
if (dotPos != NULL) {
|
||||
int dbNameLen = dotPos - t.z;
|
||||
*pDbName = taosMemoryMalloc(dbNameLen + 1);
|
||||
|
@ -331,13 +331,12 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, SSDataBlock
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
static void destoryTablesReq(void *p) {
|
||||
STablesReq *pRes = (STablesReq *)p;
|
||||
static void destoryTablesReq(void* p) {
|
||||
STablesReq* pRes = (STablesReq*)p;
|
||||
taosArrayDestroy(pRes->pTables);
|
||||
}
|
||||
|
||||
void destoryCatalogReq(SCatalogReq *pCatalogReq) {
|
||||
void destoryCatalogReq(SCatalogReq* pCatalogReq) {
|
||||
if (NULL == pCatalogReq) {
|
||||
return;
|
||||
}
|
||||
|
@ -369,7 +368,6 @@ void destoryCatalogReq(SCatalogReq *pCatalogReq) {
|
|||
taosArrayDestroy(pCatalogReq->pTableTag);
|
||||
}
|
||||
|
||||
|
||||
void tfreeSParseQueryRes(void* p) {
|
||||
if (NULL == p) {
|
||||
return;
|
||||
|
@ -410,9 +408,7 @@ int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t qInitKeywordsTable() {
|
||||
return taosInitKeywordsTable();
|
||||
}
|
||||
int32_t qInitKeywordsTable() { return taosInitKeywordsTable(); }
|
||||
|
||||
void qCleanupKeywordsTable() { taosCleanupKeywordsTable(); }
|
||||
|
||||
|
@ -445,6 +441,98 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t setValueByBindParam2(SValueNode* pVal, TAOS_STMT2_BIND* pParam) {
|
||||
if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
|
||||
taosMemoryFreeClear(pVal->datum.p);
|
||||
}
|
||||
|
||||
if (pParam->is_null && 1 == *(pParam->is_null)) {
|
||||
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
|
||||
pVal->node.resType.type = pParam->buffer_type;
|
||||
pVal->node.resType.bytes = inputSize;
|
||||
|
||||
switch (pParam->buffer_type) {
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
|
||||
if (NULL == pVal->datum.p) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
|
||||
memcpy(varDataVal(pVal->datum.p), pParam->buffer, pVal->node.resType.bytes);
|
||||
pVal->node.resType.bytes += VARSTR_HEADER_SIZE;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_GEOMETRY:
|
||||
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
|
||||
if (NULL == pVal->datum.p) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
|
||||
strncpy(varDataVal(pVal->datum.p), (const char*)pParam->buffer, pVal->node.resType.bytes);
|
||||
pVal->node.resType.bytes += VARSTR_HEADER_SIZE;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
pVal->node.resType.bytes *= TSDB_NCHAR_SIZE;
|
||||
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
|
||||
if (NULL == pVal->datum.p) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t output = 0;
|
||||
if (!taosMbsToUcs4(pParam->buffer, inputSize, (TdUcs4*)varDataVal(pVal->datum.p), pVal->node.resType.bytes,
|
||||
&output)) {
|
||||
return errno;
|
||||
}
|
||||
varDataSetLen(pVal->datum.p, output);
|
||||
pVal->node.resType.bytes = output + VARSTR_HEADER_SIZE;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
int32_t code = nodesSetValueNodeValue(pVal, pParam->buffer);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
pVal->translate = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qStmtBindParams2(SQuery* pQuery, TAOS_STMT2_BIND* pParams, int32_t colIdx) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (colIdx < 0) {
|
||||
int32_t size = taosArrayGetSize(pQuery->pPlaceholderValues);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
code = setValueByBindParam2((SValueNode*)taosArrayGetP(pQuery->pPlaceholderValues, i), pParams + i);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
code = setValueByBindParam2((SValueNode*)taosArrayGetP(pQuery->pPlaceholderValues, colIdx), pParams);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && (colIdx < 0 || colIdx + 1 == pQuery->placeholderNum)) {
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
pQuery->pRoot = NULL;
|
||||
code = nodesCloneNode(pQuery->pPrepareRoot, &pQuery->pRoot);
|
||||
if (NULL == pQuery->pRoot) {
|
||||
code = code;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
rewriteExprAlias(pQuery->pRoot);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery) {
|
||||
int32_t code = translate(pCxt, pQuery, NULL);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
Loading…
Reference in New Issue