diff --git a/cmake/cmake.version b/cmake/cmake.version index 156e99bd03..44ac989200 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.3.1.0.alpha") + SET(TD_VER_NUMBER "3.3.2.0.alpha") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index f295e57bb5..119f22d96b 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t import Release from "/components/ReleaseV3"; +## 3.3.1.0 + + + ## 3.3.0.3 diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index f69e1fd4a8..cddb8160f6 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.3.1.0 + + + ## 3.3.0.3 diff --git a/include/client/taos.h b/include/client/taos.h index 46e4e7633b..a22c8e5138 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -150,6 +150,12 @@ typedef struct TAOS_DB_ROUTE_INFO { TAOS_VGROUP_HASH_INFO *vgHash; } TAOS_DB_ROUTE_INFO; +typedef struct TAOS_STMT_OPTIONS { + int64_t reqId; + bool singleStbInsert; + bool singleTableBindOnce; +} TAOS_STMT_OPTIONS; + DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT setConfRet taos_set_config(const char *config); @@ -162,6 +168,7 @@ DLL_EXPORT const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid); +DLL_EXPORT TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS* options); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index ce9b95522a..7aec39817a 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -370,6 +370,14 @@ int32_t tDecompressData(void *input, // input int32_t tCompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist); int32_t tDecompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist); +typedef struct { + int32_t columnId; + int32_t type; + TAOS_MULTI_BIND *bind; +} SBindInfo; +int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, + SArray *rowArray); + #endif #ifdef __cplusplus diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 37d5f3f0b6..4c4505544b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -118,7 +118,11 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid); void qCleanupKeywordsTable(); +int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo); +int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks); +//int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); +int32_t qResetStmtColumns(SArray* pCols, bool deepClear); int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf); int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset); int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, @@ -129,8 +133,9 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx); int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery); -int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); -int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, +int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo* pBindInfos); +int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); +int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum); int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields); int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields); @@ -160,6 +165,7 @@ SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap); void destoryCatalogReq(SCatalogReq *pCatalogReq); bool isPrimaryKeyImpl(SNode* pExpr); +int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo); #ifdef __cplusplus } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 32f7cef12c..ef702f24d7 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -25,6 +25,7 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "tlog.h" +#include "tsimplehash.h" #include "tmsg.h" #include "tmsgcb.h" @@ -193,6 +194,27 @@ typedef struct SBoundColInfo { int32_t numOfBound; } SBoundColInfo; +typedef struct STableColsData { + char tbName[TSDB_TABLE_NAME_LEN]; + SArray* aCol; + bool getFromHash; +} STableColsData; + +typedef struct STableVgUid { + uint64_t uid; + int32_t vgid; +} STableVgUid; + +typedef struct STableBufInfo { + void* pCurBuff; + SArray* pBufList; + int64_t buffUnit; + int64_t buffSize; + int64_t buffIdx; + int64_t buffOffset; +} STableBufInfo; + + typedef struct STableDataCxt { STableMeta* pMeta; STSchema* pSchema; @@ -204,6 +226,33 @@ typedef struct STableDataCxt { bool duplicateTs; } STableDataCxt; +typedef struct SStbInterlaceInfo { + void* pCatalog; + void* pQuery; + int32_t acctId; + char* dbname; + void* transport; + SEpSet mgmtEpSet; + void* pRequest; + uint64_t requestId; + int64_t requestSelf; + bool tbFromHash; + SHashObj* pVgroupHash; + SArray* pVgroupList; + SSHashObj* pTableHash; + int64_t tbRemainNum; + STableBufInfo tbBuf; + char firstName[TSDB_TABLE_NAME_LEN]; + STSchema *pTSchema; + STableDataCxt *pDataCtx; + void *boundTags; + + bool tableColsReady; + SArray *pTableCols; + int32_t pTableColsIdx; +} SStbInterlaceInfo; + + typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index cbef80b6da..64b2acf732 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -40,6 +40,8 @@ typedef enum { STMT_MAX, } STMT_STATUS; +#define STMT_TABLE_COLS_NUM 1000 + typedef struct SStmtTableCache { STableDataCxt *pDataCtx; void *boundTags; @@ -57,6 +59,7 @@ typedef struct SStmtBindInfo { bool inExecCache; uint64_t tbUid; uint64_t tbSuid; + int32_t tbVgId; int32_t sBindRowNum; int32_t sBindLastIdx; int8_t tbType; @@ -66,8 +69,15 @@ typedef struct SStmtBindInfo { char tbFName[TSDB_TABLE_FNAME_LEN]; char stbFName[TSDB_TABLE_FNAME_LEN]; SName sname; + + char statbName[TSDB_TABLE_FNAME_LEN]; } SStmtBindInfo; +typedef struct SStmtAsyncParam { + STableColsData *pTbData; + void* pStmt; +} SStmtAsyncParam; + typedef struct SStmtExecInfo { int32_t affectedRows; SRequestObj *pRequest; @@ -77,8 +87,10 @@ typedef struct SStmtExecInfo { } SStmtExecInfo; typedef struct SStmtSQLInfo { + bool stbInterlaceMode; STMT_TYPE type; STMT_STATUS status; + uint64_t suid; uint64_t runTimes; SHashObj *pTableCache; // SHash SQuery *pQuery; @@ -88,21 +100,60 @@ typedef struct SStmtSQLInfo { SStmtQueryResInfo queryRes; bool autoCreateTbl; SHashObj *pVgHash; + SBindInfo *pBindInfo; + + SStbInterlaceInfo siInfo; } SStmtSQLInfo; +typedef struct SStmtStatInfo { + int64_t ctgGetTbMetaNum; + int64_t getCacheTbInfo; + int64_t parseSqlNum; + int64_t bindDataNum; + int64_t setTbNameUs; + int64_t bindDataUs1; + int64_t bindDataUs2; + int64_t bindDataUs3; + int64_t bindDataUs4; + int64_t addBatchUs; + int64_t execWaitUs; + int64_t execUseUs; +} SStmtStatInfo; + +typedef struct SStmtQNode { + bool restoreTbCols; + STableColsData tblData; + struct SStmtQNode* next; +} SStmtQNode; + +typedef struct SStmtQueue { + bool stopQueue; + SStmtQNode* head; + SStmtQNode* tail; + uint64_t qRemainNum; +} SStmtQueue; + + typedef struct STscStmt { - STscObj *taos; - SCatalog *pCatalog; - int32_t affectedRows; - uint32_t seqId; - uint32_t seqIds[STMT_MAX]; + STscObj *taos; + SCatalog *pCatalog; + int32_t affectedRows; + uint32_t seqId; + uint32_t seqIds[STMT_MAX]; + bool bindThreadInUse; + TdThread bindThread; + TAOS_STMT_OPTIONS options; + bool stbInterlaceMode; + SStmtQueue queue; - SStmtSQLInfo sql; - SStmtExecInfo exec; - SStmtBindInfo bInfo; + SStmtSQLInfo sql; + SStmtExecInfo exec; + SStmtBindInfo bInfo; - int64_t reqid; - int32_t errCode; + int64_t reqid; + int32_t errCode; + + SStmtStatInfo stat; } STscStmt; extern char *gStmtStatusStr[]; @@ -154,13 +205,14 @@ extern char *gStmtStatusStr[]; } while (0) +#define STMT_FLOG(param, ...) qFatal("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt) #define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt) -TAOS_STMT *stmtInit(STscObj *taos, int64_t reqid); +TAOS_STMT *stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions); int stmtClose(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt); const char *stmtErrstr(TAOS_STMT *stmt); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index da5da044a7..81ae465cd2 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1552,7 +1552,7 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) { return NULL; } - TAOS_STMT *pStmt = stmtInit(pObj, 0); + TAOS_STMT *pStmt = stmtInit(pObj, 0, NULL); releaseTscObj(*(int64_t *)taos); @@ -1567,13 +1567,29 @@ TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) { return NULL; } - TAOS_STMT *pStmt = stmtInit(pObj, reqid); + TAOS_STMT *pStmt = stmtInit(pObj, reqid, NULL); releaseTscObj(*(int64_t *)taos); return pStmt; } +TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) { + STscObj *pObj = acquireTscObj(*(int64_t *)taos); + if (NULL == pObj) { + tscError("invalid parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + TAOS_STMT *pStmt = stmtInit(pObj, options->reqId, options); + + releaseTscObj(*(int64_t *)taos); + + return pStmt; +} + + int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { if (stmt == NULL || sql == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 36a3e50aef..32d31585ea 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -8,6 +8,60 @@ char* gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags", "fetchFields", "bind", "bindCol", "addBatch", "exec"}; +static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) { + if (pTblBuf->buffOffset < pTblBuf->buffSize) { + *pBuf = (char*)pTblBuf->pCurBuff + pTblBuf->buffOffset; + pTblBuf->buffOffset += pTblBuf->buffUnit; + } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) { + pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++); + *pBuf = pTblBuf->pCurBuff; + pTblBuf->buffOffset = pTblBuf->buffUnit; + } else { + void *buff = taosMemoryMalloc(pTblBuf->buffSize); + if (NULL == buff) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pTblBuf->pBufList, &buff); + + pTblBuf->buffIdx++; + pTblBuf->pCurBuff = buff; + *pBuf = buff; + pTblBuf->buffOffset = pTblBuf->buffUnit; + } + + return TSDB_CODE_SUCCESS; +} + +bool stmtDequeue(STscStmt* pStmt, SStmtQNode **param) { + while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) { + taosUsleep(1); + return false; + } + + SStmtQNode *orig = pStmt->queue.head; + + SStmtQNode *node = pStmt->queue.head->next; + pStmt->queue.head = pStmt->queue.head->next; + + //taosMemoryFreeClear(orig); + + *param = node; + + atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); + + return true; +} + +void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { + pStmt->queue.tail->next = param; + pStmt->queue.tail = param; + + pStmt->stat.bindDataNum++; + atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); +} + + static int32_t stmtCreateRequest(STscStmt* pStmt) { int32_t code = 0; @@ -42,7 +96,10 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { pStmt->errCode = 0; break; case STMT_SETTBNAME: - if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)) { + if (STMT_STATUS_EQ(INIT)) { + code = TSDB_CODE_TSC_STMT_API_ERROR; + } + if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) { code = TSDB_CODE_TSC_STMT_API_ERROR; } break; @@ -170,6 +227,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid; pStmt->bInfo.tbSuid = pTableMeta->suid; + pStmt->bInfo.tbVgId = pTableMeta->vgId; pStmt->bInfo.tbType = pTableMeta->tableType; pStmt->bInfo.boundTags = tags; pStmt->bInfo.tagsCached = false; @@ -195,6 +253,9 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SNam STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash)); pStmt->sql.autoCreateTbl = autoCreateTbl; + if (pStmt->sql.autoCreateTbl) { + pStmt->sql.stbInterlaceMode = false; + } return TSDB_CODE_SUCCESS; } @@ -261,14 +322,46 @@ int32_t stmtParseSql(STscStmt* pStmt) { STMT_ERR_RET(stmtCreateRequest(pStmt)); + pStmt->stat.parseSqlNum++; STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb)); - + pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery; + pStmt->bInfo.needParse = false; if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) { pStmt->sql.type = STMT_TYPE_INSERT; + pStmt->sql.stbInterlaceMode = false; } else if (pStmt->sql.pQuery->pPrepareRoot) { pStmt->sql.type = STMT_TYPE_QUERY; + pStmt->sql.stbInterlaceMode = false; + + return TSDB_CODE_SUCCESS; + } + + STableDataCxt** pSrc = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (NULL == pSrc || NULL == *pSrc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + STableDataCxt* pTableCtx = *pSrc; + if (pStmt->sql.stbInterlaceMode) { + int16_t lastIdx = -1; + + for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) { + if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) { + pStmt->sql.stbInterlaceMode = false; + break; + } + + lastIdx = pTableCtx->boundColsInfo.pColIndex[i]; + } + } + + if (NULL == pStmt->sql.pBindInfo) { + pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo)); + if (NULL == pStmt->sql.pBindInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; @@ -277,6 +370,7 @@ int32_t stmtParseSql(STscStmt* pStmt) { int32_t stmtCleanBindInfo(STscStmt* pStmt) { pStmt->bInfo.tbUid = 0; pStmt->bInfo.tbSuid = 0; + pStmt->bInfo.tbVgId = -1; pStmt->bInfo.tbType = 0; pStmt->bInfo.needParse = true; pStmt->bInfo.inExecCache = false; @@ -287,55 +381,96 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { qDestroyBoundColInfo(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags); } - memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN); + pStmt->bInfo.stbFName[0] = 0;; return TSDB_CODE_SUCCESS; } +void stmtFreeTableBlkList(STableColsData* pTb) { + qResetStmtColumns(pTb->aCol, true); + taosArrayDestroy(pTb->aCol); +} + +void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { + pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0); + pTblBuf->buffIdx = 1; + pTblBuf->buffOffset = sizeof(*pQueue->head); + + pQueue->head = pQueue->tail = pTblBuf->pCurBuff; + pQueue->qRemainNum = 0; + pQueue->head->next = NULL; +} + int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { - if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) { - taos_free_result(pStmt->exec.pRequest); - pStmt->exec.pRequest = NULL; - } + if (pStmt->sql.stbInterlaceMode) { + if (deepClean) { + taosHashCleanup(pStmt->exec.pBlockHash); + pStmt->exec.pBlockHash = NULL; - size_t keyLen = 0; - void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); - while (pIter) { - STableDataCxt* pBlocks = *(STableDataCxt**)pIter; - char* key = taosHashGetKey(pIter, &keyLen); - STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); - - if (keepTable && pBlocks == pStmt->exec.pCurrBlock) { - TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData); - STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); - - pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); - continue; + if (NULL != pStmt->exec.pCurrBlock) { + taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData); + qDestroyStmtDataBlock(pStmt->exec.pCurrBlock); + } + } else { + pStmt->sql.siInfo.pTableColsIdx = 0; + stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue); + } + } else { + if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) { + taos_free_result(pStmt->exec.pRequest); + pStmt->exec.pRequest = NULL; } - qDestroyStmtDataBlock(pBlocks); - taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); + size_t keyLen = 0; + void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + while (pIter) { + STableDataCxt* pBlocks = *(STableDataCxt**)pIter; + char* key = taosHashGetKey(pIter, &keyLen); + STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); - pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + if (keepTable && pBlocks == pStmt->exec.pCurrBlock) { + TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData); + STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + continue; + } + + qDestroyStmtDataBlock(pBlocks); + taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + } + + if (keepTable) { + return TSDB_CODE_SUCCESS; + } + + taosHashCleanup(pStmt->exec.pBlockHash); + pStmt->exec.pBlockHash = NULL; + + tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); + taosMemoryFreeClear(pStmt->exec.pCurrTbData); } - if (keepTable) { - return TSDB_CODE_SUCCESS; - } - - taosHashCleanup(pStmt->exec.pBlockHash); - pStmt->exec.pBlockHash = NULL; - - tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); - taosMemoryFreeClear(pStmt->exec.pCurrTbData); - STMT_ERR_RET(stmtCleanBindInfo(pStmt)); return TSDB_CODE_SUCCESS; } +void stmtFreeTbBuf(void *buf) { + void* pBuf = *(void**)buf; + taosMemoryFree(pBuf); +} + +void stmtFreeTbCols(void *buf) { + SArray* pCols = *(SArray**)buf; + taosArrayDestroy(pCols); +} + int32_t stmtCleanSQLInfo(STscStmt* pStmt) { STMT_DLOG_E("start to free SQL info"); - + + taosMemoryFree(pStmt->sql.pBindInfo); taosMemoryFree(pStmt->sql.queryRes.fields); taosMemoryFree(pStmt->sql.queryRes.userFields); taosMemoryFree(pStmt->sql.sqlStr); @@ -360,34 +495,68 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true)); STMT_ERR_RET(stmtCleanBindInfo(pStmt)); + taos_free_result(pStmt->sql.siInfo.pRequest); + taosHashCleanup(pStmt->sql.siInfo.pVgroupHash); + tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash); + taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf); + taosMemoryFree(pStmt->sql.siInfo.pTSchema); + qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx); + taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols); + memset(&pStmt->sql, 0, sizeof(pStmt->sql)); + pStmt->sql.siInfo.tableColsReady = true; + STMT_DLOG_E("end to free SQL info"); return TSDB_CODE_SUCCESS; } -int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, - uint64_t suid) { - SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + +int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) { + if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) { + return TSDB_CODE_SUCCESS; + } + SVgroupInfo vgInfo = {0}; SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, .requestId = pStmt->exec.pRequest->requestId, .requestObjRefId = pStmt->exec.pRequest->self, .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; - STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo)); - STMT_ERR_RET( - taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); + int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } - STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl)); + *vgId = vgInfo.vgId; + + return TSDB_CODE_SUCCESS; +} - STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId); + +int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, + uint64_t suid, int32_t vgId) { + STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId)); + STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl)); + + STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgId); return TSDB_CODE_SUCCESS; } int32_t stmtGetFromCache(STscStmt* pStmt) { + if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) { + pStmt->bInfo.needParse = false; + pStmt->bInfo.inExecCache = false; + return TSDB_CODE_SUCCESS; + } + pStmt->bInfo.needParse = true; pStmt->bInfo.inExecCache = false; @@ -404,6 +573,11 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { } } + if (NULL == pStmt->pCatalog) { + STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); + pStmt->sql.siInfo.pCatalog = pStmt->pCatalog; + } + if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { if (pStmt->bInfo.inExecCache) { pStmt->bInfo.needParse = false; @@ -415,9 +589,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - if (NULL == pStmt->pCatalog) { - STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); - } if (pStmt->sql.autoCreateTbl) { SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid)); @@ -426,7 +597,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.tbUid = 0; STableDataCxt* pNewBlock = NULL; - STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid)); + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1)); if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { @@ -443,12 +614,19 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_RET(stmtCleanBindInfo(pStmt)); } + uint64_t uid, suid; + int32_t vgId; + int8_t tableType; + STableMeta* pTableMeta = NULL; SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, .requestId = pStmt->exec.pRequest->requestId, .requestObjRefId = pStmt->exec.pRequest->self, .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); + + pStmt->stat.ctgGetTbMetaNum++; + if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { tscDebug("tb %s not exist", pStmt->bInfo.tbFName); stmtCleanBindInfo(pStmt); @@ -458,10 +636,14 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_ERR_RET(code); - uint64_t uid = pTableMeta->uid; - uint64_t suid = pTableMeta->suid; - int8_t tableType = pTableMeta->tableType; + uid = pTableMeta->uid; + suid = pTableMeta->suid; + tableType = pTableMeta->tableType; + pStmt->bInfo.tbVgId = pTableMeta->vgId; + vgId = pTableMeta->vgId; + taosMemoryFree(pTableMeta); + uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid; if (uid == pStmt->bInfo.tbUid) { @@ -493,7 +675,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid)); if (pCache) { pStmt->bInfo.needParse = false; @@ -505,7 +687,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.tagsCached = true; STableDataCxt* pNewBlock = NULL; - STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid)); + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId)); if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { @@ -538,9 +720,102 @@ int32_t stmtResetStmt(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) { + +int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) { + SStmtQNode* pParam = (SStmtQNode*)param; + + if (pParam->restoreTbCols) { + for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) { + SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i); + *p = taosArrayInit(20, POINTER_BYTES); + } + + atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true); + } else { + STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock, &pStmt->sql.siInfo)); + + //taosMemoryFree(pParam->pTbData); + + atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + } + return TSDB_CODE_SUCCESS; +} + + +void *stmtBindThreadFunc(void *param) { + setThreadName("stmtBind"); + + qInfo("stmt bind thread started"); + + STscStmt* pStmt = (STscStmt*)param; + + while (true) { + if (atomic_load_8((int8_t *)&pStmt->queue.stopQueue)) { + break; + } + + SStmtQNode *asyncParam = NULL; + if (!stmtDequeue(pStmt, &asyncParam)) { + continue; + } + + stmtAsyncOutput(pStmt, asyncParam); + } + + qInfo("stmt bind thread stopped"); + + return NULL; +} + + +int32_t stmtStartBindThread(STscStmt* pStmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + STMT_ERR_RET(terrno); + } + + pStmt->bindThreadInUse = true; + + taosThreadAttrDestroy(&thAttr); + return TSDB_CODE_SUCCESS; +} + +int32_t stmtInitQueue(STscStmt* pStmt) { + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head)); + pStmt->queue.tail = pStmt->queue.head; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) { + pTblBuf->buffUnit = sizeof(SStmtQNode); + pTblBuf->buffSize = pTblBuf->buffUnit * 1000; + pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES); + if (NULL == pTblBuf->pBufList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + void *buff = taosMemoryMalloc(pTblBuf->buffSize); + if (NULL == buff) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pTblBuf->pBufList, &buff); + + pTblBuf->pCurBuff = buff; + pTblBuf->buffIdx = 1; + pTblBuf->buffOffset = 0; + + return TSDB_CODE_SUCCESS; +} + +TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) { STscObj* pObj = (STscObj*)taos; STscStmt* pStmt = NULL; + int32_t code = 0; pStmt = taosMemoryCalloc(1, sizeof(STscStmt)); if (NULL == pStmt) { @@ -560,6 +835,45 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) { pStmt->sql.status = STMT_INIT; pStmt->reqid = reqid; + if (NULL != pOptions) { + memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); + if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) { + pStmt->stbInterlaceMode = true; + } + } + + if (pStmt->stbInterlaceMode) { + pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter; + pStmt->sql.siInfo.acctId = taos->acctId; + pStmt->sql.siInfo.dbname = taos->db; + pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (NULL == pStmt->sql.siInfo.pTableHash) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stmtClose(pStmt); + return NULL; + } + pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); + if (NULL == pStmt->sql.siInfo.pTableCols) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stmtClose(pStmt); + return NULL; + } + + code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf); + if (TSDB_CODE_SUCCESS == code) { + stmtInitQueue(pStmt); + code = stmtStartBindThread(pStmt); + } + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + stmtClose(pStmt); + return NULL; + } + } + + pStmt->sql.siInfo.tableColsReady = true; + STMT_LOG_SEQ(STMT_INIT); tscDebug("stmt:%p initialized", pStmt); @@ -584,6 +898,33 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pStmt->sql.sqlStr = strndup(sql, length); pStmt->sql.sqlLen = length; + pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) { + STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (!pSrc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + STableDataCxt* pDst = NULL; + + STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true)); + pStmt->sql.siInfo.pDataCtx = pDst; + + SArray* pTblCols = NULL; + for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) { + pTblCols = taosArrayInit(20, POINTER_BYTES); + if (NULL == pTblCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); + } + + pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; + return TSDB_CODE_SUCCESS; } @@ -591,6 +932,8 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STscStmt* pStmt = (STscStmt*)stmt; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG("start to set tbName: %s", tbName); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME)); @@ -602,21 +945,35 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STMT_ERR_RET(stmtCreateRequest(pStmt)); + if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) { + STMT_ERR_RET(stmtCreateRequest(pStmt)); - STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, - pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); - tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); + STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, + pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); + tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); + + STMT_ERR_RET(stmtGetFromCache(pStmt)); - STMT_ERR_RET(stmtGetFromCache(pStmt)); - - if (pStmt->bInfo.needParse) { + if (pStmt->bInfo.needParse) { + strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); + pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; + + STMT_ERR_RET(stmtParseSql(pStmt)); + } + } else { strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; - - STMT_ERR_RET(stmtParseSql(pStmt)); + pStmt->exec.pRequest->requestId++; + pStmt->bInfo.needParse = false; } + if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) { + STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); + } + + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.setTbNameUs += startUs2 - startUs; + return TSDB_CODE_SUCCESS; } @@ -670,11 +1027,16 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STableDataCxt** pDataBlock = - (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); - if (NULL == pDataBlock) { - tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); - STMT_ERR_RET(TSDB_CODE_APP_ERROR); + STableDataCxt** pDataBlock = NULL; + + if (pStmt->sql.stbInterlaceMode) { + pDataBlock = &pStmt->sql.siInfo.pDataCtx; + } else { + pDataBlock = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (NULL == pDataBlock) { + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); + STMT_ERR_RET(TSDB_CODE_APP_ERROR); + } } STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields)); @@ -682,9 +1044,90 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields return TSDB_CODE_SUCCESS; } +/* +SArray* stmtGetFreeCol(STscStmt* pStmt, int32_t* idx) { + while (true) { + if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) { + pStmt->exec.smInfo.pColIdx = 0; + } + + if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) { + taosUsleep(1); + continue; + } + + *idx = pStmt->exec.smInfo.pColIdx; + return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++]; + } +} +*/ + +int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) { + if (NULL == pStmt->sql.siInfo.pVgroupHash) { + pStmt->sql.siInfo.pVgroupHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + } + if (NULL == pStmt->sql.siInfo.pVgroupList) { + pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES); + } + + if (NULL == pStmt->sql.siInfo.pRequest) { + STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, (SRequestObj**)&pStmt->sql.siInfo.pRequest, + pStmt->reqid)); + + if (pStmt->reqid != 0) { + pStmt->reqid++; + } + pStmt->exec.pRequest->syncQuery = true; + + pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId; + pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self; + } + + if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] && 0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) { + pStmt->sql.siInfo.tbFromHash = true; + } + + if (0 == pStmt->sql.siInfo.firstName[0]) { + strcpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName); + } + + param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; + param->next = NULL; + + atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + + stmtEnqueue(pStmt, param); + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray** pTableCols) { + while (true) { + if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) { + *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++); + break; + } else { + SArray* pTblCols = NULL; + for (int32_t i = 0; i < 100; i++) { + pTblCols = taosArrayInit(20, POINTER_BYTES); + if (NULL == pTblCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); + } + } + } + + return TSDB_CODE_SUCCESS; +} + int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { STscStmt* pStmt = (STscStmt*)stmt; + int32_t code = 0; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); @@ -744,6 +1187,10 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { return TSDB_CODE_SUCCESS; } + if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) { + STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt)); + } + STableDataCxt** pDataBlock = NULL; if (pStmt->exec.pCurrBlock) { @@ -756,15 +1203,50 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR); } pStmt->exec.pCurrBlock = *pDataBlock; + if (pStmt->sql.stbInterlaceMode) { + taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol); + pStmt->exec.pCurrBlock->pData->aCol = NULL; + } } + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.bindDataUs1 += startUs2 - startUs; + + SStmtQNode* param = NULL; + if (pStmt->sql.stbInterlaceMode) { + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); + STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, ¶m->tblData.aCol)); + taosArrayClear(param->tblData.aCol); + + //param->tblData.aCol = taosArrayInit(20, POINTER_BYTES); + + param->restoreTbCols = false; + strcpy(param->tblData.tbName, pStmt->bInfo.tbName); + } + + int64_t startUs3 = taosGetTimestampUs(); + pStmt->stat.bindDataUs2 += startUs3 - startUs2; + + SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol; + if (colIdx < 0) { - int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); + if (pStmt->sql.stbInterlaceMode) { + (*pDataBlock)->pData->flags = 0; + code = qBindStmtStbColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo); + } else { + code = qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); + } + if (code) { tscError("qBindStmtColsValue failed, error:%s", tstrerror(code)); STMT_ERR_RET(code); } } else { + if (pStmt->sql.stbInterlaceMode) { + tscError("bind single column not allowed in stb insert mode"); + STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); + } + if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) { tscError("bind column index not in sequence"); STMT_ERR_RET(TSDB_CODE_APP_ERROR); @@ -776,20 +1258,47 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { pStmt->bInfo.sBindRowNum = bind->num; } - qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, + qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum); } + int64_t startUs4 = taosGetTimestampUs(); + pStmt->stat.bindDataUs3 += startUs4 - startUs3; + + if (pStmt->sql.stbInterlaceMode) { + STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param)); + } + + pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4; + return TSDB_CODE_SUCCESS; } int stmtAddBatch(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG_E("start to add batch"); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); + if (pStmt->sql.stbInterlaceMode) { + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.addBatchUs += startUs2 - startUs; + + pStmt->sql.siInfo.tableColsReady = false; + + SStmtQNode* param = NULL; + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); + param->restoreTbCols = true; + param->next = NULL; + + stmtEnqueue(pStmt, param); + + return TSDB_CODE_SUCCESS; + } + STMT_ERR_RET(stmtCacheBlock(pStmt)); return TSDB_CODE_SUCCESS; @@ -858,6 +1367,8 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); + pStmt->stat.ctgGetTbMetaNum++; + taos_free_result(pStmt->exec.pRequest); pStmt->exec.pRequest = NULL; @@ -879,11 +1390,58 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { return finalCode; } +/* +int stmtStaticModeExec(TAOS_STMT* stmt) { + STscStmt* pStmt = (STscStmt*)stmt; + int32_t code = 0; + SSubmitRsp* pRsp = NULL; + if (pStmt->sql.staticMode) { + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + STMT_DLOG_E("start to exec"); + + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); + + STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList, pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum)); + + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); + + if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { + code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest); + if (code) { + pStmt->exec.pRequest->code = code; + } else { + tFreeSSubmitRsp(pRsp); + STMT_ERR_RET(stmtResetStmt(pStmt)); + STMT_ERR_RET(TSDB_CODE_NEED_RETRY); + } + } + + STMT_ERR_JRET(pStmt->exec.pRequest->code); + + pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); + pStmt->affectedRows += pStmt->exec.affectedRows; + +_return: + + stmtCleanExecInfo(pStmt, (code ? false : true), false); + + tFreeSSubmitRsp(pRsp); + + ++pStmt->sql.runTimes; + + STMT_RET(code); +} +*/ + int stmtExec(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; int32_t code = 0; SSubmitRsp* pRsp = NULL; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG_E("start to exec"); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); @@ -891,12 +1449,26 @@ int stmtExec(TAOS_STMT* stmt) { if (STMT_TYPE_QUERY == pStmt->sql.type) { launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { - tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); - taosMemoryFreeClear(pStmt->exec.pCurrTbData); + if (pStmt->sql.stbInterlaceMode) { + int64_t startTs = taosGetTimestampUs(); + while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) { + taosUsleep(1); + } + pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs; + + STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList)); + taosHashCleanup(pStmt->sql.siInfo.pVgroupHash); + pStmt->sql.siInfo.pVgroupHash = NULL; + pStmt->sql.siInfo.pVgroupList = NULL; + } else { + tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); + taosMemoryFreeClear(pStmt->exec.pCurrTbData); - STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData)); + STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData)); - STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); + STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); + } + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } @@ -918,12 +1490,19 @@ int stmtExec(TAOS_STMT* stmt) { _return: - stmtCleanExecInfo(pStmt, (code ? false : true), false); + while (0 == atomic_load_8((int8_t*)&pStmt->sql.siInfo.tableColsReady)) { + taosUsleep(1); + } + stmtCleanExecInfo(pStmt, (code ? false : true), false); + tFreeSSubmitRsp(pRsp); ++pStmt->sql.runTimes; + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.execUseUs += startUs2 - startUs; + STMT_RET(code); } @@ -932,6 +1511,21 @@ int stmtClose(TAOS_STMT* stmt) { STMT_DLOG_E("start to free stmt"); + pStmt->queue.stopQueue = true; + + if (pStmt->bindThreadInUse) { + taosThreadJoin(pStmt->bindThread, NULL); + pStmt->bindThreadInUse = false; + } + + STMT_DLOG("stmt %p closed, stbInterlaceMode: %d, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 ", parseSqlNum=>%" PRId64 + ", pStmt->stat.bindDataNum=>%" PRId64 ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u" + ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64 ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64, + pStmt, pStmt->sql.stbInterlaceMode, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo, pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, + pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND], pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], + pStmt->stat.setTbNameUs, pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4, + pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); + stmtCleanSQLInfo(pStmt); taosMemoryFree(stmt); @@ -1027,6 +1621,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) { taos_free_result(pStmt->exec.pRequest); pStmt->exec.pRequest = NULL; + STMT_ERR_RET(stmtCreateRequest(pStmt)); } STMT_ERRI_JRET(stmtCreateRequest(pStmt)); @@ -1062,6 +1657,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { } STMT_ERR_RET(stmtCreateRequest(pStmt)); + if (pStmt->bInfo.needParse) { STMT_ERR_RET(stmtParseSql(pStmt)); } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 0e80d431d1..698714da86 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -426,6 +426,79 @@ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) { return code; } +static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *param) { + if (((SBindInfo *)p1)->columnId < ((SBindInfo *)p2)->columnId) { + return -1; + } else if (((SBindInfo *)p1)->columnId > ((SBindInfo *)p2)->columnId) { + return 1; + } + return 0; +} + +/* build rows to `rowArray` from bind + * `infos` is the bind information array + * `numOfInfos` is the number of bind information + * `infoSorted` is whether the bind information is sorted by column id + * `pTSchema` is the schema of the table + * `rowArray` is the array to store the rows + */ +int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema, + SArray *rowArray) { + if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + if (!infoSorted) { + taosqsort_r(infos, numOfInfos, sizeof(SBindInfo), NULL, tBindInfoCompare); + } + + int32_t code = 0; + int32_t numOfRows = infos[0].bind->num; + SArray *colValArray; + SColVal colVal; + + if ((colValArray = taosArrayInit(numOfInfos, sizeof(SColVal))) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t iRow = 0; iRow < numOfRows; iRow++) { + taosArrayClear(colValArray); + + for (int32_t iInfo = 0; iInfo < numOfInfos; iInfo++) { + if (infos[iInfo].bind->is_null && infos[iInfo].bind->is_null[iRow]) { + colVal = COL_VAL_NULL(infos[iInfo].columnId, infos[iInfo].type); + } else { + SValue value = { + .type = infos[iInfo].type, + }; + if (IS_VAR_DATA_TYPE(infos[iInfo].type)) { + value.nData = infos[iInfo].bind->length[iRow]; + value.pData = (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow; + } else { + memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow, + infos[iInfo].bind->buffer_length); + } + colVal = COL_VAL_VALUE(infos[iInfo].columnId, value); + } + taosArrayPush(colValArray, &colVal); + } + + SRow *row; + if ((code = tRowBuild(colValArray, pTSchema, &row))) { + goto _exit; + } + + if ((taosArrayPush(rowArray, &row)) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + +_exit: + taosArrayDestroy(colValArray); + return code; +} + int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { ASSERT(iCol < pTSchema->numOfCols); ASSERT(pRow->sver == pTSchema->version); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f034244c69..c68dc85c29 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -776,7 +776,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { // min free disk space used to check if the disk is full [50MB, 1GB] if (cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; // clang-format on @@ -1299,8 +1299,8 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi return 0; } -int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, - const char *envFile, char *apolloUrl, SArray *pArgs) { +int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, + SArray *pArgs) { if (tsCfg == NULL) osDefaultInit(); SConfig *pCfg = cfgInit(); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f0cb2b7d9d..d465e24d5b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9737,6 +9737,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { for (int32_t i = 0; i < nRow; ++i) { tRowDestroy(rows[i]); + rows[i] = NULL; } taosArrayDestroy(pTbData->aRowP); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 4de0086f96..d02aec98ca 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -14,9 +14,8 @@ */ #define _DEFAULT_SOURCE -#include -#include "tjson.h" #include "mndDnode.h" +#include #include "audit.h" #include "mndCluster.h" #include "mndDb.h" @@ -28,9 +27,10 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "taos_monitor.h" +#include "tjson.h" #include "tmisce.h" #include "tunit.h" -#include "taos_monitor.h" #define TSDB_DNODE_VER_NUMBER 2 #define TSDB_DNODE_RESERVE_SIZE 40 @@ -191,8 +191,8 @@ static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pDnode->machineId, TSDB_MACHINE_ID_LEN, _OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER) - SDB_SET_INT16(pRaw, dataPos, 0, _OVER) // forward/backward compatible - SDB_SET_INT16(pRaw, dataPos, 0, _OVER) // forward/backward compatible + SDB_SET_INT16(pRaw, dataPos, 0, _OVER) // forward/backward compatible + SDB_SET_INT16(pRaw, dataPos, 0, _OVER) // forward/backward compatible SDB_SET_DATALEN(pRaw, dataPos, _OVER); terrno = 0; @@ -536,49 +536,49 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { int32_t code = -1; char strClusterId[TSDB_CLUSTER_ID_LEN] = {0}; - sprintf(strClusterId, "%"PRId64, pMnode->clusterId); + sprintf(strClusterId, "%" PRId64, pMnode->clusterId); if (tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return code; } - if(tsMonitorLogProtocol){ + if (tsMonitorLogProtocol) { mInfo("process statis req,\n %s", statisReq.pCont); } - SJson* pJson = tjsonParse(statisReq.pCont); + SJson *pJson = tjsonParse(statisReq.pCont); int32_t ts_size = tjsonGetArraySize(pJson); - for(int32_t i = 0; i < ts_size; i++){ - SJson* item = tjsonGetArrayItem(pJson, i); + for (int32_t i = 0; i < ts_size; i++) { + SJson *item = tjsonGetArrayItem(pJson, i); - SJson* tables = tjsonGetObjectItem(item, "tables"); + SJson *tables = tjsonGetObjectItem(item, "tables"); int32_t tableSize = tjsonGetArraySize(tables); - for(int32_t i = 0; i < tableSize; i++){ - SJson* table = tjsonGetArrayItem(tables, i); + for (int32_t i = 0; i < tableSize; i++) { + SJson *table = tjsonGetArrayItem(tables, i); char tableName[MONITOR_TABLENAME_LEN] = {0}; tjsonGetStringValue(table, "name", tableName); - SJson* metricGroups = tjsonGetObjectItem(table, "metric_groups"); + SJson *metricGroups = tjsonGetObjectItem(table, "metric_groups"); int32_t size = tjsonGetArraySize(metricGroups); - for(int32_t i = 0; i < size; i++){ - SJson* item = tjsonGetArrayItem(metricGroups, i); + for (int32_t i = 0; i < size; i++) { + SJson *item = tjsonGetArrayItem(metricGroups, i); - SJson* arrayTag = tjsonGetObjectItem(item, "tags"); + SJson *arrayTag = tjsonGetObjectItem(item, "tags"); int32_t tagSize = tjsonGetArraySize(arrayTag); - for(int32_t j = 0; j < tagSize; j++){ - SJson* item = tjsonGetArrayItem(arrayTag, j); + for (int32_t j = 0; j < tagSize; j++) { + SJson *item = tjsonGetArrayItem(arrayTag, j); char tagName[MONITOR_TAG_NAME_LEN] = {0}; tjsonGetStringValue(item, "name", tagName); - if(strncmp(tagName, "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { + if (strncmp(tagName, "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { tjsonDeleteItemFromObject(item, "value"); tjsonAddStringToObject(item, "value", strClusterId); } @@ -590,12 +590,12 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { char *pCont = tjsonToString(pJson); monSendContent(pCont); - if(pJson != NULL){ + if (pJson != NULL) { tjsonDelete(pJson); pJson = NULL; } - if(pCont != NULL){ + if (pCont != NULL) { taosMemoryFree(pCont); pCont = NULL; } @@ -603,132 +603,132 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { tFreeSStatisReq(&statisReq); return 0; -/* - SJson* pJson = tjsonParse(statisReq.pCont); + /* + SJson* pJson = tjsonParse(statisReq.pCont); - int32_t ts_size = tjsonGetArraySize(pJson); + int32_t ts_size = tjsonGetArraySize(pJson); - for(int32_t i = 0; i < ts_size; i++){ - SJson* item = tjsonGetArrayItem(pJson, i); + for(int32_t i = 0; i < ts_size; i++){ + SJson* item = tjsonGetArrayItem(pJson, i); - SJson* tables = tjsonGetObjectItem(item, "tables"); + SJson* tables = tjsonGetObjectItem(item, "tables"); - int32_t tableSize = tjsonGetArraySize(tables); - for(int32_t i = 0; i < tableSize; i++){ - SJson* table = tjsonGetArrayItem(tables, i); + int32_t tableSize = tjsonGetArraySize(tables); + for(int32_t i = 0; i < tableSize; i++){ + SJson* table = tjsonGetArrayItem(tables, i); - char tableName[MONITOR_TABLENAME_LEN] = {0}; - tjsonGetStringValue(table, "name", tableName); + char tableName[MONITOR_TABLENAME_LEN] = {0}; + tjsonGetStringValue(table, "name", tableName); - SJson* metricGroups = tjsonGetObjectItem(table, "metric_groups"); + SJson* metricGroups = tjsonGetObjectItem(table, "metric_groups"); - int32_t size = tjsonGetArraySize(metricGroups); - for(int32_t i = 0; i < size; i++){ - SJson* item = tjsonGetArrayItem(metricGroups, i); + int32_t size = tjsonGetArraySize(metricGroups); + for(int32_t i = 0; i < size; i++){ + SJson* item = tjsonGetArrayItem(metricGroups, i); - SJson* arrayTag = tjsonGetObjectItem(item, "tags"); + SJson* arrayTag = tjsonGetObjectItem(item, "tags"); - int32_t tagSize = tjsonGetArraySize(arrayTag); + int32_t tagSize = tjsonGetArraySize(arrayTag); - char** labels = taosMemoryMalloc(sizeof(char*) * tagSize); - char** sample_labels = taosMemoryMalloc(sizeof(char*) * tagSize); + char** labels = taosMemoryMalloc(sizeof(char*) * tagSize); + char** sample_labels = taosMemoryMalloc(sizeof(char*) * tagSize); - for(int32_t j = 0; j < tagSize; j++){ - SJson* item = tjsonGetArrayItem(arrayTag, j); + for(int32_t j = 0; j < tagSize; j++){ + SJson* item = tjsonGetArrayItem(arrayTag, j); - *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); - tjsonGetStringValue(item, "name", *(labels + j)); + *(labels + j) = taosMemoryMalloc(MONITOR_TAG_NAME_LEN); + tjsonGetStringValue(item, "name", *(labels + j)); - *(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN); - tjsonGetStringValue(item, "value", *(sample_labels + j)); - if(strncmp(*(labels + j), "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { - strncpy(*(sample_labels + j), strClusterId, MONITOR_TAG_VALUE_LEN); + *(sample_labels + j) = taosMemoryMalloc(MONITOR_TAG_VALUE_LEN); + tjsonGetStringValue(item, "value", *(sample_labels + j)); + if(strncmp(*(labels + j), "cluster_id", MONITOR_TAG_NAME_LEN) == 0) { + strncpy(*(sample_labels + j), strClusterId, MONITOR_TAG_VALUE_LEN); + } } - } - SJson* metrics = tjsonGetObjectItem(item, "metrics"); + SJson* metrics = tjsonGetObjectItem(item, "metrics"); - int32_t metricLen = tjsonGetArraySize(metrics); - for(int32_t j = 0; j < metricLen; j++){ - SJson *item = tjsonGetArrayItem(metrics, j); + int32_t metricLen = tjsonGetArraySize(metrics); + for(int32_t j = 0; j < metricLen; j++){ + SJson *item = tjsonGetArrayItem(metrics, j); - char name[MONITOR_METRIC_NAME_LEN] = {0}; - tjsonGetStringValue(item, "name", name); + char name[MONITOR_METRIC_NAME_LEN] = {0}; + tjsonGetStringValue(item, "name", name); - double value = 0; - tjsonGetDoubleValue(item, "value", &value); + double value = 0; + tjsonGetDoubleValue(item, "value", &value); - double type = 0; - tjsonGetDoubleValue(item, "type", &type); + double type = 0; + tjsonGetDoubleValue(item, "type", &type); - int32_t metricNameLen = strlen(name) + strlen(tableName) + 2; - char* metricName = taosMemoryMalloc(metricNameLen); - memset(metricName, 0, metricNameLen); - sprintf(metricName, "%s:%s", tableName, name); + int32_t metricNameLen = strlen(name) + strlen(tableName) + 2; + char* metricName = taosMemoryMalloc(metricNameLen); + memset(metricName, 0, metricNameLen); + sprintf(metricName, "%s:%s", tableName, name); - taos_metric_t* metric = taos_collector_registry_get_metric(metricName); - if(metric == NULL){ - if(type == 0){ - metric = taos_counter_new(metricName, "", tagSize, (const char**)labels); - } - if(type == 1){ - metric = taos_gauge_new(metricName, "", tagSize, (const char**)labels); - } - mTrace("fail to get metric from registry, new one metric:%p", metric); - - if(taos_collector_registry_register_metric(metric) == 1){ + taos_metric_t* metric = taos_collector_registry_get_metric(metricName); + if(metric == NULL){ if(type == 0){ - taos_counter_destroy(metric); + metric = taos_counter_new(metricName, "", tagSize, (const char**)labels); } if(type == 1){ - taos_gauge_destroy(metric); + metric = taos_gauge_new(metricName, "", tagSize, (const char**)labels); } + mTrace("fail to get metric from registry, new one metric:%p", metric); - metric = taos_collector_registry_get_metric(metricName); + if(taos_collector_registry_register_metric(metric) == 1){ + if(type == 0){ + taos_counter_destroy(metric); + } + if(type == 1){ + taos_gauge_destroy(metric); + } - mTrace("fail to register metric, get metric from registry:%p", metric); + metric = taos_collector_registry_get_metric(metricName); + + mTrace("fail to register metric, get metric from registry:%p", metric); + } + else{ + mTrace("succeed to register metric:%p", metric); + } } else{ - mTrace("succeed to register metric:%p", metric); + mTrace("get metric from registry:%p", metric); } - } - else{ - mTrace("get metric from registry:%p", metric); + + if(type == 0){ + taos_counter_add(metric, value, (const char**)sample_labels); + } + if(type == 1){ + taos_gauge_set(metric, value, (const char**)sample_labels); + } + + taosMemoryFreeClear(metricName); } - if(type == 0){ - taos_counter_add(metric, value, (const char**)sample_labels); - } - if(type == 1){ - taos_gauge_set(metric, value, (const char**)sample_labels); + for(int32_t j = 0; j < tagSize; j++){ + taosMemoryFreeClear(*(labels + j)); + taosMemoryFreeClear(*(sample_labels + j)); } - taosMemoryFreeClear(metricName); + taosMemoryFreeClear(sample_labels); + taosMemoryFreeClear(labels); } - - for(int32_t j = 0; j < tagSize; j++){ - taosMemoryFreeClear(*(labels + j)); - taosMemoryFreeClear(*(sample_labels + j)); - } - - taosMemoryFreeClear(sample_labels); - taosMemoryFreeClear(labels); } + } - } + code = 0; - code = 0; + _OVER: + if(pJson != NULL){ + tjsonDelete(pJson); + pJson = NULL; + } -_OVER: - if(pJson != NULL){ - tjsonDelete(pJson); - pJson = NULL; - } - - tFreeSStatisReq(&statisReq); - return code; - */ + tFreeSStatisReq(&statisReq); + return code; + */ } static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) { @@ -816,8 +816,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes; bool encryptKeyChanged = pDnode->encryptionKeyChksum != statusReq.clusterCfg.encryptionKeyChksum; + bool enableWhiteListChanged = statusReq.clusterCfg.enableWhiteList != (tsEnableWhiteList ? 1 : 0); bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || - pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged; + pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged; const STraceId *trace = &pReq->info.traceId; mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3fb298e1ea..699865a9bf 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2366,7 +2366,10 @@ EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) { } if (pResult->ts < pBlockInfo->window.skey) { return FUNC_DATA_REQUIRED_NOT_LOAD; - } else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) { + } else if (pResult->ts == pBlockInfo->window.skey) { + if (NULL == pResult->pkData) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 0, TSDB_ORDER_ASC) < 0) { return FUNC_DATA_REQUIRED_NOT_LOAD; } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index f41b5525f3..d7e5d87e80 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -25,8 +25,10 @@ #include "tref.h" #include "tsched.h" -#define INDEX_NUM_OF_THREADS 5 -#define INDEX_QUEUE_SIZE 200 +#define INDEX_NUM_OF_THREADS 5 +#define INDEX_MAX_NUM_OF_THREADS 10 + +#define INDEX_QUEUE_SIZE 200 #define INDEX_DATA_BOOL_NULL 0x02 #define INDEX_DATA_TINYINT_NULL 0x80 @@ -61,6 +63,7 @@ static void indexDestroy(void* sIdx); void indexInit(int32_t threadNum) { indexThreads = threadNum; if (indexThreads <= 1) indexThreads = INDEX_NUM_OF_THREADS; + if (indexThreads >= INDEX_MAX_NUM_OF_THREADS) indexThreads = INDEX_MAX_NUM_OF_THREADS; } void indexEnvInit() { // refactor later diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 1988620539..4f2877fcf6 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -51,7 +51,8 @@ int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals); int32_t initTableColSubmitData(STableDataCxt *pTableCxt); int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild); -int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); +//int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum); +int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks, bool append); void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash); void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt); void insDestroyVgroupDataCxtList(SArray *pVgCxtList); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index fcb5588717..db2d34b844 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -446,7 +446,7 @@ int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) { uError("insMergeTableDataCxt failed"); return code; } - code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); if (code != TSDB_CODE_SUCCESS) { uError("insBuildVgDataBlocks failed"); return code; diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 22f274b21c..53b9805267 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2474,7 +2474,7 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt taosHashClear(pStmt->pTableCxtHashObj); if (TSDB_CODE_SUCCESS == code) { - code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); } return code; @@ -2523,9 +2523,14 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext; if (!reentry) { - pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - pStmt->pTableBlockHashObj = - taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pCxt->pComCxt->pStmtCb) { + pStmt->pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } else { + pStmt->pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + } } pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index bdeb548bd7..648a119712 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -51,6 +51,48 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) { return TSDB_CODE_SUCCESS; } +int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) { + // merge according to vgId + return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo); +} + +int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) { + int32_t code = TSDB_CODE_SUCCESS; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot; + + if (TSDB_CODE_SUCCESS == code) { + code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true); + } + + if (pStmt->freeArrayFunc) { + pStmt->freeArrayFunc(pVgDataBlocks); + } + return code; +} + + +/* +int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* pVgDataBlocks = NULL; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot; + + // merge according to vgId + if (tbNum > 0) { + code = insMergeStmtTableDataCxt(pTbCtx, pBlockList, &pVgDataBlocks, true, tbNum); + } + + if (TSDB_CODE_SUCCESS == code) { + code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks); + } + + if (pStmt->freeArrayFunc) { + pStmt->freeArrayFunc(pVgDataBlocks); + } + return code; +} +*/ + int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { int32_t code = TSDB_CODE_SUCCESS; SArray* pVgDataBlocks = NULL; @@ -60,8 +102,9 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash if (taosHashGetSize(pBlockHash) > 0) { code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true); } + if (TSDB_CODE_SUCCESS == code) { - code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks, false); } if (pStmt->freeArrayFunc) { @@ -233,7 +276,74 @@ int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND* return TSDB_CODE_SUCCESS; } -int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { +int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo* pBindInfos) { + STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; + SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); + SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; + SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; + int32_t rowNum = bind->num; + TAOS_MULTI_BIND ncharBind = {0}; + TAOS_MULTI_BIND* pBind = NULL; + int32_t code = 0; + int16_t lastColId = -1; + bool colInOrder = true; + + if (NULL == *pTSchema) { + *pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion); + } + + for (int c = 0; c < boundInfo->numOfBound; ++c) { + SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + if (pColSchema->colId <= lastColId) { + colInOrder = false; + } else { + lastColId = pColSchema->colId; + } + //SColData* pCol = taosArrayGet(pCols, c); + + if (bind[c].num != rowNum) { + code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); + goto _return; + } + + if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type + code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + goto _return; + } + + if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) { + code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind); + if (code) { + goto _return; + } + pBind = &ncharBind; + } else { + pBind = bind + c; + } + + pBindInfos[c].columnId = pColSchema->colId; + pBindInfos[c].bind = pBind; + pBindInfos[c].type = pColSchema->type; + + //code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); + //if (code) { + // goto _return; + //} + } + + code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols); + + qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum); + +_return: + + taosMemoryFree(ncharBind.buffer); + taosMemoryFree(ncharBind.length); + + return code; +} + +int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; @@ -245,7 +355,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in for (int c = 0; c < boundInfo->numOfBound; ++c) { SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; - SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c); + SColData* pCol = taosArrayGet(pCols, c); if (bind[c].num != rowNum) { code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); @@ -283,14 +393,15 @@ _return: return code; } -int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, + +int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]]; - SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx); + SColData* pCol = taosArrayGet(pCols, colIdx); TAOS_MULTI_BIND ncharBind = {0}; TAOS_MULTI_BIND* pBind = NULL; int32_t code = 0; @@ -393,6 +504,22 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel return TSDB_CODE_SUCCESS; } +int32_t qResetStmtColumns(SArray* pCols, bool deepClear) { + int32_t colNum = taosArrayGetSize(pCols); + + for (int32_t i = 0; i < colNum; ++i) { + SColData* pCol = (SColData*)taosArrayGet(pCols, i); + if (deepClear) { + tColDataDeepClear(pCol); + } else { + tColDataClear(pCol); + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) { STableDataCxt* pBlock = (STableDataCxt*)block; int32_t colNum = taosArrayGetSize(pBlock->pData->aCol); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 7c43113ad2..028866064d 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -21,6 +21,7 @@ #include "querynodes.h" #include "tRealloc.h" #include "tdatablock.h" +#include "tmisce.h" void qDestroyBoundColInfo(void* pInfo) { if (NULL == pInfo) { @@ -429,7 +430,7 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) { taosHashCleanup(pTableCxtHash); } -static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild) { +static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) { if (NULL == pVgCxt->pData->aSubmitTbData) { pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); if (NULL == pVgCxt->pData->aSubmitTbData) { @@ -441,7 +442,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData); if (isRebuild) { rebuildTableData(pTableCxt->pData, &pTableCxt->pData); - } else { + } else if (clear) { taosMemoryFreeClear(pTableCxt->pData); } @@ -486,6 +487,203 @@ int insColDataComp(const void* lp, const void* rp) { return 0; } + +int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId, STableColsData* pTbData, SName* sname) { + if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) { + return TSDB_CODE_SUCCESS; + } + + SVgroupInfo vgInfo = {0}; + SRequestConnInfo conn = {.pTrans = pBuildInfo->transport, + .requestId = pBuildInfo->requestId, + .requestObjRefId = pBuildInfo->requestSelf, + .mgmtEps = pBuildInfo->mgmtEpSet}; + + int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData, uint64_t* uid, int32_t* vgId) { + STableVgUid* pTbInfo = NULL; + int32_t code = 0; + + if (pTbData->getFromHash) { + pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName)); + } + + if (NULL == pTbInfo) { + SName sname; + qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0); + + STableMeta* pTableMeta = NULL; + SRequestConnInfo conn = {.pTrans = pBuildInfo->transport, + .requestId = pBuildInfo->requestId, + .requestObjRefId = pBuildInfo->requestSelf, + .mgmtEps = pBuildInfo->mgmtEpSet}; + code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta); + + if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { + parserDebug("tb %s.%s not exist", sname.dbname, sname.tname); + return code; + } + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + *uid = pTableMeta->uid; + *vgId = pTableMeta->vgId; + + STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId}; + tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo)); + + code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname); + + taosMemoryFree(pTableMeta); + } else { + *uid = pTbInfo->uid; + *vgId = pTbInfo->vgid; + } + + return code; +} + + +int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) { + int32_t code = TSDB_CODE_SUCCESS; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot; + + if (TSDB_CODE_SUCCESS == code) { + code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true); + } + + return code; +} + + + +int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) { + int32_t code = TSDB_CODE_SUCCESS; + uint64_t uid; + int32_t vgId; + + pTbCtx->pData->aRowP = pTbData->aCol; + + code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + pTbCtx->pMeta->vgId = vgId; + pTbCtx->pMeta->uid = uid; + pTbCtx->pData->uid = uid; + + if (!pTbCtx->ordered) { + code = tRowSort(pTbCtx->pData->aRowP); + } + if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) { + code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0); + } + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + SVgroupDataCxt* pVgCxt = NULL; + void** pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pp) { + pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pp) { + code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt); + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + + if (TSDB_CODE_SUCCESS == code) { + code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false); + } + + if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) { + code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList); + //taosArrayClear(pVgCxt->pData->aSubmitTbData); + tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE); + //insDestroyVgroupDataCxt(pVgCxt); + } + + return code; +} + +/* +int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum) { + SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); + if (NULL == pVgroupHash || NULL == pVgroupList) { + taosHashCleanup(pVgroupHash); + taosArrayDestroy(pVgroupList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + + for (int32_t i = 0; i < tbNum; ++i) { + STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i); + pTableCxt->pMeta->vgId = pTableCols->vgId; + pTableCxt->pMeta->uid = pTableCols->uid; + pTableCxt->pData->uid = pTableCols->uid; + pTableCxt->pData->aCol = pTableCols->aCol; + + SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0); + if (pCol->nVal <= 0) { + continue; + } + + if (pTableCxt->pData->pCreateTbReq) { + pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; + } + + taosArraySort(pTableCxt->pData->aCol, insColDataComp); + + tColDataSortMerge(pTableCxt->pData->aCol); + + if (TSDB_CODE_SUCCESS == code) { + SVgroupDataCxt* pVgCxt = NULL; + int32_t vgId = pTableCxt->pMeta->vgId; + void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pp) { + code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + if (TSDB_CODE_SUCCESS == code) { + code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false); + } + } + } + + taosHashCleanup(pVgroupHash); + if (TSDB_CODE_SUCCESS == code) { + *pVgDataBlocks = pVgroupList; + } else { + insDestroyVgroupDataCxtList(pVgroupList); + } + + return code; +} +*/ + int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); @@ -546,7 +744,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool pVgCxt = *(SVgroupDataCxt**)pp; } if (TSDB_CODE_SUCCESS == code) { - code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild); + code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true); } } if (TSDB_CODE_SUCCESS == code) { @@ -599,9 +797,9 @@ static void destroyVgDataBlocks(void* p) { taosMemoryFree(pVg); } -int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks) { +int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) { size_t numOfVg = taosArrayGetSize(pVgDataCxtList); - SArray* pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES); if (NULL == pDataBlocks) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -609,6 +807,9 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, int32_t code = TSDB_CODE_SUCCESS; for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) { SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i); + if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) { + continue; + } SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); if (NULL == dst) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -626,6 +827,13 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, } } + if (append) { + if (NULL == *pVgDataBlocks) { + *pVgDataBlocks = pDataBlocks; + } + return code; + } + if (TSDB_CODE_SUCCESS == code) { *pVgDataBlocks = pDataBlocks; } else { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 57f2543691..b9cf5d48f0 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -177,7 +177,7 @@ __compar_fn_t gUint64SignCompare[] = {compareUint64Int8, compareUint64Int16, co compareUint64Int64, compareUint64Float, compareUint64Double}; __compar_fn_t gUint64UsignCompare[] = {compareUint64Uint8, compareUint64Uint16, compareUint64Uint32, compareUint64Val}; -int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) { +int8_t filterGetCompFuncIdx(int32_t type, int32_t optr, bool scalarMode) { int8_t comparFn = 0; if (optr == OP_TYPE_IN && (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_VARBINARY && @@ -290,9 +290,9 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) { case TSDB_DATA_TYPE_NCHAR: { if (optr == OP_TYPE_MATCH) { - comparFn = 28; + comparFn = scalarMode ? 28 : 19; } else if (optr == OP_TYPE_NMATCH) { - comparFn = 29; + comparFn = scalarMode ? 29 : 20; } else if (optr == OP_TYPE_LIKE) { comparFn = 9; } else if (optr == OP_TYPE_NOT_LIKE) { @@ -343,7 +343,7 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) { return comparFn; } -__compar_fn_t filterGetCompFunc(int32_t type, int32_t optr) { return gDataCompare[filterGetCompFuncIdx(type, optr)]; } +__compar_fn_t filterGetCompFunc(int32_t type, int32_t optr) { return gDataCompare[filterGetCompFuncIdx(type, optr, true)]; } __compar_fn_t filterGetCompFuncEx(int32_t lType, int32_t rType, int32_t optr) { if (TSDB_DATA_TYPE_NULL == rType || TSDB_DATA_TYPE_JSON == rType) { @@ -2785,7 +2785,7 @@ int32_t filterGenerateComInfo(SFilterInfo *info) { for (uint32_t i = 0; i < info->unitNum; ++i) { SFilterUnit *unit = &info->units[i]; - info->cunits[i].func = filterGetCompFuncIdx(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr); // set terrno if err + info->cunits[i].func = filterGetCompFuncIdx(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr, false); // set terrno if err info->cunits[i].rfunc = filterGetRangeCompFuncFromOptrs(unit->compare.optr, unit->compare.optr2); info->cunits[i].optr = FILTER_UNIT_OPTR(unit); info->cunits[i].colData = NULL; diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 0acd6f67f5..d2519d06c1 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -17,15 +17,6 @@ #include #include "os.h" -#ifdef WINDOWS -#define THREAD_PTR_CHECK(p) \ - do { \ - if (!(p) || !(*(p))) return 0; \ - } while (0); -#else -#define THREAD_PTR_CHECK(p) -#endif - int32_t taosThreadCreate(TdThread *tid, const TdThreadAttr *attr, void *(*start)(void *), void *arg) { return pthread_create(tid, attr, start, arg); } @@ -126,7 +117,6 @@ int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { } return 0; #else - THREAD_PTR_CHECK(mutex) return pthread_cond_wait(cond, mutex); #endif } @@ -140,7 +130,6 @@ int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const } return EINVAL; #else - THREAD_PTR_CHECK(mutex) return pthread_cond_timedwait(cond, mutex, abstime); #endif } @@ -211,7 +200,6 @@ int32_t taosThreadKeyDelete(TdThreadKey key) { return pthread_key_delete(key); } int32_t taosThreadKill(TdThread thread, int32_t sig) { return pthread_kill(thread, sig); } // int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) { -// THREAD_PTR_CHECK(mutex) // return pthread_mutex_consistent(mutex); // } @@ -220,7 +208,6 @@ int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { DeleteCriticalSection(mutex); return 0; #else - THREAD_PTR_CHECK(mutex) return pthread_mutex_destroy(mutex); #endif } @@ -244,7 +231,6 @@ int32_t taosThreadMutexLock(TdThreadMutex *mutex) { EnterCriticalSection(mutex); return 0; #else - THREAD_PTR_CHECK(mutex) return pthread_mutex_lock(mutex); #endif } @@ -258,7 +244,6 @@ int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { if (TryEnterCriticalSection(mutex)) return 0; return EBUSY; #else - THREAD_PTR_CHECK(mutex) return pthread_mutex_trylock(mutex); #endif } @@ -268,7 +253,6 @@ int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { LeaveCriticalSection(mutex); return 0; #else - THREAD_PTR_CHECK(mutex) return pthread_mutex_unlock(mutex); #endif } @@ -461,7 +445,6 @@ int32_t taosThreadSetSchedParam(TdThread thread, int32_t policy, const struct sc int32_t taosThreadSetSpecific(TdThreadKey key, const void *value) { return pthread_setspecific(key, value); } int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) { - THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_destroy((pthread_mutex_t *)lock); #else @@ -480,7 +463,6 @@ int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) { } int32_t taosThreadSpinLock(TdThreadSpinlock *lock) { - THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_lock((pthread_mutex_t *)lock); #else @@ -489,7 +471,6 @@ int32_t taosThreadSpinLock(TdThreadSpinlock *lock) { } int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { - THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_trylock((pthread_mutex_t *)lock); #else @@ -498,7 +479,6 @@ int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { } int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) { - THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_unlock((pthread_mutex_t *)lock); #else diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 0e3b50974a..eb959f2bd4 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -124,6 +124,7 @@ int queryColumnTest(TAOS_STMT *stmt, TAOS *taos); int queryMiscTest(TAOS_STMT *stmt, TAOS *taos); int insertNonExistsTb(TAOS_STMT *stmt, TAOS *taos); int insertVarLenErr(TAOS_STMT *stmt, TAOS *taos); +int insertStbTest(TAOS_STMT *stmt, TAOS *taos); enum { TTYPE_INSERT = 1, @@ -148,53 +149,56 @@ typedef struct { int32_t bindNullNum; int32_t runTimes; int32_t preCaseIdx; + bool stbInsert; } CaseCfg; CaseCfg gCase[] = { - {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 1, 10, 10, 0, 0, 0, 1, -1}, - {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 10, 100, 10, 0, 0, 0, 1, -1}, + {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 1, 10, 10, 0, 0, 0, 1, -1, false}, + {"insert:MBSE0-FULL", tListLen(shortColList), shortColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 10, 100, 10, 0, 0, 0, 1, -1, false}, - {"insert:MBSE1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBSE1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest1, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBSE1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest1, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBSE1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBSETest1, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MBSE1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest1, 10, 10, 2, 12, 0, 0, 1, -1, false}, + {"insert:MBSE1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest1, 10, 10, 2, 2, 0, 0, 1, -1, false}, - {"insert:MBSE2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBSETest2, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBSE2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest2, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBSE2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest2, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBSE2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBSETest2, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MBSE2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest2, 10, 10, 2, 12, 0, 0, 1, -1, false}, + {"insert:MBSE2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBSETest2, 10, 10, 2, 2, 0, 0, 1, -1, false}, - {"insert:MBME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest1, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest1, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest1, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MBME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest1, 10, 10, 2, 12, 0, 0, 1, -1, false}, + {"insert:MBME1-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest1, 10, 10, 2, 2, 0, 0, 1, -1, false}, // 11 - {"insert:MBME2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest2, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest2, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest2, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME2-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest2, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MBME2-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest2, 10, 10, 2, 12, 0, 0, 1, -1, false}, + {"insert:MBME2-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest2, 10, 10, 2, 2, 0, 0, 1, -1, false}, - {"insert:MBME3-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest3, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME3-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest3, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME3-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest3, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME3-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest3, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MBME3-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest3, 10, 10, 2, 12, 0, 0, 1, -1, false}, + {"insert:MBME3-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest3, 10, 10, 2, 2, 0, 0, 1, -1, false}, - {"insert:MBME4-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest4, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MBME4-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest4, 10, 10, 2, 12, 0, 0, 1, -1}, - {"insert:MBME4-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest4, 10, 10, 2, 2, 0, 0, 1, -1}, + {"insert:MBME4-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMBMETest4, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MBME4-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest4, 10, 10, 2, 12, 0, 0, 1, -1, false}, + {"insert:MBME4-C002", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMBMETest4, 10, 10, 2, 2, 0, 0, 1, -1, false}, - {"insert:MPME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMPMETest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:MPME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMPMETest1, 10, 10, 2, 12, 0, 0, 1, -1}, + {"insert:MPME1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertMPMETest1, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:MPME1-C012", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, false, insertMPMETest1, 10, 10, 2, 12, 0, 0, 1, -1, false}, - // 22 - {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 1, false, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"insert:AUTO2-TBEXISTS", tListLen(fullColList), fullColList, TTYPE_INSERT, 3, false, true, insertAUTOTest2, 10, 10, 2, 0, 0, 0, 1, -1}, -// {"insert:AUTO3-NTB", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, true, insertAUTOTest3, 10, 10, 2, 0, 0, 0, 1, -1}, + {"insert:STBI-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, false, true, insertStbTest, 10, 10, 2, 0, 0, 0, 1, -1, true}, - {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, - {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, + // 23 + {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, 1, false, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1, false}, + {"insert:AUTO2-TBEXISTS", tListLen(fullColList), fullColList, TTYPE_INSERT, 3, false, true, insertAUTOTest2, 10, 10, 2, 0, 0, 0, 1, -1, false}, +// {"insert:AUTO3-NTB", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, true, insertAUTOTest3, 10, 10, 2, 0, 0, 0, 1, -1, false}, - {"query:NG-TBNEXISTS",tListLen(fullColList), fullColList, TTYPE_INSERT_NG,0, false, false, insertNonExistsTb, 10, 10, 1, 3, 0, 0, 1, -1}, - {"query:NG-VARLENERR",tListLen(fullColList), fullColList, TTYPE_INSERT_NG,0, false, true, insertVarLenErr, 10, 10, 1, 3, 0, 0, 1, -1}, + {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2, false}, + {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2, false}, -// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, -// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, + {"query:NG-TBNEXISTS",tListLen(fullColList), fullColList, TTYPE_INSERT_NG,0, false, false, insertNonExistsTb, 10, 10, 1, 3, 0, 0, 1, -1, false}, + {"query:NG-VARLENERR",tListLen(fullColList), fullColList, TTYPE_INSERT_NG,0, false, true, insertVarLenErr, 10, 10, 1, 3, 0, 0, 1, -1, false}, + +// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2, false}, +// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2, false}, }; @@ -233,7 +237,7 @@ typedef struct { #if 0 CaseCtrl gCaseCtrl = { - .precision = TIME_PRECISION_MICRO, + .precision = TIME_PRECISION_MILLI, .bindNullNum = 0, .printCreateTblSql = true, .printQuerySql = true, @@ -256,7 +260,7 @@ CaseCtrl gCaseCtrl = { .funcIdxList = NULL, .checkParamNum = false, .runTimes = 0, - .caseIdx = 26, + .caseIdx = 20, .caseNum = 1, .caseRunIdx = -1, .caseRunNum = -1, @@ -268,7 +272,7 @@ CaseCtrl gCaseCtrl = { CaseCtrl gCaseCtrl = { // default .precision = TIME_PRECISION_MILLI, .bindNullNum = 0, - .printCreateTblSql = false, + .printCreateTblSql = true, .printQuerySql = true, .printStmtSql = true, .printVerbose = false, @@ -456,9 +460,6 @@ void generateInsertSQL(BindData *data) { case TSDB_DATA_TYPE_UBIGINT: len += sprintf(data->sql + len, "tubigdata"); break; - case TSDB_DATA_TYPE_GEOMETRY: - len += sprintf(data->sql + len, "tgeometrydata"); - break; default: printf("!!!invalid tag type:%d", data->pTags[c].buffer_type); exit(1); @@ -527,9 +528,6 @@ void generateInsertSQL(BindData *data) { case TSDB_DATA_TYPE_UBIGINT: len += sprintf(data->sql + len, "ubigdata"); break; - case TSDB_DATA_TYPE_GEOMETRY: - len += sprintf(data->sql + len, "tgeometrydata"); - break; default: printf("!!!invalid col type:%d", data->pBind[c].buffer_type); exit(1); @@ -560,7 +558,11 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType, int32 pInfo = &operInfo[gCaseCtrl.optrIdxList[idx]]; } else { if (TSDB_DATA_TYPE_VARCHAR == dataType || TSDB_DATA_TYPE_NCHAR == dataType || TSDB_DATA_TYPE_GEOMETRY == dataType) { +#if 1 pInfo = &operInfo[varoperatorList[rand() % tListLen(varoperatorList)]]; +#else + pInfo = &operInfo[11]; +#endif } else { pInfo = &operInfo[operatorList[rand() % tListLen(operatorList)]]; } @@ -644,9 +646,6 @@ int32_t bpAppendColumnName(BindData *data, int32_t type, int32_t len) { case TSDB_DATA_TYPE_UBIGINT: return sprintf(data->sql + len, "ubigdata"); break; - case TSDB_DATA_TYPE_GEOMETRY: - len += sprintf(data->sql + len, "tgeometrydata"); - break; default: printf("!!!invalid col type:%d", type); exit(1); @@ -751,7 +750,7 @@ void generateErrorSQL(BindData *data, int32_t tblIdx) { } } -void generateColDataType(BindData *data, int32_t bindIdx, int32_t colIdx, int32_t *dataType) { +void generateColDataType(bool isQuery, BindData *data, int32_t bindIdx, int32_t colIdx, int32_t *dataType) { if (bindIdx < gCurCase->bindColNum) { if (gCaseCtrl.bindColTypeNum) { *dataType = gCaseCtrl.bindColTypeList[colIdx]; @@ -769,15 +768,23 @@ void generateColDataType(BindData *data, int32_t bindIdx, int32_t colIdx, int32_ break; } return; - } else if (0 == colIdx) { + } else if (0 == colIdx && !isQuery) { *dataType = TSDB_DATA_TYPE_TIMESTAMP; return; } else { while (true) { +#if 1 *dataType = rand() % (TSDB_DATA_TYPE_MAX - 1) + 1; +#else + if (!colExists(data->pBind, TSDB_DATA_TYPE_NCHAR)) { + *dataType = TSDB_DATA_TYPE_NCHAR; + } else { + *dataType = rand() % (TSDB_DATA_TYPE_MAX - 1) + 1; + } +#endif if (*dataType == TSDB_DATA_TYPE_JSON || *dataType == TSDB_DATA_TYPE_DECIMAL || *dataType == TSDB_DATA_TYPE_BLOB || *dataType == TSDB_DATA_TYPE_MEDIUMBLOB - || *dataType == TSDB_DATA_TYPE_VARBINARY) { + || *dataType == TSDB_DATA_TYPE_VARBINARY || *dataType == TSDB_DATA_TYPE_GEOMETRY) { continue; } @@ -806,7 +813,7 @@ void generateTagDataType(BindData *data, int32_t bindIdx, int32_t colIdx, int32_ *dataType = rand() % (TSDB_DATA_TYPE_MAX - 1) + 1; if (*dataType == TSDB_DATA_TYPE_JSON || *dataType == TSDB_DATA_TYPE_DECIMAL || *dataType == TSDB_DATA_TYPE_BLOB || *dataType == TSDB_DATA_TYPE_MEDIUMBLOB - || *dataType == TSDB_DATA_TYPE_VARBINARY) { + || *dataType == TSDB_DATA_TYPE_VARBINARY || *dataType == TSDB_DATA_TYPE_GEOMETRY) { continue; } @@ -823,7 +830,7 @@ void generateTagDataType(BindData *data, int32_t bindIdx, int32_t colIdx, int32_ } -int32_t prepareColData(BP_BIND_TYPE bType, BindData *data, int32_t bindIdx, int32_t rowIdx, int32_t colIdx) { +int32_t prepareColData(bool isQuery, BP_BIND_TYPE bType, BindData *data, int32_t bindIdx, int32_t rowIdx, int32_t colIdx) { int32_t dataType = TSDB_DATA_TYPE_TIMESTAMP; TAOS_MULTI_BIND *pBase = NULL; @@ -832,7 +839,7 @@ int32_t prepareColData(BP_BIND_TYPE bType, BindData *data, int32_t bindIdx, int3 generateTagDataType(data, bindIdx, colIdx, &dataType); } else { pBase = data->pBind; - generateColDataType(data, bindIdx, colIdx, &dataType); + generateColDataType(isQuery, data, bindIdx, colIdx, &dataType); } @@ -993,13 +1000,13 @@ int32_t prepareInsertData(BindData *data) { for (int b = 0; b < (allRowNum/gCurCase->bindRowNum); b++) { for (int c = 0; c < gCurCase->bindColNum; ++c) { - prepareColData(BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); + prepareColData(false, BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); } } for (int b = 0; b < gCurCase->tblNum; b++) { for (int c = 0; c < gCurCase->bindTagNum; ++c) { - prepareColData(BP_BIND_TAG, data, b*gCurCase->bindTagNum+c, b, c); + prepareColData(false, BP_BIND_TAG, data, b*gCurCase->bindTagNum+c, b, c); } } @@ -1056,7 +1063,7 @@ int32_t prepareQueryCondData(BindData *data, int32_t tblIdx) { for (int b = 0; b < bindNum; b++) { for (int c = 0; c < gCurCase->bindColNum; ++c) { - prepareColData(BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); + prepareColData(true, BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); } } @@ -1116,10 +1123,10 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { } else { gCaseCtrl.numericParam = false; } - + for (int b = 0; b < bindNum; b++) { for (int c = 0; c < gCurCase->bindColNum; ++c) { - prepareColData(BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); + prepareColData(true, BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c); } } @@ -1462,7 +1469,7 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, bool expectFail) { } if (gCurCase->bindRowNum > 1) { - if (0 == (n++%2)) { + if (0 == (n++%2) || gCurCase->stbInsert) { if (taos_stmt_bind_param_batch(stmt, bind)) { if (expectFail) return 0; printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); @@ -1473,7 +1480,7 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, bool expectFail) { for (int32_t i = 0; i < gCurCase->bindColNum; ++i) { if (taos_stmt_bind_single_param_batch(stmt, bind+i, i)) { if (expectFail) continue; - printf("!!!taos_stmt_bind_single_param_batch %d error:%s\n", taos_stmt_errstr(stmt), i); + printf("!!!taos_stmt_bind_single_param_batch %d error:%s\n", i, taos_stmt_errstr(stmt)); bpShowBindParam(bind, gCurCase->bindColNum); exit(1); } @@ -1925,6 +1932,62 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { return 0; } +/* prepare [settbname [bind] exec] */ +int insertStbTest(TAOS_STMT *stmt, TAOS *taos) { + BindData data = {0}; + prepareInsertData(&data); + + int code = taos_stmt_prepare(stmt, data.sql, 0); + if (code != 0){ + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + + bpCheckIsInsert(stmt, 1); + + int32_t bindTimes = gCurCase->rowNum/gCurCase->bindRowNum; + for (int32_t t = 0; t< gCurCase->tblNum; ++t) { + if (gCurCase->tblNum > 1) { + char buf[32]; + sprintf(buf, "t%d", t); + code = bpSetTableNameTags(&data, t, buf, stmt); + if (code != 0){ + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + } + + if (gCaseCtrl.checkParamNum) { + bpCheckParamNum(stmt); + } + + for (int32_t b = 0; b bindColNum + b*gCurCase->bindColNum, false)) { + exit(1); + } + } + + if (taos_stmt_add_batch(stmt)) { + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + + if (taos_stmt_execute(stmt) != 0) { + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + exit(1); + } + } + + bpCheckIsInsert(stmt, 1); + + destroyData(&data); + + bpCheckAffectedRows(stmt, 1); + + return 0; +} + + /* [prepare [settbnametag [bind add] exec]] */ int insertAUTOTest1(TAOS_STMT *stmt, TAOS *taos) { @@ -2556,9 +2619,6 @@ void generateCreateTableSQL(char *buf, int32_t tblIdx, int32_t colNum, int32_t * case TSDB_DATA_TYPE_UBIGINT: blen += sprintf(buf + blen, "ubigdata bigint unsigned"); break; - case TSDB_DATA_TYPE_GEOMETRY: - blen += sprintf(buf + blen, "geometrydata geometry(%d)", gVarCharSize); - break; default: printf("invalid col type:%d", colList[c]); exit(1); @@ -2617,9 +2677,6 @@ void generateCreateTableSQL(char *buf, int32_t tblIdx, int32_t colNum, int32_t * case TSDB_DATA_TYPE_UBIGINT: blen += sprintf(buf + blen, "tubigdata bigint unsigned"); break; - case TSDB_DATA_TYPE_GEOMETRY: - blen += sprintf(buf + blen, "tgeometrydata geometry(%d)", gVarCharSize); - break; default: printf("invalid col type:%d", colList[c]); exit(1); @@ -2678,9 +2735,6 @@ void generateCreateTableSQL(char *buf, int32_t tblIdx, int32_t colNum, int32_t * case TSDB_DATA_TYPE_UBIGINT: blen += sprintf(buf + blen, "%d", rand() % 128); break; - case TSDB_DATA_TYPE_GEOMETRY: - blen += sprintf(buf + blen, "'geo%d'", rand() % 128); - break; default: printf("invalid col type:%d", colList[c]); exit(1); @@ -2880,8 +2934,16 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) { } beginUs = taosGetTimestampUs(); - - stmt = taos_stmt_init(taos); + + if (gCurCase->stbInsert) { + TAOS_STMT_OPTIONS op; + op.reqId = 0; + op.singleStbInsert = true; + op.singleTableBindOnce = true; + stmt = taos_stmt_init_with_options(taos, &op); + } else { + stmt = taos_stmt_init(taos); + } if (NULL == stmt) { printf("!!!taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt)); exit(1);