diff --git a/include/client/taos.h b/include/client/taos.h index 45dc85f6d9..587d483f35 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -150,6 +150,12 @@ typedef struct TAOS_DB_ROUTE_INFO { TAOS_VGROUP_HASH_INFO *vgHash; } TAOS_DB_ROUTE_INFO; +typedef struct TAOS_STMT_OPTIONS { + int64_t reqId; + bool singleStbInsert; + bool singleTableBindOnce; +} TAOS_STMT_OPTIONS; + DLL_EXPORT void taos_cleanup(void); DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT setConfRet taos_set_config(const char *config); @@ -162,6 +168,7 @@ DLL_EXPORT const char *taos_data_type(int type); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid); +DLL_EXPORT TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS* options); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 37d5f3f0b6..0a3bc078ec 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -118,7 +118,11 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid); void qCleanupKeywordsTable(); +int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo); +int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks); +//int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); +int32_t qResetStmtColumns(SArray* pCols, bool deepClear); int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf); int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset); int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, uint64_t suid, int32_t vgId, @@ -129,8 +133,8 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx); int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery); -int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); -int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, +int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); +int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum); int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields); int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields); @@ -160,6 +164,7 @@ SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap); void destoryCatalogReq(SCatalogReq *pCatalogReq); bool isPrimaryKeyImpl(SNode* pExpr); +int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo); #ifdef __cplusplus } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 32f7cef12c..e1eb6d8d42 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -25,6 +25,7 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "tlog.h" +#include "tsimplehash.h" #include "tmsg.h" #include "tmsgcb.h" @@ -193,6 +194,27 @@ typedef struct SBoundColInfo { int32_t numOfBound; } SBoundColInfo; +typedef struct STableColsData { + char tbName[TSDB_TABLE_NAME_LEN]; + SArray* aCol; + bool getFromHash; +} STableColsData; + +typedef struct STableVgUid { + uint64_t uid; + int32_t vgid; +} STableVgUid; + +typedef struct STableBufInfo { + void* pCurBuff; + SArray* pBufList; + int64_t buffUnit; + int64_t buffSize; + int64_t buffIdx; + int64_t buffOffset; +} STableBufInfo; + + typedef struct STableDataCxt { STableMeta* pMeta; STSchema* pSchema; @@ -204,6 +226,32 @@ typedef struct STableDataCxt { bool duplicateTs; } STableDataCxt; +typedef struct SStbInterlaceInfo { + void* pCatalog; + void* pQuery; + int32_t acctId; + char* dbname; + void* transport; + SEpSet mgmtEpSet; + void* pRequest; + uint64_t requestId; + int64_t requestSelf; + bool tbFromHash; + SHashObj* pVgroupHash; + SArray* pVgroupList; + SSHashObj* pTableHash; + int64_t tbRemainNum; + STableBufInfo tbBuf; + char firstName[TSDB_TABLE_NAME_LEN]; + + STableDataCxt *pDataCtx; + void *boundTags; + + SArray *pTableCols; + int32_t pTableColsIdx; +} SStbInterlaceInfo; + + typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index cbef80b6da..2e2735b2ae 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -40,6 +40,8 @@ typedef enum { STMT_MAX, } STMT_STATUS; +#define STMT_TABLE_COLS_NUM 1000 + typedef struct SStmtTableCache { STableDataCxt *pDataCtx; void *boundTags; @@ -57,6 +59,7 @@ typedef struct SStmtBindInfo { bool inExecCache; uint64_t tbUid; uint64_t tbSuid; + int32_t tbVgId; int32_t sBindRowNum; int32_t sBindLastIdx; int8_t tbType; @@ -66,8 +69,15 @@ typedef struct SStmtBindInfo { char tbFName[TSDB_TABLE_FNAME_LEN]; char stbFName[TSDB_TABLE_FNAME_LEN]; SName sname; + + char statbName[TSDB_TABLE_FNAME_LEN]; } SStmtBindInfo; +typedef struct SStmtAsyncParam { + STableColsData *pTbData; + void* pStmt; +} SStmtAsyncParam; + typedef struct SStmtExecInfo { int32_t affectedRows; SRequestObj *pRequest; @@ -77,8 +87,10 @@ typedef struct SStmtExecInfo { } SStmtExecInfo; typedef struct SStmtSQLInfo { + bool stbInterlaceMode; STMT_TYPE type; STMT_STATUS status; + uint64_t suid; uint64_t runTimes; SHashObj *pTableCache; // SHash SQuery *pQuery; @@ -88,21 +100,58 @@ typedef struct SStmtSQLInfo { SStmtQueryResInfo queryRes; bool autoCreateTbl; SHashObj *pVgHash; + + SStbInterlaceInfo siInfo; } SStmtSQLInfo; +typedef struct SStmtStatInfo { + int64_t ctgGetTbMetaNum; + int64_t getCacheTbInfo; + int64_t parseSqlNum; + int64_t bindDataNum; + int64_t setTbNameUs; + int64_t bindDataUs1; + int64_t bindDataUs2; + int64_t bindDataUs3; + int64_t bindDataUs4; + int64_t addBatchUs; + int64_t execWaitUs; + int64_t execUseUs; +} SStmtStatInfo; + +typedef struct SStmtQNode { + STableColsData tblData; + struct SStmtQNode* next; +} SStmtQNode; + +typedef struct SStmtQueue { + bool stopQueue; + SStmtQNode* head; + SStmtQNode* tail; + uint64_t qRemainNum; +} SStmtQueue; + + typedef struct STscStmt { - STscObj *taos; - SCatalog *pCatalog; - int32_t affectedRows; - uint32_t seqId; - uint32_t seqIds[STMT_MAX]; + STscObj *taos; + SCatalog *pCatalog; + int32_t affectedRows; + uint32_t seqId; + uint32_t seqIds[STMT_MAX]; + bool bindThreadInUse; + TdThread bindThread; + TAOS_STMT_OPTIONS options; + bool stbInterlaceMode; + SStmtQueue queue; - SStmtSQLInfo sql; - SStmtExecInfo exec; - SStmtBindInfo bInfo; + SStmtSQLInfo sql; + SStmtExecInfo exec; + SStmtBindInfo bInfo; - int64_t reqid; - int32_t errCode; + int64_t reqid; + int32_t errCode; + + SStmtStatInfo stat; } STscStmt; extern char *gStmtStatusStr[]; @@ -154,13 +203,14 @@ extern char *gStmtStatusStr[]; } while (0) +#define STMT_FLOG(param, ...) qFatal("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt) #define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt) -TAOS_STMT *stmtInit(STscObj *taos, int64_t reqid); +TAOS_STMT *stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions); int stmtClose(TAOS_STMT *stmt); int stmtExec(TAOS_STMT *stmt); const char *stmtErrstr(TAOS_STMT *stmt); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ba7f65c52b..9d782f882a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1548,7 +1548,7 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) { return NULL; } - TAOS_STMT *pStmt = stmtInit(pObj, 0); + TAOS_STMT *pStmt = stmtInit(pObj, 0, NULL); releaseTscObj(*(int64_t *)taos); @@ -1563,13 +1563,29 @@ TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) { return NULL; } - TAOS_STMT *pStmt = stmtInit(pObj, reqid); + TAOS_STMT *pStmt = stmtInit(pObj, reqid, NULL); releaseTscObj(*(int64_t *)taos); return pStmt; } +TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) { + STscObj *pObj = acquireTscObj(*(int64_t *)taos); + if (NULL == pObj) { + tscError("invalid parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + TAOS_STMT *pStmt = stmtInit(pObj, options->reqId, options); + + releaseTscObj(*(int64_t *)taos); + + return pStmt; +} + + int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { if (stmt == NULL || sql == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 36a3e50aef..74aba3074b 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -8,6 +8,60 @@ char* gStmtStatusStr[] = {"unknown", "init", "prepare", "settbname", "settags", "fetchFields", "bind", "bindCol", "addBatch", "exec"}; +static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pBuf) { + if (pTblBuf->buffOffset < pTblBuf->buffSize) { + *pBuf = pTblBuf->pCurBuff + pTblBuf->buffOffset; + pTblBuf->buffOffset += pTblBuf->buffUnit; + } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) { + pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++); + *pBuf = pTblBuf->pCurBuff; + pTblBuf->buffOffset = pTblBuf->buffUnit; + } else { + void *buff = taosMemoryMalloc(pTblBuf->buffSize); + if (NULL == buff) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pTblBuf->pBufList, &buff); + + pTblBuf->buffIdx++; + pTblBuf->pCurBuff = buff; + *pBuf = buff; + pTblBuf->buffOffset = pTblBuf->buffUnit; + } + + return TSDB_CODE_SUCCESS; +} + +bool stmtDequeue(STscStmt* pStmt, SStmtQNode **param) { + while (0 == atomic_load_64(&pStmt->queue.qRemainNum)) { + taosUsleep(1); + return false; + } + + SStmtQNode *orig = pStmt->queue.head; + + SStmtQNode *node = pStmt->queue.head->next; + pStmt->queue.head = pStmt->queue.head->next; + + //taosMemoryFreeClear(orig); + + *param = node; + + atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); + + return true; +} + +void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { + pStmt->queue.tail->next = param; + pStmt->queue.tail = param; + + pStmt->stat.bindDataNum++; + atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); +} + + static int32_t stmtCreateRequest(STscStmt* pStmt) { int32_t code = 0; @@ -42,7 +96,10 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { pStmt->errCode = 0; break; case STMT_SETTBNAME: - if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)) { + if (STMT_STATUS_EQ(INIT)) { + code = TSDB_CODE_TSC_STMT_API_ERROR; + } + if (!pStmt->sql.stbInterlaceMode && (STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL))) { code = TSDB_CODE_TSC_STMT_API_ERROR; } break; @@ -170,6 +227,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid; pStmt->bInfo.tbSuid = pTableMeta->suid; + pStmt->bInfo.tbVgId = pTableMeta->vgId; pStmt->bInfo.tbType = pTableMeta->tableType; pStmt->bInfo.boundTags = tags; pStmt->bInfo.tagsCached = false; @@ -261,14 +319,18 @@ int32_t stmtParseSql(STscStmt* pStmt) { STMT_ERR_RET(stmtCreateRequest(pStmt)); + pStmt->stat.parseSqlNum++; STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb)); - + pStmt->sql.siInfo.pQuery = pStmt->sql.pQuery; + pStmt->bInfo.needParse = false; if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) { pStmt->sql.type = STMT_TYPE_INSERT; + pStmt->sql.stbInterlaceMode = false; } else if (pStmt->sql.pQuery->pPrepareRoot) { pStmt->sql.type = STMT_TYPE_QUERY; + pStmt->sql.stbInterlaceMode = false; } return TSDB_CODE_SUCCESS; @@ -277,6 +339,7 @@ int32_t stmtParseSql(STscStmt* pStmt) { int32_t stmtCleanBindInfo(STscStmt* pStmt) { pStmt->bInfo.tbUid = 0; pStmt->bInfo.tbSuid = 0; + pStmt->bInfo.tbVgId = -1; pStmt->bInfo.tbType = 0; pStmt->bInfo.needParse = true; pStmt->bInfo.inExecCache = false; @@ -287,47 +350,67 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { qDestroyBoundColInfo(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags); } - memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN); + pStmt->bInfo.stbFName[0] = 0;; return TSDB_CODE_SUCCESS; } +void stmtFreeTableBlkList(STableColsData* pTb) { + qResetStmtColumns(pTb->aCol, true); + taosArrayDestroy(pTb->aCol); +} + +void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { + pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0); + pTblBuf->buffIdx = 1; + pTblBuf->buffOffset = sizeof(*pQueue->head); + + pQueue->head = pQueue->tail = pTblBuf->pCurBuff; + pQueue->qRemainNum = 0; + pQueue->head->next = NULL; +} + int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { - if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) { - taos_free_result(pStmt->exec.pRequest); - pStmt->exec.pRequest = NULL; - } - - size_t keyLen = 0; - void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); - while (pIter) { - STableDataCxt* pBlocks = *(STableDataCxt**)pIter; - char* key = taosHashGetKey(pIter, &keyLen); - STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); - - if (keepTable && pBlocks == pStmt->exec.pCurrBlock) { - TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData); - STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); - - pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); - continue; + if (pStmt->sql.stbInterlaceMode) { + pStmt->sql.siInfo.pTableColsIdx = 0; + stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue); + } else { + if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) { + taos_free_result(pStmt->exec.pRequest); + pStmt->exec.pRequest = NULL; } - qDestroyStmtDataBlock(pBlocks); - taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); + size_t keyLen = 0; + void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + while (pIter) { + STableDataCxt* pBlocks = *(STableDataCxt**)pIter; + char* key = taosHashGetKey(pIter, &keyLen); + STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); - pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + if (keepTable && pBlocks == pStmt->exec.pCurrBlock) { + TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData); + STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + continue; + } + + qDestroyStmtDataBlock(pBlocks); + taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + } + + if (keepTable) { + return TSDB_CODE_SUCCESS; + } + + taosHashCleanup(pStmt->exec.pBlockHash); + pStmt->exec.pBlockHash = NULL; + + tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); + taosMemoryFreeClear(pStmt->exec.pCurrTbData); } - if (keepTable) { - return TSDB_CODE_SUCCESS; - } - - taosHashCleanup(pStmt->exec.pBlockHash); - pStmt->exec.pBlockHash = NULL; - - tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); - taosMemoryFreeClear(pStmt->exec.pCurrTbData); - STMT_ERR_RET(stmtCleanBindInfo(pStmt)); return TSDB_CODE_SUCCESS; @@ -367,27 +450,49 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, - uint64_t suid) { - SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + +int32_t stmtTryAddTableVgroupInfo(STscStmt* pStmt, int32_t* vgId) { + if (*vgId >= 0 && taosHashGet(pStmt->sql.pVgHash, (const char*)vgId, sizeof(*vgId))) { + return TSDB_CODE_SUCCESS; + } + SVgroupInfo vgInfo = {0}; SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, .requestId = pStmt->exec.pRequest->requestId, .requestObjRefId = pStmt->exec.pRequest->self, .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; - STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo)); - STMT_ERR_RET( - taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); + int32_t code = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} - STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl)); - STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId); +int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid, + uint64_t suid, int32_t vgId) { + STMT_ERR_RET(stmtTryAddTableVgroupInfo(pStmt, &vgId)); + STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgId, pStmt->sql.autoCreateTbl)); + + STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgId); return TSDB_CODE_SUCCESS; } int32_t stmtGetFromCache(STscStmt* pStmt) { + if (pStmt->sql.stbInterlaceMode && pStmt->sql.siInfo.pDataCtx) { + pStmt->bInfo.needParse = false; + pStmt->bInfo.inExecCache = false; + return TSDB_CODE_SUCCESS; + } + pStmt->bInfo.needParse = true; pStmt->bInfo.inExecCache = false; @@ -404,6 +509,11 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { } } + if (NULL == pStmt->pCatalog) { + STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); + pStmt->sql.siInfo.pCatalog = pStmt->pCatalog; + } + if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { if (pStmt->bInfo.inExecCache) { pStmt->bInfo.needParse = false; @@ -415,9 +525,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - if (NULL == pStmt->pCatalog) { - STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); - } if (pStmt->sql.autoCreateTbl) { SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid)); @@ -426,7 +533,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.tbUid = 0; STableDataCxt* pNewBlock = NULL; - STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid)); + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid, -1)); if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { @@ -443,12 +550,19 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_RET(stmtCleanBindInfo(pStmt)); } + uint64_t uid, suid; + int32_t vgId; + int8_t tableType; + STableMeta* pTableMeta = NULL; SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, .requestId = pStmt->exec.pRequest->requestId, .requestObjRefId = pStmt->exec.pRequest->self, .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); + + pStmt->stat.ctgGetTbMetaNum++; + if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { tscDebug("tb %s not exist", pStmt->bInfo.tbFName); stmtCleanBindInfo(pStmt); @@ -458,10 +572,14 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_ERR_RET(code); - uint64_t uid = pTableMeta->uid; - uint64_t suid = pTableMeta->suid; - int8_t tableType = pTableMeta->tableType; + uid = pTableMeta->uid; + suid = pTableMeta->suid; + tableType = pTableMeta->tableType; + pStmt->bInfo.tbVgId = pTableMeta->vgId; + vgId = pTableMeta->vgId; + taosMemoryFree(pTableMeta); + uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid; if (uid == pStmt->bInfo.tbUid) { @@ -493,7 +611,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid)); if (pCache) { pStmt->bInfo.needParse = false; @@ -505,7 +623,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.tagsCached = true; STableDataCxt* pNewBlock = NULL; - STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid)); + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid, vgId)); if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { @@ -538,9 +656,94 @@ int32_t stmtResetStmt(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) { + +int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) { + SStmtQNode* pParam = (SStmtQNode*)param; + + STMT_ERR_RET(qAppendStmtTableOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, &pParam->tblData, pStmt->exec.pCurrBlock, &pStmt->sql.siInfo)); + + //taosMemoryFree(pParam->pTbData); + + atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + + return TSDB_CODE_SUCCESS; +} + + +void *stmtBindThreadFunc(void *param) { + setThreadName("stmtBind"); + + qInfo("stmt bind thread started"); + + STscStmt* pStmt = (STscStmt*)param; + + while (true) { + if (atomic_load_8((int8_t *)&pStmt->queue.stopQueue)) { + break; + } + + SStmtQNode *asyncParam = NULL; + if (!stmtDequeue(pStmt, &asyncParam)) { + continue; + } + + stmtAsyncOutput(pStmt, asyncParam); + } + + qInfo("stmt bind thread stopped"); + + return NULL; +} + + +int32_t stmtStartBindThread(STscStmt* pStmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + STMT_ERR_RET(terrno); + } + + pStmt->bindThreadInUse = true; + + taosThreadAttrDestroy(&thAttr); + return TSDB_CODE_SUCCESS; +} + +int32_t stmtInitQueue(STscStmt* pStmt) { + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&pStmt->queue.head)); + pStmt->queue.tail = pStmt->queue.head; + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) { + pTblBuf->buffUnit = sizeof(SStmtQNode); + pTblBuf->buffSize = pTblBuf->buffUnit * 1000; + pTblBuf->pBufList = taosArrayInit(100, POINTER_BYTES); + if (NULL == pTblBuf->pBufList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + void *buff = taosMemoryMalloc(pTblBuf->buffSize); + if (NULL == buff) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pTblBuf->pBufList, &buff); + + pTblBuf->pCurBuff = buff; + pTblBuf->buffIdx = 1; + pTblBuf->buffOffset = 0; + + return TSDB_CODE_SUCCESS; +} + +TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) { STscObj* pObj = (STscObj*)taos; STscStmt* pStmt = NULL; + int32_t code = 0; pStmt = taosMemoryCalloc(1, sizeof(STscStmt)); if (NULL == pStmt) { @@ -560,6 +763,43 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) { pStmt->sql.status = STMT_INIT; pStmt->reqid = reqid; + if (NULL != pOptions) { + memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); + if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) { + pStmt->stbInterlaceMode = true; + } + } + + if (pStmt->stbInterlaceMode) { + pStmt->sql.siInfo.transport = taos->pAppInfo->pTransporter; + pStmt->sql.siInfo.acctId = taos->acctId; + pStmt->sql.siInfo.dbname = taos->db; + pStmt->sql.siInfo.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (NULL == pStmt->sql.siInfo.pTableHash) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stmtClose(pStmt); + return NULL; + } + pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); + if (NULL == pStmt->sql.siInfo.pTableCols) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stmtClose(pStmt); + return NULL; + } + + code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf); + if (TSDB_CODE_SUCCESS == code) { + stmtInitQueue(pStmt); + code = stmtStartBindThread(pStmt); + } + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + stmtClose(pStmt); + return NULL; + } + } + STMT_LOG_SEQ(STMT_INIT); tscDebug("stmt:%p initialized", pStmt); @@ -584,6 +824,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pStmt->sql.sqlStr = strndup(sql, length); pStmt->sql.sqlLen = length; + pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode; return TSDB_CODE_SUCCESS; } @@ -591,6 +832,8 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STscStmt* pStmt = (STscStmt*)stmt; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG("start to set tbName: %s", tbName); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME)); @@ -602,21 +845,54 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STMT_ERR_RET(stmtCreateRequest(pStmt)); + if (!pStmt->sql.stbInterlaceMode || NULL == pStmt->sql.siInfo.pDataCtx) { + STMT_ERR_RET(stmtCreateRequest(pStmt)); - STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, - pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); - tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); - - STMT_ERR_RET(stmtGetFromCache(pStmt)); - - if (pStmt->bInfo.needParse) { strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; - STMT_ERR_RET(stmtParseSql(pStmt)); + STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, + pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); + tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); + + STMT_ERR_RET(stmtGetFromCache(pStmt)); + + if (pStmt->bInfo.needParse) { + STMT_ERR_RET(stmtParseSql(pStmt)); + } + } else { + strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); + pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; + pStmt->exec.pRequest->requestId++; + pStmt->bInfo.needParse = false; } + if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) { + STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (!pSrc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + STableDataCxt* pDst = NULL; + + STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true)); + pStmt->sql.siInfo.pDataCtx = pDst; + + SArray* pTblCols = NULL; + for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) { + pTblCols = taosArrayDup(pDst->pData->aCol, NULL); + if (NULL == pTblCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); + } + + pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; + } + + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.setTbNameUs += startUs2 - startUs; + return TSDB_CODE_SUCCESS; } @@ -682,9 +958,90 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields return TSDB_CODE_SUCCESS; } +/* +SArray* stmtGetFreeCol(STscStmt* pStmt, int32_t* idx) { + while (true) { + if (pStmt->exec.smInfo.pColIdx >= STMT_COL_BUF_SIZE) { + pStmt->exec.smInfo.pColIdx = 0; + } + + if ((pStmt->exec.smInfo.pColIdx + 1) == atomic_load_32(&pStmt->exec.smInfo.pColFreeIdx)) { + taosUsleep(1); + continue; + } + + *idx = pStmt->exec.smInfo.pColIdx; + return pStmt->exec.smInfo.pCols[pStmt->exec.smInfo.pColIdx++]; + } +} +*/ + +int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) { + if (NULL == pStmt->sql.siInfo.pVgroupHash) { + pStmt->sql.siInfo.pVgroupHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + } + if (NULL == pStmt->sql.siInfo.pVgroupList) { + pStmt->sql.siInfo.pVgroupList = taosArrayInit(64, POINTER_BYTES); + } + + if (NULL == pStmt->sql.siInfo.pRequest) { + STMT_ERR_RET(buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, (SRequestObj**)&pStmt->sql.siInfo.pRequest, + pStmt->reqid)); + + if (pStmt->reqid != 0) { + pStmt->reqid++; + } + pStmt->exec.pRequest->syncQuery = true; + + pStmt->sql.siInfo.requestId = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->requestId; + pStmt->sql.siInfo.requestSelf = ((SRequestObj*)pStmt->sql.siInfo.pRequest)->self; + } + + if (!pStmt->sql.siInfo.tbFromHash && pStmt->sql.siInfo.firstName[0] && 0 == strcmp(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName)) { + pStmt->sql.siInfo.tbFromHash = true; + } + + if (0 == pStmt->sql.siInfo.firstName[0]) { + strcpy(pStmt->sql.siInfo.firstName, pStmt->bInfo.tbName); + } + + param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; + param->next = NULL; + + atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + + stmtEnqueue(pStmt, param); + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray** pTableCols) { + while (true) { + if (pStmt->sql.siInfo.pTableColsIdx < taosArrayGetSize(pStmt->sql.siInfo.pTableCols)) { + *pTableCols = (SArray*)taosArrayGetP(pStmt->sql.siInfo.pTableCols, pStmt->sql.siInfo.pTableColsIdx++); + break; + } else { + SArray* pTblCols = NULL; + for (int32_t i = 0; i < 100; i++) { + pTblCols = taosArrayDup(pStmt->sql.siInfo.pDataCtx->pData->aCol, NULL); + if (NULL == pTblCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); + } + } + } + + return TSDB_CODE_SUCCESS; +} + int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { STscStmt* pStmt = (STscStmt*)stmt; + int32_t code = 0; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND)); @@ -699,9 +1056,8 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { pStmt->exec.pRequest = NULL; } - STMT_ERR_RET(stmtCreateRequest(pStmt)); - if (pStmt->bInfo.needParse) { + STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt)); } @@ -758,8 +1114,30 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { pStmt->exec.pCurrBlock = *pDataBlock; } + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.bindDataUs1 += startUs2 - startUs; + + SStmtQNode* param = NULL; + if (pStmt->sql.stbInterlaceMode) { + STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)¶m)); + STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, ¶m->tblData.aCol)); + + int32_t colNum = taosArrayGetSize(param->tblData.aCol); + for (int32_t i = 0; i < colNum; ++i) { + SColData* pCol = (SColData*)taosArrayGet(param->tblData.aCol, i); + tColDataClear(pCol); + } + + strcpy(param->tblData.tbName, pStmt->bInfo.tbName); + } + + int64_t startUs3 = taosGetTimestampUs(); + pStmt->stat.bindDataUs2 += startUs3 - startUs2; + + SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol; + if (colIdx < 0) { - int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); + code = qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen); if (code) { tscError("qBindStmtColsValue failed, error:%s", tstrerror(code)); STMT_ERR_RET(code); @@ -776,20 +1154,38 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { pStmt->bInfo.sBindRowNum = bind->num; } - qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, + qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum); } + int64_t startUs4 = taosGetTimestampUs(); + pStmt->stat.bindDataUs3 += startUs4 - startUs3; + + if (pStmt->sql.stbInterlaceMode) { + STMT_ERR_RET(stmtAppendTablePostHandle(pStmt, param)); + } + + pStmt->stat.bindDataUs4 += taosGetTimestampUs() - startUs4; + return TSDB_CODE_SUCCESS; } int stmtAddBatch(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG_E("start to add batch"); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH)); + if (pStmt->sql.stbInterlaceMode) { + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.addBatchUs += startUs2 - startUs; + + return TSDB_CODE_SUCCESS; + } + STMT_ERR_RET(stmtCacheBlock(pStmt)); return TSDB_CODE_SUCCESS; @@ -858,6 +1254,8 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); + pStmt->stat.ctgGetTbMetaNum++; + taos_free_result(pStmt->exec.pRequest); pStmt->exec.pRequest = NULL; @@ -879,11 +1277,58 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { return finalCode; } +/* +int stmtStaticModeExec(TAOS_STMT* stmt) { + STscStmt* pStmt = (STscStmt*)stmt; + int32_t code = 0; + SSubmitRsp* pRsp = NULL; + if (pStmt->sql.staticMode) { + return TSDB_CODE_TSC_STMT_API_ERROR; + } + + STMT_DLOG_E("start to exec"); + + STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); + + STMT_ERR_RET(qBuildStmtOutputFromTbList(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pTbBlkList, pStmt->exec.pCurrBlock, pStmt->exec.tbBlkNum)); + + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); + + if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { + code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest); + if (code) { + pStmt->exec.pRequest->code = code; + } else { + tFreeSSubmitRsp(pRsp); + STMT_ERR_RET(stmtResetStmt(pStmt)); + STMT_ERR_RET(TSDB_CODE_NEED_RETRY); + } + } + + STMT_ERR_JRET(pStmt->exec.pRequest->code); + + pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); + pStmt->affectedRows += pStmt->exec.affectedRows; + +_return: + + stmtCleanExecInfo(pStmt, (code ? false : true), false); + + tFreeSSubmitRsp(pRsp); + + ++pStmt->sql.runTimes; + + STMT_RET(code); +} +*/ + int stmtExec(TAOS_STMT* stmt) { STscStmt* pStmt = (STscStmt*)stmt; int32_t code = 0; SSubmitRsp* pRsp = NULL; + int64_t startUs = taosGetTimestampUs(); + STMT_DLOG_E("start to exec"); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); @@ -891,12 +1336,26 @@ int stmtExec(TAOS_STMT* stmt) { if (STMT_TYPE_QUERY == pStmt->sql.type) { launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { - tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); - taosMemoryFreeClear(pStmt->exec.pCurrTbData); + if (pStmt->sql.stbInterlaceMode) { + int64_t startTs = taosGetTimestampUs(); + while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) { + taosUsleep(1); + } + pStmt->stat.execWaitUs += taosGetTimestampUs() - startTs; + + STMT_ERR_RET(qBuildStmtFinOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->sql.siInfo.pVgroupList)); + taosHashCleanup(pStmt->sql.siInfo.pVgroupHash); + pStmt->sql.siInfo.pVgroupHash = NULL; + pStmt->sql.siInfo.pVgroupList = NULL; + } else { + tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE); + taosMemoryFreeClear(pStmt->exec.pCurrTbData); - STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData)); + STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData)); - STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); + STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); + } + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } @@ -919,11 +1378,14 @@ int stmtExec(TAOS_STMT* stmt) { _return: stmtCleanExecInfo(pStmt, (code ? false : true), false); - + tFreeSSubmitRsp(pRsp); ++pStmt->sql.runTimes; + int64_t startUs2 = taosGetTimestampUs(); + pStmt->stat.execUseUs += startUs2 - startUs; + STMT_RET(code); } @@ -932,6 +1394,18 @@ int stmtClose(TAOS_STMT* stmt) { STMT_DLOG_E("start to free stmt"); + pStmt->queue.stopQueue = true; + + taosMsleep(10); + + STMT_FLOG("stmt %p closed, statInfo: ctgGetTbMetaNum=>%" PRId64 ", getCacheTbInfo=>%" PRId64 ", parseSqlNum=>%" PRId64 + ", pStmt->stat.bindDataNum=>%" PRId64 ", settbnameAPI:%u, bindAPI:%u, addbatchAPI:%u, execAPI:%u" + ", setTbNameUs:%" PRId64 ", bindDataUs:%" PRId64 ",%" PRId64 ",%" PRId64 ",%" PRId64 " addBatchUs:%" PRId64 ", execWaitUs:%" PRId64 ", execUseUs:%" PRId64, + pStmt, pStmt->stat.ctgGetTbMetaNum, pStmt->stat.getCacheTbInfo, pStmt->stat.parseSqlNum, pStmt->stat.bindDataNum, + pStmt->seqIds[STMT_SETTBNAME], pStmt->seqIds[STMT_BIND], pStmt->seqIds[STMT_ADD_BATCH], pStmt->seqIds[STMT_EXECUTE], + pStmt->stat.setTbNameUs, pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4, + pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); + stmtCleanSQLInfo(pStmt); taosMemoryFree(stmt); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ef37a41fcf..19eef78c63 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9624,6 +9624,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { } if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { +#if 0 int32_t nColData = TARRAY_SIZE(pTbData->aCol); SColData *aColData = (SColData *)TARRAY_DATA(pTbData->aCol); @@ -9631,6 +9632,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { tColDataDestroy(&aColData[i]); } taosArrayDestroy(pTbData->aCol); +#endif } else { int32_t nRow = TARRAY_SIZE(pTbData->aRowP); SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP); diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h index 1988620539..4f2877fcf6 100644 --- a/source/libs/parser/inc/parInsertUtil.h +++ b/source/libs/parser/inc/parInsertUtil.h @@ -51,7 +51,8 @@ int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals); int32_t initTableColSubmitData(STableDataCxt *pTableCxt); int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild); -int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks); +//int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum); +int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks, bool append); void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash); void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt); void insDestroyVgroupDataCxtList(SArray *pVgCxtList); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index fcb5588717..db2d34b844 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -446,7 +446,7 @@ int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) { uError("insMergeTableDataCxt failed"); return code; } - code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); if (code != TSDB_CODE_SUCCESS) { uError("insBuildVgDataBlocks failed"); return code; diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 22f274b21c..53b9805267 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2474,7 +2474,7 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt taosHashClear(pStmt->pTableCxtHashObj); if (TSDB_CODE_SUCCESS == code) { - code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); } return code; @@ -2523,9 +2523,14 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S pStmt->freeStbRowsCxtFunc = destroyStbRowsDataContext; if (!reentry) { - pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - pStmt->pTableBlockHashObj = - taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pCxt->pComCxt->pStmtCb) { + pStmt->pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } else { + pStmt->pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + } } pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index bdeb548bd7..ecca1266be 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -51,6 +51,48 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData) { return TSDB_CODE_SUCCESS; } +int32_t qAppendStmtTableOutput(SQuery* pQuery, SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) { + // merge according to vgId + return insAppendStmtTableDataCxt(pAllVgHash, pTbData, pTbCtx, pBuildInfo); +} + +int32_t qBuildStmtFinOutput(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) { + int32_t code = TSDB_CODE_SUCCESS; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot; + + if (TSDB_CODE_SUCCESS == code) { + code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true); + } + + if (pStmt->freeArrayFunc) { + pStmt->freeArrayFunc(pVgDataBlocks); + } + return code; +} + + +/* +int32_t qBuildStmtOutputFromTbList(SQuery* pQuery, SHashObj* pVgHash, SArray* pBlockList, STableDataCxt* pTbCtx, int32_t tbNum) { + int32_t code = TSDB_CODE_SUCCESS; + SArray* pVgDataBlocks = NULL; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot; + + // merge according to vgId + if (tbNum > 0) { + code = insMergeStmtTableDataCxt(pTbCtx, pBlockList, &pVgDataBlocks, true, tbNum); + } + + if (TSDB_CODE_SUCCESS == code) { + code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks); + } + + if (pStmt->freeArrayFunc) { + pStmt->freeArrayFunc(pVgDataBlocks); + } + return code; +} +*/ + int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) { int32_t code = TSDB_CODE_SUCCESS; SArray* pVgDataBlocks = NULL; @@ -60,8 +102,9 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash if (taosHashGetSize(pBlockHash) > 0) { code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true); } + if (TSDB_CODE_SUCCESS == code) { - code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks); + code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks, false); } if (pStmt->freeArrayFunc) { @@ -233,7 +276,7 @@ int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND* return TSDB_CODE_SUCCESS; } -int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { +int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; @@ -245,7 +288,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in for (int c = 0; c < boundInfo->numOfBound; ++c) { SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; - SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c); + SColData* pCol = taosArrayGet(pCols, c); if (bind[c].num != rowNum) { code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); @@ -283,14 +326,14 @@ _return: return code; } -int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, +int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta); SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]]; - SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx); + SColData* pCol = taosArrayGet(pCols, colIdx); TAOS_MULTI_BIND ncharBind = {0}; TAOS_MULTI_BIND* pBind = NULL; int32_t code = 0; @@ -393,6 +436,22 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel return TSDB_CODE_SUCCESS; } +int32_t qResetStmtColumns(SArray* pCols, bool deepClear) { + int32_t colNum = taosArrayGetSize(pCols); + + for (int32_t i = 0; i < colNum; ++i) { + SColData* pCol = (SColData*)taosArrayGet(pCols, i); + if (deepClear) { + tColDataDeepClear(pCol); + } else { + tColDataClear(pCol); + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) { STableDataCxt* pBlock = (STableDataCxt*)block; int32_t colNum = taosArrayGetSize(pBlock->pData->aCol); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 7c43113ad2..4d3151d847 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -21,6 +21,7 @@ #include "querynodes.h" #include "tRealloc.h" #include "tdatablock.h" +#include "tmisce.h" void qDestroyBoundColInfo(void* pInfo) { if (NULL == pInfo) { @@ -429,7 +430,7 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) { taosHashCleanup(pTableCxtHash); } -static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild) { +static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild, bool clear) { if (NULL == pVgCxt->pData->aSubmitTbData) { pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); if (NULL == pVgCxt->pData->aSubmitTbData) { @@ -441,7 +442,7 @@ static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCx taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData); if (isRebuild) { rebuildTableData(pTableCxt->pData, &pTableCxt->pData); - } else { + } else if (clear) { taosMemoryFreeClear(pTableCxt->pData); } @@ -486,6 +487,207 @@ int insColDataComp(const void* lp, const void* rp) { return 0; } + +int32_t insTryAddTableVgroupInfo(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, int32_t* vgId, STableColsData* pTbData, SName* sname) { + if (*vgId >= 0 && taosHashGet(pAllVgHash, (const char*)vgId, sizeof(*vgId))) { + return TSDB_CODE_SUCCESS; + } + + SVgroupInfo vgInfo = {0}; + SRequestConnInfo conn = {.pTrans = pBuildInfo->transport, + .requestId = pBuildInfo->requestId, + .requestObjRefId = pBuildInfo->requestSelf, + .mgmtEps = pBuildInfo->mgmtEpSet}; + + int32_t code = catalogGetTableHashVgroup((SCatalog*)pBuildInfo->pCatalog, &conn, sname, &vgInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = taosHashPut(pAllVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t insGetStmtTableVgUid(SHashObj* pAllVgHash, SStbInterlaceInfo* pBuildInfo, STableColsData* pTbData, uint64_t* uid, int32_t* vgId) { + STableVgUid* pTbInfo = NULL; + int32_t code = 0; + + if (pTbData->getFromHash) { + pTbInfo = (STableVgUid*)tSimpleHashGet(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName)); + } + + if (NULL == pTbInfo) { + SName sname; + qCreateSName(&sname, pTbData->tbName, pBuildInfo->acctId, pBuildInfo->dbname, NULL, 0); + + STableMeta* pTableMeta = NULL; + SRequestConnInfo conn = {.pTrans = pBuildInfo->transport, + .requestId = pBuildInfo->requestId, + .requestObjRefId = pBuildInfo->requestSelf, + .mgmtEps = pBuildInfo->mgmtEpSet}; + code = catalogGetTableMeta((SCatalog*)pBuildInfo->pCatalog, &conn, &sname, &pTableMeta); + + if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { + parserDebug("tb %s.%s not exist", sname.dbname, sname.tname); + return code; + } + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + *uid = pTableMeta->uid; + *vgId = pTableMeta->vgId; + + STableVgUid tbInfo = {.uid = *uid, .vgid = *vgId}; + tSimpleHashPut(pBuildInfo->pTableHash, pTbData->tbName, strlen(pTbData->tbName), &tbInfo, sizeof(tbInfo)); + + code = insTryAddTableVgroupInfo(pAllVgHash, pBuildInfo, vgId, pTbData, &sname); + + taosMemoryFree(pTableMeta); + } else { + *uid = pTbInfo->uid; + *vgId = pTbInfo->vgid; + } + + return code; +} + + +int32_t qBuildStmtFinOutput1(SQuery* pQuery, SHashObj* pAllVgHash, SArray* pVgDataBlocks) { + int32_t code = TSDB_CODE_SUCCESS; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot; + + if (TSDB_CODE_SUCCESS == code) { + code = insBuildVgDataBlocks(pAllVgHash, pVgDataBlocks, &pStmt->pDataBlocks, true); + } + + return code; +} + + + +int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData, STableDataCxt* pTbCtx, SStbInterlaceInfo* pBuildInfo) { + int32_t code = TSDB_CODE_SUCCESS; + uint64_t uid; + int32_t vgId; + + pTbCtx->pData->aCol = pTbData->aCol; + + SColData* pCol = taosArrayGet(pTbCtx->pData->aCol, 0); + if (pCol->nVal <= 0) { + return TSDB_CODE_SUCCESS; + } + + code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + pTbCtx->pMeta->vgId = vgId; + pTbCtx->pMeta->uid = uid; + pTbCtx->pData->uid = uid; + + if (pTbCtx->pData->pCreateTbReq) { + pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; + } + + taosArraySort(pTbCtx->pData->aCol, insColDataComp); + + tColDataSortMerge(pTbCtx->pData->aCol); + + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + SVgroupDataCxt* pVgCxt = NULL; + void** pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pp) { + pp = taosHashGet(pBuildInfo->pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pp) { + code = createVgroupDataCxt(pTbCtx, pBuildInfo->pVgroupHash, pBuildInfo->pVgroupList, &pVgCxt); + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + + if (TSDB_CODE_SUCCESS == code) { + code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false); + } + + if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 1000) { + code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList); + taosArrayClear(pVgCxt->pData->aSubmitTbData); + } + + return code; +} + +/* +int32_t insMergeStmtTableDataCxt(STableDataCxt* pTableCxt, SArray* pTableList, SArray** pVgDataBlocks, bool isRebuild, int32_t tbNum) { + SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); + if (NULL == pVgroupHash || NULL == pVgroupList) { + taosHashCleanup(pVgroupHash); + taosArrayDestroy(pVgroupList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + + for (int32_t i = 0; i < tbNum; ++i) { + STableColsData *pTableCols = (STableColsData*)taosArrayGet(pTableList, i); + pTableCxt->pMeta->vgId = pTableCols->vgId; + pTableCxt->pMeta->uid = pTableCols->uid; + pTableCxt->pData->uid = pTableCols->uid; + pTableCxt->pData->aCol = pTableCols->aCol; + + SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, 0); + if (pCol->nVal <= 0) { + continue; + } + + if (pTableCxt->pData->pCreateTbReq) { + pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; + } + + taosArraySort(pTableCxt->pData->aCol, insColDataComp); + + tColDataSortMerge(pTableCxt->pData->aCol); + + if (TSDB_CODE_SUCCESS == code) { + SVgroupDataCxt* pVgCxt = NULL; + int32_t vgId = pTableCxt->pMeta->vgId; + void** pp = taosHashGet(pVgroupHash, &vgId, sizeof(vgId)); + if (NULL == pp) { + code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt); + } else { + pVgCxt = *(SVgroupDataCxt**)pp; + } + if (TSDB_CODE_SUCCESS == code) { + code = fillVgroupDataCxt(pTableCxt, pVgCxt, false, false); + } + } + } + + taosHashCleanup(pVgroupHash); + if (TSDB_CODE_SUCCESS == code) { + *pVgDataBlocks = pVgroupList; + } else { + insDestroyVgroupDataCxtList(pVgroupList); + } + + return code; +} +*/ + int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) { SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES); @@ -546,7 +748,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool pVgCxt = *(SVgroupDataCxt**)pp; } if (TSDB_CODE_SUCCESS == code) { - code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild); + code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild, true); } } if (TSDB_CODE_SUCCESS == code) { @@ -599,9 +801,9 @@ static void destroyVgDataBlocks(void* p) { taosMemoryFree(pVg); } -int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks) { +int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks, bool append) { size_t numOfVg = taosArrayGetSize(pVgDataCxtList); - SArray* pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + SArray* pDataBlocks = (append && *pVgDataBlocks) ? *pVgDataBlocks : taosArrayInit(numOfVg, POINTER_BYTES); if (NULL == pDataBlocks) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -609,6 +811,9 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, int32_t code = TSDB_CODE_SUCCESS; for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) { SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i); + if (taosArrayGetSize(src->pData->aSubmitTbData) <= 0) { + continue; + } SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); if (NULL == dst) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -626,6 +831,13 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, } } + if (append) { + if (NULL == *pVgDataBlocks) { + *pVgDataBlocks = pDataBlocks; + } + return code; + } + if (TSDB_CODE_SUCCESS == code) { *pVgDataBlocks = pDataBlocks; } else { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 4b8695e8ae..7771dbc0ac 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1268,7 +1268,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } -#if 1 +#if 0 SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL;