enh: optimize stmt performance
This commit is contained in:
parent
5887b3753e
commit
c0e4ab20a4
|
@ -150,6 +150,12 @@ typedef struct TAOS_DB_ROUTE_INFO {
|
||||||
TAOS_VGROUP_HASH_INFO *vgHash;
|
TAOS_VGROUP_HASH_INFO *vgHash;
|
||||||
} TAOS_DB_ROUTE_INFO;
|
} TAOS_DB_ROUTE_INFO;
|
||||||
|
|
||||||
|
typedef struct TAOS_STMT_OPTIONS {
|
||||||
|
int64_t reqId;
|
||||||
|
bool singleStbInsert;
|
||||||
|
bool singleTableBindOnce;
|
||||||
|
} TAOS_STMT_OPTIONS;
|
||||||
|
|
||||||
DLL_EXPORT void taos_cleanup(void);
|
DLL_EXPORT void taos_cleanup(void);
|
||||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||||
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
DLL_EXPORT setConfRet taos_set_config(const char *config);
|
||||||
|
@ -162,6 +168,7 @@ DLL_EXPORT const char *taos_data_type(int type);
|
||||||
|
|
||||||
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
|
||||||
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid);
|
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid);
|
||||||
|
DLL_EXPORT TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS* options);
|
||||||
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
|
||||||
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
|
||||||
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
|
||||||
|
|
|
@ -118,7 +118,11 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p
|
||||||
int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid);
|
int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid);
|
||||||
void qCleanupKeywordsTable();
|
void qCleanupKeywordsTable();
|
||||||
|
|
||||||
|
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo);
|
||||||
|
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks);
|
||||||
|
//int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum);
|
||||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
|
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
|
||||||
|
int32_t qResetStmtColumns(SArray* pCols, bool deepClear);
|
||||||
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
|
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
|
||||||
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
|
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
|
||||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId,
|
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId,
|
||||||
|
@ -129,8 +133,8 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData
|
||||||
|
|
||||||
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
|
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
|
||||||
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
|
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
|
||||||
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||||
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||||
int32_t rowNum);
|
int32_t rowNum);
|
||||||
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||||
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
|
||||||
|
@ -160,6 +164,7 @@ SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
||||||
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);
|
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);
|
||||||
void destoryCatalogReq(SCatalogReq *pCatalogReq);
|
void destoryCatalogReq(SCatalogReq *pCatalogReq);
|
||||||
bool isPrimaryKeyImpl(SNode* pExpr);
|
bool isPrimaryKeyImpl(SNode* pExpr);
|
||||||
|
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "tsimplehash.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
|
||||||
|
@ -193,6 +194,27 @@ typedef struct SBoundColInfo {
|
||||||
int32_t numOfBound;
|
int32_t numOfBound;
|
||||||
} SBoundColInfo;
|
} SBoundColInfo;
|
||||||
|
|
||||||
|
typedef struct STableColsData {
|
||||||
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
|
SArray* aCol;
|
||||||
|
bool getFromHash;
|
||||||
|
} STableColsData;
|
||||||
|
|
||||||
|
typedef struct STableVgUid {
|
||||||
|
uint64_t uid;
|
||||||
|
int32_t vgid;
|
||||||
|
} STableVgUid;
|
||||||
|
|
||||||
|
typedef struct STableBufInfo {
|
||||||
|
void* pCurBuff;
|
||||||
|
SArray* pBufList;
|
||||||
|
int64_t buffUnit;
|
||||||
|
int64_t buffSize;
|
||||||
|
int64_t buffIdx;
|
||||||
|
int64_t buffOffset;
|
||||||
|
} STableBufInfo;
|
||||||
|
|
||||||
|
|
||||||
typedef struct STableDataCxt {
|
typedef struct STableDataCxt {
|
||||||
STableMeta* pMeta;
|
STableMeta* pMeta;
|
||||||
STSchema* pSchema;
|
STSchema* pSchema;
|
||||||
|
@ -204,6 +226,32 @@ typedef struct STableDataCxt {
|
||||||
bool duplicateTs;
|
bool duplicateTs;
|
||||||
} STableDataCxt;
|
} STableDataCxt;
|
||||||
|
|
||||||
|
typedef struct SStbInterlaceInfo {
|
||||||
|
void* pCatalog;
|
||||||
|
void* pQuery;
|
||||||
|
int32_t acctId;
|
||||||
|
char* dbname;
|
||||||
|
void* transport;
|
||||||
|
SEpSet mgmtEpSet;
|
||||||
|
void* pRequest;
|
||||||
|
uint64_t requestId;
|
||||||
|
int64_t requestSelf;
|
||||||
|
bool tbFromHash;
|
||||||
|
SHashObj* pVgroupHash;
|
||||||
|
SArray* pVgroupList;
|
||||||
|
SSHashObj* pTableHash;
|
||||||
|
int64_t tbRemainNum;
|
||||||
|
STableBufInfo tbBuf;
|
||||||
|
char firstName[TSDB_TABLE_NAME_LEN];
|
||||||
|
|
||||||
|
STableDataCxt *pDataCtx;
|
||||||
|
void *boundTags;
|
||||||
|
|
||||||
|
SArray *pTableCols;
|
||||||
|
int32_t pTableColsIdx;
|
||||||
|
} SStbInterlaceInfo;
|
||||||
|
|
||||||
|
|
||||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
|
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
|
||||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,8 @@ typedef enum {
|
||||||
STMT_MAX,
|
STMT_MAX,
|
||||||
} STMT_STATUS;
|
} STMT_STATUS;
|
||||||
|
|
||||||
|
#define STMT_TABLE_COLS_NUM 1000
|
||||||
|
|
||||||
typedef struct SStmtTableCache {
|
typedef struct SStmtTableCache {
|
||||||
STableDataCxt *pDataCtx;
|
STableDataCxt *pDataCtx;
|
||||||
void *boundTags;
|
void *boundTags;
|
||||||
|
@ -57,6 +59,7 @@ typedef struct SStmtBindInfo {
|
||||||
bool inExecCache;
|
bool inExecCache;
|
||||||
uint64_t tbUid;
|
uint64_t tbUid;
|
||||||
uint64_t tbSuid;
|
uint64_t tbSuid;
|
||||||
|
int32_t tbVgId;
|
||||||
int32_t sBindRowNum;
|
int32_t sBindRowNum;
|
||||||
int32_t sBindLastIdx;
|
int32_t sBindLastIdx;
|
||||||
int8_t tbType;
|
int8_t tbType;
|
||||||
|
@ -66,8 +69,15 @@ typedef struct SStmtBindInfo {
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
char stbFName[TSDB_TABLE_FNAME_LEN];
|
char stbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
SName sname;
|
SName sname;
|
||||||
|
|
||||||
|
char statbName[TSDB_TABLE_FNAME_LEN];
|
||||||
} SStmtBindInfo;
|
} SStmtBindInfo;
|
||||||
|
|
||||||
|
typedef struct SStmtAsyncParam {
|
||||||
|
STableColsData *pTbData;
|
||||||
|
void* pStmt;
|
||||||
|
} SStmtAsyncParam;
|
||||||
|
|
||||||
typedef struct SStmtExecInfo {
|
typedef struct SStmtExecInfo {
|
||||||
int32_t affectedRows;
|
int32_t affectedRows;
|
||||||
SRequestObj *pRequest;
|
SRequestObj *pRequest;
|
||||||
|
@ -77,8 +87,10 @@ typedef struct SStmtExecInfo {
|
||||||
} SStmtExecInfo;
|
} SStmtExecInfo;
|
||||||
|
|
||||||
typedef struct SStmtSQLInfo {
|
typedef struct SStmtSQLInfo {
|
||||||
|
bool stbInterlaceMode;
|
||||||
STMT_TYPE type;
|
STMT_TYPE type;
|
||||||
STMT_STATUS status;
|
STMT_STATUS status;
|
||||||
|
uint64_t suid;
|
||||||
uint64_t runTimes;
|
uint64_t runTimes;
|
||||||
SHashObj *pTableCache; // SHash<SStmtTableCache>
|
SHashObj *pTableCache; // SHash<SStmtTableCache>
|
||||||
SQuery *pQuery;
|
SQuery *pQuery;
|
||||||
|
@ -88,14 +100,49 @@ typedef struct SStmtSQLInfo {
|
||||||
SStmtQueryResInfo queryRes;
|
SStmtQueryResInfo queryRes;
|
||||||
bool autoCreateTbl;
|
bool autoCreateTbl;
|
||||||
SHashObj *pVgHash;
|
SHashObj *pVgHash;
|
||||||
|
|
||||||
|
SStbInterlaceInfo siInfo;
|
||||||
} SStmtSQLInfo;
|
} SStmtSQLInfo;
|
||||||
|
|
||||||
|
typedef struct SStmtStatInfo {
|
||||||
|
int64_t ctgGetTbMetaNum;
|
||||||
|
int64_t getCacheTbInfo;
|
||||||
|
int64_t parseSqlNum;
|
||||||
|
int64_t bindDataNum;
|
||||||
|
int64_t setTbNameUs;
|
||||||
|
int64_t bindDataUs1;
|
||||||
|
int64_t bindDataUs2;
|
||||||
|
int64_t bindDataUs3;
|
||||||
|
int64_t bindDataUs4;
|
||||||
|
int64_t addBatchUs;
|
||||||
|
int64_t execWaitUs;
|
||||||
|
int64_t execUseUs;
|
||||||
|
} SStmtStatInfo;
|
||||||
|
|
||||||
|
typedef struct SStmtQNode {
|
||||||
|
STableColsData tblData;
|
||||||
|
struct SStmtQNode* next;
|
||||||
|
} SStmtQNode;
|
||||||
|
|
||||||
|
typedef struct SStmtQueue {
|
||||||
|
bool stopQueue;
|
||||||
|
SStmtQNode* head;
|
||||||
|
SStmtQNode* tail;
|
||||||
|
uint64_t qRemainNum;
|
||||||
|
} SStmtQueue;
|
||||||
|
|
||||||
|
|
||||||
typedef struct STscStmt {
|
typedef struct STscStmt {
|
||||||
STscObj *taos;
|
STscObj *taos;
|
||||||
SCatalog *pCatalog;
|
SCatalog *pCatalog;
|
||||||
int32_t affectedRows;
|
int32_t affectedRows;
|
||||||
uint32_t seqId;
|
uint32_t seqId;
|
||||||
uint32_t seqIds[STMT_MAX];
|
uint32_t seqIds[STMT_MAX];
|
||||||
|
bool bindThreadInUse;
|
||||||
|
TdThread bindThread;
|
||||||
|
TAOS_STMT_OPTIONS options;
|
||||||
|
bool stbInterlaceMode;
|
||||||
|
SStmtQueue queue;
|
||||||
|
|
||||||
SStmtSQLInfo sql;
|
SStmtSQLInfo sql;
|
||||||
SStmtExecInfo exec;
|
SStmtExecInfo exec;
|
||||||
|
@ -103,6 +150,8 @@ typedef struct STscStmt {
|
||||||
|
|
||||||
int64_t reqid;
|
int64_t reqid;
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
|
|
||||||
|
SStmtStatInfo stat;
|
||||||
} STscStmt;
|
} STscStmt;
|
||||||
|
|
||||||
extern char *gStmtStatusStr[];
|
extern char *gStmtStatusStr[];
|
||||||
|
@ -154,13 +203,14 @@ extern char *gStmtStatusStr[];
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
|
#define STMT_FLOG(param, ...) qFatal("stmt:%p " param, pStmt, __VA_ARGS__)
|
||||||
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
|
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
|
||||||
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
|
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
|
||||||
|
|
||||||
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
|
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
|
||||||
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
|
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
|
||||||
|
|
||||||
TAOS_STMT *stmtInit(STscObj *taos, int64_t reqid);
|
TAOS_STMT *stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions);
|
||||||
int stmtClose(TAOS_STMT *stmt);
|
int stmtClose(TAOS_STMT *stmt);
|
||||||
int stmtExec(TAOS_STMT *stmt);
|
int stmtExec(TAOS_STMT *stmt);
|
||||||
const char *stmtErrstr(TAOS_STMT *stmt);
|
const char *stmtErrstr(TAOS_STMT *stmt);
|
||||||
|
|
|
@ -1548,7 +1548,7 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STMT *pStmt = stmtInit(pObj, 0);
|
TAOS_STMT *pStmt = stmtInit(pObj, 0, NULL);
|
||||||
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
@ -1563,13 +1563,29 @@ TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STMT *pStmt = stmtInit(pObj, reqid);
|
TAOS_STMT *pStmt = stmtInit(pObj, reqid, NULL);
|
||||||
|
|
||||||
releaseTscObj(*(int64_t *)taos);
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
return pStmt;
|
return pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) {
|
||||||
|
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
|
||||||
|
if (NULL == pObj) {
|
||||||
|
tscError("invalid parameter for %s", __FUNCTION__);
|
||||||
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT *pStmt = stmtInit(pObj, options->reqId, options);
|
||||||
|
|
||||||
|
releaseTscObj(*(int64_t *)taos);
|
||||||
|
|
||||||
|
return pStmt;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
|
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
|
||||||
if (stmt == NULL || sql == NULL) {
|
if (stmt == NULL || sql == NULL) {
|
||||||
tscError("NULL parameter for %s", __FUNCTION__);
|
tscError("NULL parameter for %s", __FUNCTION__);
|
||||||
|
|
|
@ -8,6 +8,60 @@
|
||||||
char* gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags",
|
char* gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags",
|
||||||
"fetchFields", "bind", "bindCol", "addBatch", "exec"};
|
"fetchFields", "bind", "bindCol", "addBatch", "exec"};
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) {
|
||||||
|
if (pTblBuf->buffOffset < pTblBuf->buffSize) {
|
||||||
|
*pBuf = pTblBuf->pCurBuff + pTblBuf->buffOffset;
|
||||||
|
pTblBuf->buffOffset += pTblBuf->buffUnit;
|
||||||
|
} else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) {
|
||||||
|
pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++);
|
||||||
|
*pBuf = pTblBuf->pCurBuff;
|
||||||
|
pTblBuf->buffOffset = pTblBuf->buffUnit;
|
||||||
|
} else {
|
||||||
|
void *buff = taosMemoryMalloc(pTblBuf->buffSize);
|
||||||
|
if (NULL == buff) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pTblBuf->pBufList, &buff);
|
||||||
|
|
||||||
|
pTblBuf->buffIdx++;
|
||||||
|
pTblBuf->pCurBuff = buff;
|
||||||
|
*pBuf = buff;
|
||||||
|
pTblBuf->buffOffset = pTblBuf->buffUnit;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool stmtDequeue(STscStmt* pStmt, SStmtQNode **param) {
|
||||||
|
while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) {
|
||||||
|
taosUsleep(1);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStmtQNode *orig = pStmt->queue.head;
|
||||||
|
|
||||||
|
SStmtQNode *node = pStmt->queue.head->next;
|
||||||
|
pStmt->queue.head = pStmt->queue.head->next;
|
||||||
|
|
||||||
|
//taosMemoryFreeClear(orig);
|
||||||
|
|
||||||
|
*param = node;
|
||||||
|
|
||||||
|
atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) {
|
||||||
|
pStmt->queue.tail->next = param;
|
||||||
|
pStmt->queue.tail = param;
|
||||||
|
|
||||||
|
pStmt->stat.bindDataNum++;
|
||||||
|
atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t stmtCreateRequest(STscStmt* pStmt) {
|
static int32_t stmtCreateRequest(STscStmt* pStmt) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -42,7 +96,10 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
|
||||||
pStmt->errCode = 0;
|
pStmt->errCode = 0;
|
||||||
break;
|
break;
|
||||||
case STMT_SETTBNAME:
|
case STMT_SETTBNAME:
|
||||||
if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)) {
|
if (STMT_STATUS_EQ(INIT)) {
|
||||||
|
code = TSDB_CODE_TSC_STMT_API_ERROR;
|
||||||
|
}
|
||||||
|
if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) {
|
||||||
code = TSDB_CODE_TSC_STMT_API_ERROR;
|
code = TSDB_CODE_TSC_STMT_API_ERROR;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -170,6 +227,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
|
||||||
|
|
||||||
pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
|
pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid;
|
||||||
pStmt->bInfo.tbSuid = pTableMeta->suid;
|
pStmt->bInfo.tbSuid = pTableMeta->suid;
|
||||||
|
pStmt->bInfo.tbVgId = pTableMeta->vgId;
|
||||||
pStmt->bInfo.tbType = pTableMeta->tableType;
|
pStmt->bInfo.tbType = pTableMeta->tableType;
|
||||||
pStmt->bInfo.boundTags = tags;
|
pStmt->bInfo.boundTags = tags;
|
||||||
pStmt->bInfo.tagsCached = false;
|
pStmt->bInfo.tagsCached = false;
|
||||||
|
@ -261,14 +319,18 @@ int32_t stmtParseSql(STscStmt* pStmt) {
|
||||||
|
|
||||||
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
||||||
|
|
||||||
|
pStmt->stat.parseSqlNum++;
|
||||||
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
|
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
|
||||||
|
pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery;
|
||||||
|
|
||||||
pStmt->bInfo.needParse = false;
|
pStmt->bInfo.needParse = false;
|
||||||
|
|
||||||
if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
|
if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
|
||||||
pStmt->sql.type = STMT_TYPE_INSERT;
|
pStmt->sql.type = STMT_TYPE_INSERT;
|
||||||
|
pStmt->sql.stbInterlaceMode = false;
|
||||||
} else if (pStmt->sql.pQuery->pPrepareRoot) {
|
} else if (pStmt->sql.pQuery->pPrepareRoot) {
|
||||||
pStmt->sql.type = STMT_TYPE_QUERY;
|
pStmt->sql.type = STMT_TYPE_QUERY;
|
||||||
|
pStmt->sql.stbInterlaceMode = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -277,6 +339,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
|
||||||
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
|
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
|
||||||
pStmt->bInfo.tbUid = 0;
|
pStmt->bInfo.tbUid = 0;
|
||||||
pStmt->bInfo.tbSuid = 0;
|
pStmt->bInfo.tbSuid = 0;
|
||||||
|
pStmt->bInfo.tbVgId = -1;
|
||||||
pStmt->bInfo.tbType = 0;
|
pStmt->bInfo.tbType = 0;
|
||||||
pStmt->bInfo.needParse = true;
|
pStmt->bInfo.needParse = true;
|
||||||
pStmt->bInfo.inExecCache = false;
|
pStmt->bInfo.inExecCache = false;
|
||||||
|
@ -287,11 +350,30 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
|
||||||
qDestroyBoundColInfo(pStmt->bInfo.boundTags);
|
qDestroyBoundColInfo(pStmt->bInfo.boundTags);
|
||||||
taosMemoryFreeClear(pStmt->bInfo.boundTags);
|
taosMemoryFreeClear(pStmt->bInfo.boundTags);
|
||||||
}
|
}
|
||||||
memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN);
|
pStmt->bInfo.stbFName[0] = 0;;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void stmtFreeTableBlkList(STableColsData* pTb) {
|
||||||
|
qResetStmtColumns(pTb->aCol, true);
|
||||||
|
taosArrayDestroy(pTb->aCol);
|
||||||
|
}
|
||||||
|
|
||||||
|
void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
|
||||||
|
pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0);
|
||||||
|
pTblBuf->buffIdx = 1;
|
||||||
|
pTblBuf->buffOffset = sizeof(*pQueue->head);
|
||||||
|
|
||||||
|
pQueue->head = pQueue->tail = pTblBuf->pCurBuff;
|
||||||
|
pQueue->qRemainNum = 0;
|
||||||
|
pQueue->head->next = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
||||||
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
|
pStmt->sql.siInfo.pTableColsIdx = 0;
|
||||||
|
stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
|
||||||
|
} else {
|
||||||
if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
|
if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
|
||||||
taos_free_result(pStmt->exec.pRequest);
|
taos_free_result(pStmt->exec.pRequest);
|
||||||
pStmt->exec.pRequest = NULL;
|
pStmt->exec.pRequest = NULL;
|
||||||
|
@ -327,6 +409,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
||||||
|
|
||||||
tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
|
||||||
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
|
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
|
||||||
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
|
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
|
||||||
|
|
||||||
|
@ -367,27 +450,49 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
|
|
||||||
uint64_t suid) {
|
int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) {
|
||||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||||
.requestId = pStmt->exec.pRequest->requestId,
|
.requestId = pStmt->exec.pRequest->requestId,
|
||||||
.requestObjRefId = pStmt->exec.pRequest->self,
|
.requestObjRefId = pStmt->exec.pRequest->self,
|
||||||
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
||||||
|
|
||||||
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo));
|
int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
|
||||||
STMT_ERR_RET(
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl));
|
code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId);
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
|
||||||
|
uint64_t suid, int32_t vgId) {
|
||||||
|
STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId));
|
||||||
|
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl));
|
||||||
|
|
||||||
|
STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stmtGetFromCache(STscStmt* pStmt) {
|
int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
|
if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) {
|
||||||
|
pStmt->bInfo.needParse = false;
|
||||||
|
pStmt->bInfo.inExecCache = false;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
pStmt->bInfo.needParse = true;
|
pStmt->bInfo.needParse = true;
|
||||||
pStmt->bInfo.inExecCache = false;
|
pStmt->bInfo.inExecCache = false;
|
||||||
|
|
||||||
|
@ -404,6 +509,11 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL == pStmt->pCatalog) {
|
||||||
|
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
|
||||||
|
pStmt->sql.siInfo.pCatalog = pStmt->pCatalog;
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
||||||
if (pStmt->bInfo.inExecCache) {
|
if (pStmt->bInfo.inExecCache) {
|
||||||
pStmt->bInfo.needParse = false;
|
pStmt->bInfo.needParse = false;
|
||||||
|
@ -415,9 +525,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pStmt->pCatalog) {
|
|
||||||
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pStmt->sql.autoCreateTbl) {
|
if (pStmt->sql.autoCreateTbl) {
|
||||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
|
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
|
||||||
|
@ -426,7 +533,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
pStmt->bInfo.tbUid = 0;
|
pStmt->bInfo.tbUid = 0;
|
||||||
|
|
||||||
STableDataCxt* pNewBlock = NULL;
|
STableDataCxt* pNewBlock = NULL;
|
||||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid));
|
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1));
|
||||||
|
|
||||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
||||||
POINTER_BYTES)) {
|
POINTER_BYTES)) {
|
||||||
|
@ -443,12 +550,19 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
STMT_RET(stmtCleanBindInfo(pStmt));
|
STMT_RET(stmtCleanBindInfo(pStmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t uid, suid;
|
||||||
|
int32_t vgId;
|
||||||
|
int8_t tableType;
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||||
.requestId = pStmt->exec.pRequest->requestId,
|
.requestId = pStmt->exec.pRequest->requestId,
|
||||||
.requestObjRefId = pStmt->exec.pRequest->self,
|
.requestObjRefId = pStmt->exec.pRequest->self,
|
||||||
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
||||||
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
|
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
|
||||||
|
|
||||||
|
pStmt->stat.ctgGetTbMetaNum++;
|
||||||
|
|
||||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||||
tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
|
tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
|
||||||
stmtCleanBindInfo(pStmt);
|
stmtCleanBindInfo(pStmt);
|
||||||
|
@ -458,10 +572,14 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
|
|
||||||
STMT_ERR_RET(code);
|
STMT_ERR_RET(code);
|
||||||
|
|
||||||
uint64_t uid = pTableMeta->uid;
|
uid = pTableMeta->uid;
|
||||||
uint64_t suid = pTableMeta->suid;
|
suid = pTableMeta->suid;
|
||||||
int8_t tableType = pTableMeta->tableType;
|
tableType = pTableMeta->tableType;
|
||||||
|
pStmt->bInfo.tbVgId = pTableMeta->vgId;
|
||||||
|
vgId = pTableMeta->vgId;
|
||||||
|
|
||||||
taosMemoryFree(pTableMeta);
|
taosMemoryFree(pTableMeta);
|
||||||
|
|
||||||
uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
|
uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
|
||||||
|
|
||||||
if (uid == pStmt->bInfo.tbUid) {
|
if (uid == pStmt->bInfo.tbUid) {
|
||||||
|
@ -505,7 +623,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
pStmt->bInfo.tagsCached = true;
|
pStmt->bInfo.tagsCached = true;
|
||||||
|
|
||||||
STableDataCxt* pNewBlock = NULL;
|
STableDataCxt* pNewBlock = NULL;
|
||||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid));
|
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId));
|
||||||
|
|
||||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
||||||
POINTER_BYTES)) {
|
POINTER_BYTES)) {
|
||||||
|
@ -538,9 +656,94 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) {
|
|
||||||
|
int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) {
|
||||||
|
SStmtQNode* pParam = (SStmtQNode*)param;
|
||||||
|
|
||||||
|
STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock, &pStmt->sql.siInfo));
|
||||||
|
|
||||||
|
//taosMemoryFree(pParam->pTbData);
|
||||||
|
|
||||||
|
atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void *stmtBindThreadFunc(void *param) {
|
||||||
|
setThreadName("stmtBind");
|
||||||
|
|
||||||
|
qInfo("stmt bind thread started");
|
||||||
|
|
||||||
|
STscStmt* pStmt = (STscStmt*)param;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (atomic_load_8((int8_t *)&pStmt->queue.stopQueue)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStmtQNode *asyncParam = NULL;
|
||||||
|
if (!stmtDequeue(pStmt, &asyncParam)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
stmtAsyncOutput(pStmt, asyncParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
qInfo("stmt bind thread stopped");
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t stmtStartBindThread(STscStmt* pStmt) {
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
STMT_ERR_RET(terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->bindThreadInUse = true;
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t stmtInitQueue(STscStmt* pStmt) {
|
||||||
|
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head));
|
||||||
|
pStmt->queue.tail = pStmt->queue.head;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) {
|
||||||
|
pTblBuf->buffUnit = sizeof(SStmtQNode);
|
||||||
|
pTblBuf->buffSize = pTblBuf->buffUnit * 1000;
|
||||||
|
pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES);
|
||||||
|
if (NULL == pTblBuf->pBufList) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
void *buff = taosMemoryMalloc(pTblBuf->buffSize);
|
||||||
|
if (NULL == buff) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pTblBuf->pBufList, &buff);
|
||||||
|
|
||||||
|
pTblBuf->pCurBuff = buff;
|
||||||
|
pTblBuf->buffIdx = 1;
|
||||||
|
pTblBuf->buffOffset = 0;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) {
|
||||||
STscObj* pObj = (STscObj*)taos;
|
STscObj* pObj = (STscObj*)taos;
|
||||||
STscStmt* pStmt = NULL;
|
STscStmt* pStmt = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
|
pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
|
||||||
if (NULL == pStmt) {
|
if (NULL == pStmt) {
|
||||||
|
@ -560,6 +763,43 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) {
|
||||||
pStmt->sql.status = STMT_INIT;
|
pStmt->sql.status = STMT_INIT;
|
||||||
pStmt->reqid = reqid;
|
pStmt->reqid = reqid;
|
||||||
|
|
||||||
|
if (NULL != pOptions) {
|
||||||
|
memcpy(&pStmt->options, pOptions, sizeof(pStmt->options));
|
||||||
|
if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) {
|
||||||
|
pStmt->stbInterlaceMode = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->stbInterlaceMode) {
|
||||||
|
pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter;
|
||||||
|
pStmt->sql.siInfo.acctId = taos->acctId;
|
||||||
|
pStmt->sql.siInfo.dbname = taos->db;
|
||||||
|
pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||||
|
pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
|
||||||
|
if (NULL == pStmt->sql.siInfo.pTableHash) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
stmtClose(pStmt);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES);
|
||||||
|
if (NULL == pStmt->sql.siInfo.pTableCols) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
stmtClose(pStmt);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
stmtInitQueue(pStmt);
|
||||||
|
code = stmtStartBindThread(pStmt);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
terrno = code;
|
||||||
|
stmtClose(pStmt);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
STMT_LOG_SEQ(STMT_INIT);
|
STMT_LOG_SEQ(STMT_INIT);
|
||||||
|
|
||||||
tscDebug("stmt:%p initialized", pStmt);
|
tscDebug("stmt:%p initialized", pStmt);
|
||||||
|
@ -584,6 +824,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
||||||
|
|
||||||
pStmt->sql.sqlStr = strndup(sql, length);
|
pStmt->sql.sqlStr = strndup(sql, length);
|
||||||
pStmt->sql.sqlLen = length;
|
pStmt->sql.sqlLen = length;
|
||||||
|
pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -591,6 +832,8 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
||||||
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
|
int64_t startUs = taosGetTimestampUs();
|
||||||
|
|
||||||
STMT_DLOG("start to set tbName: %s", tbName);
|
STMT_DLOG("start to set tbName: %s", tbName);
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
|
||||||
|
@ -602,8 +845,12 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
||||||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) {
|
||||||
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
||||||
|
|
||||||
|
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
|
||||||
|
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
|
||||||
|
|
||||||
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
|
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
|
||||||
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
|
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
|
||||||
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);
|
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);
|
||||||
|
@ -611,11 +858,40 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
||||||
STMT_ERR_RET(stmtGetFromCache(pStmt));
|
STMT_ERR_RET(stmtGetFromCache(pStmt));
|
||||||
|
|
||||||
if (pStmt->bInfo.needParse) {
|
if (pStmt->bInfo.needParse) {
|
||||||
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
|
|
||||||
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
|
|
||||||
|
|
||||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
|
||||||
|
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
|
||||||
|
pStmt->exec.pRequest->requestId++;
|
||||||
|
pStmt->bInfo.needParse = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
|
||||||
|
STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||||
|
if (!pSrc) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
STableDataCxt* pDst = NULL;
|
||||||
|
|
||||||
|
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
|
||||||
|
pStmt->sql.siInfo.pDataCtx = pDst;
|
||||||
|
|
||||||
|
SArray* pTblCols = NULL;
|
||||||
|
for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
|
||||||
|
pTblCols = taosArrayDup(pDst->pData->aCol, NULL);
|
||||||
|
if (NULL == pTblCols) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols);
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t startUs2 = taosGetTimestampUs();
|
||||||
|
pStmt->stat.setTbNameUs += startUs2 - startUs;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -682,8 +958,89 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
SArray* stmtGetFreeCol(STscStmt* pStmt, int32_t* idx) {
|
||||||
|
while (true) {
|
||||||
|
if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) {
|
||||||
|
pStmt->exec.smInfo.pColIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) {
|
||||||
|
taosUsleep(1);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
*idx = pStmt->exec.smInfo.pColIdx;
|
||||||
|
return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) {
|
||||||
|
if (NULL == pStmt->sql.siInfo.pVgroupHash) {
|
||||||
|
pStmt->sql.siInfo.pVgroupHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
|
}
|
||||||
|
if (NULL == pStmt->sql.siInfo.pVgroupList) {
|
||||||
|
pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pStmt->sql.siInfo.pRequest) {
|
||||||
|
STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, (SRequestObj**)&pStmt->sql.siInfo.pRequest,
|
||||||
|
pStmt->reqid));
|
||||||
|
|
||||||
|
if (pStmt->reqid != 0) {
|
||||||
|
pStmt->reqid++;
|
||||||
|
}
|
||||||
|
pStmt->exec.pRequest->syncQuery = true;
|
||||||
|
|
||||||
|
pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId;
|
||||||
|
pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] && 0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) {
|
||||||
|
pStmt->sql.siInfo.tbFromHash = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 == pStmt->sql.siInfo.firstName[0]) {
|
||||||
|
strcpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash;
|
||||||
|
param->next = NULL;
|
||||||
|
|
||||||
|
atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1);
|
||||||
|
|
||||||
|
stmtEnqueue(pStmt, param);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray** pTableCols) {
|
||||||
|
while (true) {
|
||||||
|
if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) {
|
||||||
|
*pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
SArray* pTblCols = NULL;
|
||||||
|
for (int32_t i = 0; i < 100; i++) {
|
||||||
|
pTblCols = taosArrayDup(pStmt->sql.siInfo.pDataCtx->pData->aCol, NULL);
|
||||||
|
if (NULL == pTblCols) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int64_t startUs = taosGetTimestampUs();
|
||||||
|
|
||||||
STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);
|
STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);
|
||||||
|
|
||||||
|
@ -699,9 +1056,8 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
||||||
pStmt->exec.pRequest = NULL;
|
pStmt->exec.pRequest = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
|
||||||
|
|
||||||
if (pStmt->bInfo.needParse) {
|
if (pStmt->bInfo.needParse) {
|
||||||
|
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
||||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -758,8 +1114,30 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
||||||
pStmt->exec.pCurrBlock = *pDataBlock;
|
pStmt->exec.pCurrBlock = *pDataBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t startUs2 = taosGetTimestampUs();
|
||||||
|
pStmt->stat.bindDataUs1 += startUs2 - startUs;
|
||||||
|
|
||||||
|
SStmtQNode* param = NULL;
|
||||||
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
|
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m));
|
||||||
|
STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, ¶m->tblData.aCol));
|
||||||
|
|
||||||
|
int32_t colNum = taosArrayGetSize(param->tblData.aCol);
|
||||||
|
for (int32_t i = 0; i < colNum; ++i) {
|
||||||
|
SColData* pCol = (SColData*)taosArrayGet(param->tblData.aCol, i);
|
||||||
|
tColDataClear(pCol);
|
||||||
|
}
|
||||||
|
|
||||||
|
strcpy(param->tblData.tbName, pStmt->bInfo.tbName);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t startUs3 = taosGetTimestampUs();
|
||||||
|
pStmt->stat.bindDataUs2 += startUs3 - startUs2;
|
||||||
|
|
||||||
|
SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
|
||||||
|
|
||||||
if (colIdx < 0) {
|
if (colIdx < 0) {
|
||||||
int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
|
code = qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
|
||||||
if (code) {
|
if (code) {
|
||||||
tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
|
tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
|
||||||
STMT_ERR_RET(code);
|
STMT_ERR_RET(code);
|
||||||
|
@ -776,20 +1154,38 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
||||||
pStmt->bInfo.sBindRowNum = bind->num;
|
pStmt->bInfo.sBindRowNum = bind->num;
|
||||||
}
|
}
|
||||||
|
|
||||||
qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx,
|
qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx,
|
||||||
pStmt->bInfo.sBindRowNum);
|
pStmt->bInfo.sBindRowNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t startUs4 = taosGetTimestampUs();
|
||||||
|
pStmt->stat.bindDataUs3 += startUs4 - startUs3;
|
||||||
|
|
||||||
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
|
STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param));
|
||||||
|
}
|
||||||
|
|
||||||
|
pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int stmtAddBatch(TAOS_STMT* stmt) {
|
int stmtAddBatch(TAOS_STMT* stmt) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
|
int64_t startUs = taosGetTimestampUs();
|
||||||
|
|
||||||
STMT_DLOG_E("start to add batch");
|
STMT_DLOG_E("start to add batch");
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
|
||||||
|
|
||||||
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
|
int64_t startUs2 = taosGetTimestampUs();
|
||||||
|
pStmt->stat.addBatchUs += startUs2 - startUs;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(stmtCacheBlock(pStmt));
|
STMT_ERR_RET(stmtCacheBlock(pStmt));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -858,6 +1254,8 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
||||||
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
|
||||||
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
|
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
|
||||||
|
|
||||||
|
pStmt->stat.ctgGetTbMetaNum++;
|
||||||
|
|
||||||
taos_free_result(pStmt->exec.pRequest);
|
taos_free_result(pStmt->exec.pRequest);
|
||||||
pStmt->exec.pRequest = NULL;
|
pStmt->exec.pRequest = NULL;
|
||||||
|
|
||||||
|
@ -879,26 +1277,22 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
||||||
return finalCode;
|
return finalCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int stmtExec(TAOS_STMT* stmt) {
|
/*
|
||||||
|
int stmtStaticModeExec(TAOS_STMT* stmt) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSubmitRsp* pRsp = NULL;
|
SSubmitRsp* pRsp = NULL;
|
||||||
|
if (pStmt->sql.staticMode) {
|
||||||
|
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
STMT_DLOG_E("start to exec");
|
STMT_DLOG_E("start to exec");
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
||||||
|
|
||||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList, pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum));
|
||||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
|
||||||
} else {
|
|
||||||
tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
|
|
||||||
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
|
|
||||||
|
|
||||||
STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
|
|
||||||
|
|
||||||
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
|
|
||||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
||||||
}
|
|
||||||
|
|
||||||
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
||||||
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
|
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
|
||||||
|
@ -926,12 +1320,92 @@ _return:
|
||||||
|
|
||||||
STMT_RET(code);
|
STMT_RET(code);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
int stmtExec(TAOS_STMT* stmt) {
|
||||||
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
int32_t code = 0;
|
||||||
|
SSubmitRsp* pRsp = NULL;
|
||||||
|
|
||||||
|
int64_t startUs = taosGetTimestampUs();
|
||||||
|
|
||||||
|
STMT_DLOG_E("start to exec");
|
||||||
|
|
||||||
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
||||||
|
|
||||||
|
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||||
|
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
||||||
|
} else {
|
||||||
|
if (pStmt->sql.stbInterlaceMode) {
|
||||||
|
int64_t startTs = taosGetTimestampUs();
|
||||||
|
while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
|
||||||
|
taosUsleep(1);
|
||||||
|
}
|
||||||
|
pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs;
|
||||||
|
|
||||||
|
STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList));
|
||||||
|
taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
|
||||||
|
pStmt->sql.siInfo.pVgroupHash = NULL;
|
||||||
|
pStmt->sql.siInfo.pVgroupList = NULL;
|
||||||
|
} else {
|
||||||
|
tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
|
||||||
|
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
|
||||||
|
|
||||||
|
STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
|
||||||
|
|
||||||
|
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
|
||||||
|
}
|
||||||
|
|
||||||
|
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
||||||
|
code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
|
||||||
|
if (code) {
|
||||||
|
pStmt->exec.pRequest->code = code;
|
||||||
|
} else {
|
||||||
|
tFreeSSubmitRsp(pRsp);
|
||||||
|
STMT_ERR_RET(stmtResetStmt(pStmt));
|
||||||
|
STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
STMT_ERR_JRET(pStmt->exec.pRequest->code);
|
||||||
|
|
||||||
|
pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
|
||||||
|
pStmt->affectedRows += pStmt->exec.affectedRows;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
||||||
|
|
||||||
|
tFreeSSubmitRsp(pRsp);
|
||||||
|
|
||||||
|
++pStmt->sql.runTimes;
|
||||||
|
|
||||||
|
int64_t startUs2 = taosGetTimestampUs();
|
||||||
|
pStmt->stat.execUseUs += startUs2 - startUs;
|
||||||
|
|
||||||
|
STMT_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
int stmtClose(TAOS_STMT* stmt) {
|
int stmtClose(TAOS_STMT* stmt) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
STMT_DLOG_E("start to free stmt");
|
STMT_DLOG_E("start to free stmt");
|
||||||
|
|
||||||
|
pStmt->queue.stopQueue = true;
|
||||||
|
|
||||||
|
taosMsleep(10);
|
||||||
|
|
||||||
|
STMT_FLOG("stmt %p closed, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 ", parseSqlNum=>%" PRId64
|
||||||
|
", pStmt->stat.bindDataNum=>%" PRId64 ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u"
|
||||||
|
", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64 ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64,
|
||||||
|
pStmt, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo, pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum,
|
||||||
|
pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND], pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE],
|
||||||
|
pStmt->stat.setTbNameUs, pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4,
|
||||||
|
pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs);
|
||||||
|
|
||||||
stmtCleanSQLInfo(pStmt);
|
stmtCleanSQLInfo(pStmt);
|
||||||
taosMemoryFree(stmt);
|
taosMemoryFree(stmt);
|
||||||
|
|
||||||
|
|
|
@ -9624,6 +9624,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
|
#if 0
|
||||||
int32_t nColData = TARRAY_SIZE(pTbData->aCol);
|
int32_t nColData = TARRAY_SIZE(pTbData->aCol);
|
||||||
SColData *aColData = (SColData *)TARRAY_DATA(pTbData->aCol);
|
SColData *aColData = (SColData *)TARRAY_DATA(pTbData->aCol);
|
||||||
|
|
||||||
|
@ -9631,6 +9632,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
|
||||||
tColDataDestroy(&aColData[i]);
|
tColDataDestroy(&aColData[i]);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pTbData->aCol);
|
taosArrayDestroy(pTbData->aCol);
|
||||||
|
#endif
|
||||||
} else {
|
} else {
|
||||||
int32_t nRow = TARRAY_SIZE(pTbData->aRowP);
|
int32_t nRow = TARRAY_SIZE(pTbData->aRowP);
|
||||||
SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
|
SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
|
||||||
|
|
|
@ -51,7 +51,8 @@ int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta
|
||||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
|
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
|
||||||
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
|
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
|
||||||
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild);
|
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild);
|
||||||
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
|
//int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum);
|
||||||
|
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks, bool append);
|
||||||
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
|
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
|
||||||
void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
|
void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
|
||||||
void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
|
void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
|
||||||
|
|
|
@ -446,7 +446,7 @@ int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
|
||||||
uError("insMergeTableDataCxt failed");
|
uError("insMergeTableDataCxt failed");
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
|
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("insBuildVgDataBlocks failed");
|
uError("insBuildVgDataBlocks failed");
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -2474,7 +2474,7 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
||||||
taosHashClear(pStmt->pTableCxtHashObj);
|
taosHashClear(pStmt->pTableCxtHashObj);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
|
code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -2523,10 +2523,15 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S
|
||||||
pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext;
|
pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext;
|
||||||
|
|
||||||
if (!reentry) {
|
if (!reentry) {
|
||||||
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
|
if (pCxt->pComCxt->pStmtCb) {
|
||||||
|
pStmt->pTableBlockHashObj =
|
||||||
|
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
} else {
|
||||||
pStmt->pTableBlockHashObj =
|
pStmt->pTableBlockHashObj =
|
||||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
|
|
|
@ -51,6 +51,48 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) {
|
||||||
|
// merge according to vgId
|
||||||
|
return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->freeArrayFunc) {
|
||||||
|
pStmt->freeArrayFunc(pVgDataBlocks);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SArray* pVgDataBlocks = NULL;
|
||||||
|
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
|
||||||
|
|
||||||
|
// merge according to vgId
|
||||||
|
if (tbNum > 0) {
|
||||||
|
code = insMergeStmtTableDataCxt(pTbCtx, pBlockList, &pVgDataBlocks, true, tbNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->freeArrayFunc) {
|
||||||
|
pStmt->freeArrayFunc(pVgDataBlocks);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SArray* pVgDataBlocks = NULL;
|
SArray* pVgDataBlocks = NULL;
|
||||||
|
@ -60,8 +102,9 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
|
||||||
if (taosHashGetSize(pBlockHash) > 0) {
|
if (taosHashGetSize(pBlockHash) > 0) {
|
||||||
code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true);
|
code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
|
code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStmt->freeArrayFunc) {
|
if (pStmt->freeArrayFunc) {
|
||||||
|
@ -233,7 +276,7 @@ int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||||
|
@ -245,7 +288,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
|
||||||
|
|
||||||
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
||||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
|
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
|
||||||
SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c);
|
SColData* pCol = taosArrayGet(pCols, c);
|
||||||
|
|
||||||
if (bind[c].num != rowNum) {
|
if (bind[c].num != rowNum) {
|
||||||
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||||
|
@ -283,14 +326,14 @@ _return:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||||
int32_t rowNum) {
|
int32_t rowNum) {
|
||||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
|
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
|
||||||
SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx);
|
SColData* pCol = taosArrayGet(pCols, colIdx);
|
||||||
TAOS_MULTI_BIND ncharBind = {0};
|
TAOS_MULTI_BIND ncharBind = {0};
|
||||||
TAOS_MULTI_BIND* pBind = NULL;
|
TAOS_MULTI_BIND* pBind = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -393,6 +436,22 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qResetStmtColumns(SArray* pCols, bool deepClear) {
|
||||||
|
int32_t colNum = taosArrayGetSize(pCols);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < colNum; ++i) {
|
||||||
|
SColData* pCol = (SColData*)taosArrayGet(pCols, i);
|
||||||
|
if (deepClear) {
|
||||||
|
tColDataDeepClear(pCol);
|
||||||
|
} else {
|
||||||
|
tColDataClear(pCol);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
|
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
|
||||||
STableDataCxt* pBlock = (STableDataCxt*)block;
|
STableDataCxt* pBlock = (STableDataCxt*)block;
|
||||||
int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
|
int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "tRealloc.h"
|
#include "tRealloc.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
#include "tmisce.h"
|
||||||
|
|
||||||
void qDestroyBoundColInfo(void* pInfo) {
|
void qDestroyBoundColInfo(void* pInfo) {
|
||||||
if (NULL == pInfo) {
|
if (NULL == pInfo) {
|
||||||
|
@ -429,7 +430,7 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
|
||||||
taosHashCleanup(pTableCxtHash);
|
taosHashCleanup(pTableCxtHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild) {
|
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) {
|
||||||
if (NULL == pVgCxt->pData->aSubmitTbData) {
|
if (NULL == pVgCxt->pData->aSubmitTbData) {
|
||||||
pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
|
pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
|
||||||
if (NULL == pVgCxt->pData->aSubmitTbData) {
|
if (NULL == pVgCxt->pData->aSubmitTbData) {
|
||||||
|
@ -441,7 +442,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx
|
||||||
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
|
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
|
||||||
if (isRebuild) {
|
if (isRebuild) {
|
||||||
rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
|
rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
|
||||||
} else {
|
} else if (clear) {
|
||||||
taosMemoryFreeClear(pTableCxt->pData);
|
taosMemoryFreeClear(pTableCxt->pData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -486,6 +487,207 @@ int insColDataComp(const void* lp, const void* rp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId, STableColsData* pTbData, SName* sname) {
|
||||||
|
if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVgroupInfo vgInfo = {0};
|
||||||
|
SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
|
||||||
|
.requestId = pBuildInfo->requestId,
|
||||||
|
.requestObjRefId = pBuildInfo->requestSelf,
|
||||||
|
.mgmtEps = pBuildInfo->mgmtEpSet};
|
||||||
|
|
||||||
|
int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData, uint64_t* uid, int32_t* vgId) {
|
||||||
|
STableVgUid* pTbInfo = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (pTbData->getFromHash) {
|
||||||
|
pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pTbInfo) {
|
||||||
|
SName sname;
|
||||||
|
qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0);
|
||||||
|
|
||||||
|
STableMeta* pTableMeta = NULL;
|
||||||
|
SRequestConnInfo conn = {.pTrans = pBuildInfo->transport,
|
||||||
|
.requestId = pBuildInfo->requestId,
|
||||||
|
.requestObjRefId = pBuildInfo->requestSelf,
|
||||||
|
.mgmtEps = pBuildInfo->mgmtEpSet};
|
||||||
|
code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta);
|
||||||
|
|
||||||
|
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||||
|
parserDebug("tb %s.%s not exist", sname.dbname, sname.tname);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*uid = pTableMeta->uid;
|
||||||
|
*vgId = pTableMeta->vgId;
|
||||||
|
|
||||||
|
STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId};
|
||||||
|
tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo));
|
||||||
|
|
||||||
|
code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname);
|
||||||
|
|
||||||
|
taosMemoryFree(pTableMeta);
|
||||||
|
} else {
|
||||||
|
*uid = pTbInfo->uid;
|
||||||
|
*vgId = pTbInfo->vgid;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
uint64_t uid;
|
||||||
|
int32_t vgId;
|
||||||
|
|
||||||
|
pTbCtx->pData->aCol = pTbData->aCol;
|
||||||
|
|
||||||
|
SColData* pCol = taosArrayGet(pTbCtx->pData->aCol, 0);
|
||||||
|
if (pCol->nVal <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTbCtx->pMeta->vgId = vgId;
|
||||||
|
pTbCtx->pMeta->uid = uid;
|
||||||
|
pTbCtx->pData->uid = uid;
|
||||||
|
|
||||||
|
if (pTbCtx->pData->pCreateTbReq) {
|
||||||
|
pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySort(pTbCtx->pData->aCol, insColDataComp);
|
||||||
|
|
||||||
|
tColDataSortMerge(pTbCtx->pData->aCol);
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVgroupDataCxt* pVgCxt = NULL;
|
||||||
|
void** pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
|
||||||
|
if (NULL == pp) {
|
||||||
|
pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId));
|
||||||
|
if (NULL == pp) {
|
||||||
|
code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt);
|
||||||
|
} else {
|
||||||
|
pVgCxt = *(SVgroupDataCxt**)pp;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pVgCxt = *(SVgroupDataCxt**)pp;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 1000) {
|
||||||
|
code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
|
||||||
|
taosArrayClear(pVgCxt->pData->aSubmitTbData);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum) {
|
||||||
|
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||||
|
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
if (NULL == pVgroupHash || NULL == pVgroupList) {
|
||||||
|
taosHashCleanup(pVgroupHash);
|
||||||
|
taosArrayDestroy(pVgroupList);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
|
STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i);
|
||||||
|
pTableCxt->pMeta->vgId = pTableCols->vgId;
|
||||||
|
pTableCxt->pMeta->uid = pTableCols->uid;
|
||||||
|
pTableCxt->pData->uid = pTableCols->uid;
|
||||||
|
pTableCxt->pData->aCol = pTableCols->aCol;
|
||||||
|
|
||||||
|
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
|
||||||
|
if (pCol->nVal <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableCxt->pData->pCreateTbReq) {
|
||||||
|
pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySort(pTableCxt->pData->aCol, insColDataComp);
|
||||||
|
|
||||||
|
tColDataSortMerge(pTableCxt->pData->aCol);
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SVgroupDataCxt* pVgCxt = NULL;
|
||||||
|
int32_t vgId = pTableCxt->pMeta->vgId;
|
||||||
|
void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
|
||||||
|
if (NULL == pp) {
|
||||||
|
code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
|
||||||
|
} else {
|
||||||
|
pVgCxt = *(SVgroupDataCxt**)pp;
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(pVgroupHash);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pVgDataBlocks = pVgroupList;
|
||||||
|
} else {
|
||||||
|
insDestroyVgroupDataCxtList(pVgroupList);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
|
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
|
||||||
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||||
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
|
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
@ -546,7 +748,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool
|
||||||
pVgCxt = *(SVgroupDataCxt**)pp;
|
pVgCxt = *(SVgroupDataCxt**)pp;
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild);
|
code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -599,9 +801,9 @@ static void destroyVgDataBlocks(void* p) {
|
||||||
taosMemoryFree(pVg);
|
taosMemoryFree(pVg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks) {
|
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) {
|
||||||
size_t numOfVg = taosArrayGetSize(pVgDataCxtList);
|
size_t numOfVg = taosArrayGetSize(pVgDataCxtList);
|
||||||
SArray* pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES);
|
||||||
if (NULL == pDataBlocks) {
|
if (NULL == pDataBlocks) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -609,6 +811,9 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
|
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
|
||||||
SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
|
SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
|
||||||
|
if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||||
if (NULL == dst) {
|
if (NULL == dst) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -626,6 +831,13 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (append) {
|
||||||
|
if (NULL == *pVgDataBlocks) {
|
||||||
|
*pVgDataBlocks = pDataBlocks;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pVgDataBlocks = pDataBlocks;
|
*pVgDataBlocks = pDataBlocks;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1268,7 +1268,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
#if 0
|
||||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
|
|
Loading…
Reference in New Issue