diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d946fe6a98..7f17c1673b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -252,24 +252,31 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); typedef struct { - int32_t index; // index of failed block in submit blocks - int32_t vnode; // vnode index of failed block - int32_t sid; // table index of failed block - int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table -} SSubmitRspBlock; + int8_t hashMeta; + int64_t uid; + char* tblFName; + int32_t numOfRows; + int32_t affectedRows; +} SSubmitBlkRsp; typedef struct { - int32_t code; // 0-success, > 0 error code - int32_t numOfRows; // number of records the client is trying to write - int32_t affectedRows; // number of records actually written - int32_t failedRows; // number of failed records (exclude duplicate records) - int32_t numOfFailedBlocks; - SSubmitRspBlock failedBlocks[]; + int32_t numOfRows; + int32_t affectedRows; + int32_t nBlocks; + union { + SArray* pArray; + SSubmitBlkRsp* pBlocks; + }; } SSubmitRsp; +int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); +int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); +void tFreeSSubmitRsp(SSubmitRsp *pRsp); + #define COL_SMA_ON ((int8_t)0x1) #define COL_IDX_ON ((int8_t)0x2) #define COL_VAL_SET ((int8_t)0x4) + typedef struct SSchema { int8_t type; int8_t flags; @@ -476,6 +483,7 @@ typedef struct { int32_t acctId; int64_t clusterId; uint32_t connId; + int32_t dnodeNum; int8_t superUser; int8_t connType; SEpSet epSet; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index a8bcac1c42..61e4bb3723 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -26,8 +26,7 @@ extern "C" { typedef struct SStmtCallback { TAOS_STMT* pStmt; int32_t (*getTbNameFn)(TAOS_STMT*, char**); - int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*); - int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*); + int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*); int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); } SStmtCallback; @@ -59,8 +58,9 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash int32_t qResetStmtDataBlock(void* block, bool keepBuf); int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); void qFreeStmtDataBlock(void* pDataBlock); -int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc); +int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId); void qDestroyStmtDataBlock(void* pBlock); +STableMeta *qGetTableMetaInDataBlock(void* pDataBlock); int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen); int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx, diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 460749243c..b3f35025d1 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -56,6 +56,7 @@ typedef struct SQueryResult { uint64_t numOfRows; int32_t msgSize; char *msg; + void *res; } SQueryResult; typedef struct STaskInfo { @@ -71,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes); +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 57d4031835..f9d257af98 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -307,9 +307,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v // --- mq void hbMgrInitMqHbRspHandle(); -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery); +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res); int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); -int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); +int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); #ifdef __cplusplus diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 32da75fb1e..e5efafc214 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -55,6 +55,7 @@ typedef struct SStmtQueryResInfo { typedef struct SStmtBindInfo { bool needParse; + bool inExecCache; uint64_t tbUid; uint64_t tbSuid; int32_t sBindRowNum; @@ -62,7 +63,9 @@ typedef struct SStmtBindInfo { int8_t tbType; bool tagsCached; void* boundTags; - char* tbName; + char tbName[TSDB_TABLE_FNAME_LEN];; + char tbFName[TSDB_TABLE_FNAME_LEN]; + char stbFName[TSDB_TABLE_FNAME_LEN]; SName sname; } SStmtBindInfo; @@ -71,12 +74,12 @@ typedef struct SStmtExecInfo { SRequestObj* pRequest; SHashObj* pVgHash; SHashObj* pBlockHash; + bool autoCreateTbl; } SStmtExecInfo; typedef struct SStmtSQLInfo { STMT_TYPE type; STMT_STATUS status; - bool autoCreate; uint64_t runTimes; SHashObj* pTableCache; //SHash SQuery* pQuery; @@ -85,6 +88,7 @@ typedef struct SStmtSQLInfo { SArray* nodeList; SQueryPlan* pQueryPlan; SStmtQueryResInfo queryRes; + bool autoCreateTbl; } SStmtSQLInfo; typedef struct STscStmt { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 53aebe751b..cb361f09a6 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -286,12 +286,12 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { pResInfo->precision = precision; } -int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { +int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, - pRequest->metric.start, &res); + pRequest->metric.start, NULL != pRes, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); @@ -310,6 +310,10 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } } + if (pRes) { + *pRes = res.res; + } + pRequest->code = res.code; terrno = res.code; return pRequest->code; @@ -320,7 +324,7 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList); } -SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) { +SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) { if (TSDB_CODE_SUCCESS == code) { switch (pQuery->execMode) { case QUERY_EXEC_MODE_LOCAL: @@ -333,7 +337,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); if (TSDB_CODE_SUCCESS == code) { - code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList); + code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, res); } taosArrayDestroy(pNodeList); break; @@ -373,7 +377,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) { return pRequest; } - return launchQueryImpl(pRequest, pQuery, code, false); + return launchQueryImpl(pRequest, pQuery, code, false, NULL); } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 8096ce395a..11c6971e3d 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -58,7 +58,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { + if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index b5d09d6fd5..9909aa63f9 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1660,7 +1660,7 @@ static int32_t smlInsertData(SSmlHandle* info) { smlBuildOutput(info->exec, info->pVgHash); info->cost.insertRpcTime = taosGetTimestampUs(); - launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); + launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true, NULL); info->affectedRows = taos_affected_rows(info->pRequest); return info->pRequest->code; diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index d69ab4413b..a7e50b2044 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -67,8 +67,8 @@ int32_t stmtGetTbName(TAOS_STMT* stmt, char** tbName) { STscStmt* pStmt = (STscStmt*)stmt; pStmt->sql.type = STMT_TYPE_MULTI_INSERT; - - if (NULL == pStmt->bInfo.tbName) { + + if ('\0' == pStmt->bInfo.tbName[0]) { tscError("no table name set"); STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR); } @@ -121,9 +121,12 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) { +int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName) { STscStmt* pStmt = (STscStmt*)stmt; + strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); + pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0; + pStmt->bInfo.tbUid = pTableMeta->uid; pStmt->bInfo.tbSuid = pTableMeta->suid; pStmt->bInfo.tbType = pTableMeta->tableType; @@ -133,15 +136,28 @@ int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) { return TSDB_CODE_SUCCESS; } -int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) { +int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) { STscStmt* pStmt = (STscStmt*)stmt; pStmt->exec.pVgHash = pVgHash; pStmt->exec.pBlockHash = pBlockHash; + pStmt->exec.autoCreateTbl = autoCreateTbl; return TSDB_CODE_SUCCESS; } +int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash) { + STscStmt* pStmt = (STscStmt*)stmt; + + STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName)); + STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl)); + + pStmt->sql.autoCreateTbl = autoCreateTbl; + + return TSDB_CODE_SUCCESS; +} + + int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) { STscStmt* pStmt = (STscStmt*)stmt; @@ -156,16 +172,16 @@ int32_t stmtCacheBlock(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - uint64_t uid = pStmt->bInfo.tbUid; - uint64_t tuid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid; + uint64_t uid = pStmt->bInfo.tbUid; + uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid; - if (taosHashGet(pStmt->sql.pTableCache, &tuid, sizeof(tuid))) { + if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) { return TSDB_CODE_SUCCESS; } - STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid)); - STableDataBlocks* pDst = NULL; - + STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + STableDataBlocks* pDst = NULL; + STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc)); SStmtTableCache cache = { @@ -173,22 +189,25 @@ int32_t stmtCacheBlock(STscStmt* pStmt) { .boundTags = pStmt->bInfo.boundTags, }; - if (taosHashPut(pStmt->sql.pTableCache, &tuid, sizeof(tuid), &cache, sizeof(cache))) { + if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) { return TSDB_CODE_OUT_OF_MEMORY; } - pStmt->bInfo.boundTags = NULL; - + if (pStmt->sql.autoCreateTbl) { + pStmt->bInfo.tagsCached = true; + } else { + pStmt->bInfo.boundTags = NULL; + } + return TSDB_CODE_SUCCESS; } int32_t stmtParseSql(STscStmt* pStmt) { SStmtCallback stmtCb = { - .pStmt = pStmt, - .getTbNameFn = stmtGetTbName, - .setBindInfoFn = stmtSetBindInfo, - .setExecInfoFn = stmtSetExecInfo, - .getExecInfoFn = stmtGetExecInfo, + .pStmt = pStmt, + .getTbNameFn = stmtGetTbName, + .setInfoFn = stmtUpdateInfo, + .getExecInfoFn = stmtGetExecInfo, }; if (NULL == pStmt->exec.pRequest) { @@ -221,8 +240,10 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { pStmt->bInfo.tbSuid = 0; pStmt->bInfo.tbType = 0; pStmt->bInfo.needParse = true; + pStmt->bInfo.inExecCache = false; - taosMemoryFreeClear(pStmt->bInfo.tbName); + pStmt->bInfo.tbName[0] = 0; + pStmt->bInfo.tbFName[0] = 0; if (!pStmt->bInfo.tagsCached) { destroyBoundColumnInfo(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags); @@ -237,12 +258,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) { pStmt->exec.pRequest = NULL; } - void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + size_t keyLen = 0; + void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { - STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter; - uint64_t* key = taosHashGetKey(pIter, NULL); - - if (keepTable && (*key == pStmt->bInfo.tbUid)) { + STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter; + char *key = taosHashGetKey(pIter, &keyLen); + STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); + + if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) { STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true)); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); @@ -250,11 +273,13 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) { } qFreeStmtDataBlock(pBlocks); - taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key)); + taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); } + pStmt->exec.autoCreateTbl = false; + if (keepTable) { return TSDB_CODE_SUCCESS; } @@ -296,10 +321,39 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } +int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks *pDataBlock, STableDataBlocks **newBlock, uint64_t uid) { + SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + SVgroupInfo vgInfo = {0}; + + STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &vgInfo)); + STMT_ERR_RET(taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo))); + + STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId)); + + return TSDB_CODE_SUCCESS; +} + int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.needParse = true; + pStmt->bInfo.inExecCache = false; + + STableDataBlocks *pBlockInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); + if (pBlockInExec) { + pStmt->bInfo.needParse = false; + pStmt->bInfo.inExecCache = true; + + if (pStmt->sql.autoCreateTbl) { + return TSDB_CODE_SUCCESS; + } + } if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { + if (pStmt->bInfo.inExecCache) { + ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1); + pStmt->bInfo.needParse = false; + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_SUCCESS; } @@ -307,10 +361,31 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); } - STableMeta* pTableMeta = NULL; - SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); - int32_t code = - catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta); + if (pStmt->sql.autoCreateTbl) { + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid)); + if (pCache) { + pStmt->bInfo.needParse = false; + pStmt->exec.autoCreateTbl = true; + + pStmt->bInfo.tbUid = 0; + + STableDataBlocks* pNewBlock = NULL; + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, 0)); + + if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { + STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + } + + STMT_RET(stmtCleanBindInfo(pStmt)); + } + + + STableMeta *pTableMeta = NULL; + SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); + int32_t code = catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta); if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { STMT_ERR_RET(stmtCleanBindInfo(pStmt)); @@ -323,18 +398,19 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { uint64_t suid = pTableMeta->suid; int8_t tableType = pTableMeta->tableType; taosMemoryFree(pTableMeta); - + uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid; + if (uid == pStmt->bInfo.tbUid) { pStmt->bInfo.needParse = false; return TSDB_CODE_SUCCESS; } - if (taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid))) { - SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid)); + if (pStmt->bInfo.inExecCache) { + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid)); if (NULL == pCache) { - tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", uid); - + tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); } @@ -349,7 +425,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } - SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid)); + SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid)); if (pCache) { pStmt->bInfo.needParse = false; @@ -360,10 +436,9 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { pStmt->bInfo.tagsCached = true; STableDataBlocks* pNewBlock = NULL; - STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock)); + STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, uid)); - if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, - POINTER_BYTES)) { + if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -447,15 +522,15 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest)); } - - STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, - pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); - + + STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); + tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); + STMT_ERR_RET(stmtGetFromCache(pStmt)); if (pStmt->bInfo.needParse) { - taosMemoryFree(pStmt->bInfo.tbName); - pStmt->bInfo.tbName = strdup(tbName); + strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); + pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; } return TSDB_CODE_SUCCESS; @@ -466,16 +541,17 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS)); - if (!pStmt->bInfo.needParse) { + if (pStmt->bInfo.needParse) { + STMT_ERR_RET(stmtParseSql(pStmt)); + } + + if (pStmt->bInfo.inExecCache) { return TSDB_CODE_SUCCESS; } - STMT_ERR_RET(stmtParseSql(pStmt)); - - STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet( - pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid)); + STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { - tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid); + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -491,10 +567,9 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet( - pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid)); + STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { - tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid); + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -509,10 +584,9 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); } - STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet( - pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid)); + STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { - tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid); + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -557,11 +631,10 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { bool emptyResult = false; STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId, &emptyResult)); } - - STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet( - pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid)); + + STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { - tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid); + tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -600,17 +673,73 @@ int stmtAddBatch(TAOS_STMT* stmt) { return TSDB_CODE_SUCCESS; } -int stmtExec(TAOS_STMT* stmt) { +int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) { + if (pRsp->nBlocks <= 0) { + tscError("invalid submit resp block number %d", pRsp->nBlocks); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + size_t keyLen = 0; + STableDataBlocks **pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); + while (pIter) { + STableDataBlocks *pBlock = *pIter; + char *key = taosHashGetKey(pIter, &keyLen); + + STableMeta *pMeta = qGetTableMetaInDataBlock(pBlock); + if (pMeta->uid != pStmt->bInfo.tbUid) { + tscError("table uid %" PRIx64 " mis-match with current table uid %" PRIx64, pMeta->uid, pStmt->bInfo.tbUid); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + if (pMeta->uid) { + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + continue; + } + + SSubmitBlkRsp *blkRsp = NULL; + int32_t i = 0; + for (; i < pRsp->nBlocks; ++i) { + blkRsp = pRsp->pBlocks + i; + if (strlen(blkRsp->tblFName) != keyLen) { + continue; + } + + if (strncmp(blkRsp->tblFName, key, keyLen)) { + continue; + } + + break; + } + + if (i < pRsp->nBlocks) { + tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid, blkRsp->uid); + + pMeta->uid = blkRsp->uid; + pStmt->bInfo.tbUid = blkRsp->uid; + } else { + tscError("table %s not found in submit rsp", pStmt->bInfo.tbFName); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + } + + return TSDB_CODE_SUCCESS; +} + +int stmtExec(TAOS_STMT *stmt) { STscStmt* pStmt = (STscStmt*)stmt; - int32_t code = 0; + int32_t code = 0; + SSubmitRsp *pRsp = NULL; + bool autoCreateTbl = pStmt->exec.autoCreateTbl; STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); if (STMT_TYPE_QUERY == pStmt->sql.type) { - scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList); + scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList, NULL); } else { STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash)); - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true); + launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL)); } if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { @@ -632,6 +761,15 @@ _return: stmtCleanExecInfo(pStmt, (code ? false : true), false); + if (TSDB_CODE_SUCCESS == code && autoCreateTbl) { + if (NULL == pRsp) { + tscError("no submit resp got for auto create table"); + STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR); + } + + STMT_ERR_RET(stmtUpdateTableUid(pStmt, pRsp)); + } + ++pStmt->sql.runTimes; STMT_RET(code); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9524fb1f44..ebd81c7da3 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2764,6 +2764,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1; if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; if (tEncodeU32(&encoder, pRsp->connId) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->dnodeNum) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; @@ -2783,6 +2784,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; if (tDecodeU32(&decoder, &pRsp->connId) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->dnodeNum) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; @@ -4026,3 +4028,84 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { tEndDecode(pCoder); return 0; } + +static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) { + if (tStartEncode(pEncoder) < 0) return -1; + + if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; + if (pBlock->hashMeta) { + if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; + if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1; + } + if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; + if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; + + tEndEncode(pEncoder); + return 0; +} + +static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { + if (tStartDecode(pDecoder) < 0) return -1; + + if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; + if (pBlock->hashMeta) { + if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; + pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); + if (NULL == pBlock->tblFName) return -1; + if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; + } + if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; + if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) { + int32_t nBlocks = taosArrayGetSize(pRsp->pArray); + + if (tStartEncode(pEncoder) < 0) return -1; + + if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1; + if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1; + if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1; + for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) { + if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1; + } + + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + + if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1; + if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1; + if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1; + pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks)); + if (pRsp->pBlocks == NULL) return -1; + for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) { + if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1; + } + + tEndDecode(pDecoder); + tDecoderClear(pDecoder); + return 0; +} + +void tFreeSSubmitRsp(SSubmitRsp *pRsp) { + if (NULL == pRsp) return; + + if (pRsp->pBlocks) { + for (int32_t i = 0; i < pRsp->nBlocks; ++i) { + SSubmitBlkRsp *sRsp = pRsp->pBlocks + i; + taosMemoryFree(sRsp->tblFName); + } + + taosMemoryFree(pRsp->pBlocks); + } + + taosMemoryFree(pRsp); +} + diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 2de337537f..600bdcf310 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -20,6 +20,7 @@ #include "mndShow.h" #include "mndStb.h" #include "mndUser.h" +#include "mndDnode.h" #include "tglobal.h" #include "version.h" @@ -227,6 +228,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { connectRsp.clusterId = pMnode->clusterId; connectRsp.connId = pConn->id; connectRsp.connType = connReq.connType; + connectRsp.dnodeNum = mndGetDnodeSize(pMnode); snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 986b2740f3..9a36fc6eae 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -102,7 +102,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t* pAffectedRows); +int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 384f3fd489..d40a73eb67 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -290,7 +290,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) { +int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) { SSubmitBlkIter blkIter = {0}; STsdbMemTable *pMemTable = pTsdb->mem; void *tptr; @@ -344,7 +344,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; - (*pAffectedRows) = pMsgIter->numOfRows; + pRsp->numOfRows = pMsgIter->numOfRows; + pRsp->affectedRows = pMsgIter->numOfRows; return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 5a7892a750..3107c6f5c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -36,9 +36,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * // loop to insert tInitSubmitMsgIter(pMsg, &msgIter); while (true) { + SSubmitBlkRsp r = {0}; tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) { + if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &r) < 0) { return -1; } @@ -46,8 +47,8 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * } if (pRsp != NULL) { - pRsp->affectedRows = affectedrows; - pRsp->numOfRows = numOfRows; + // pRsp->affectedRows = affectedrows; + // pRsp->numOfRows = numOfRows; } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 3e869650bf..8156e6c512 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -119,7 +119,7 @@ _exit: taosMemoryFree(metaRsp.pSchemas); metaReaderClear(&mer2); metaReaderClear(&mer1); - return code; + return TSDB_CODE_SUCCESS; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 630a7ffd43..fc2b6fe676 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -502,55 +502,66 @@ _exit: return 0; } +static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { + SSubmitBlkIter blkIter = {0}; + STSchema *pSchema = NULL; + tb_uid_t suid = 0; + STSRow *row = NULL; + + tInitSubmitBlkIter(msgIter, pBlock, &blkIter); + if (blkIter.row == NULL) return 0; + if (!pSchema || (suid != msgIter->suid)) { + if (pSchema) { + taosMemoryFreeClear(pSchema); + } + pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema + if (pSchema) { + suid = msgIter->suid; + } + } + if (!pSchema) { + printf("%s:%d no valid schema\n", tags, __LINE__); + return -1; + } + char __tags[128] = {0}; + snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid); + while ((row = tGetSubmitBlkNext(&blkIter))) { + tdSRowPrint(row, pSchema, __tags); + } + + taosMemoryFreeClear(pSchema); + + return TSDB_CODE_SUCCESS; +} + static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { ASSERT(pMsg != NULL); SSubmitMsgIter msgIter = {0}; SMeta *pMeta = pVnode->pMeta; SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; - STSchema *pSchema = NULL; - tb_uid_t suid = 0; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; - tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); - if (blkIter.row == NULL) continue; - if (!pSchema || (suid != msgIter.suid)) { - if (pSchema) { - taosMemoryFreeClear(pSchema); - } - pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema - if (pSchema) { - suid = msgIter.suid; - } - } - if (!pSchema) { - printf("%s:%d no valid schema\n", tags, __LINE__); - continue; - } - char __tags[128] = {0}; - snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter.uid); - while ((row = tGetSubmitBlkNext(&blkIter))) { - tdSRowPrint(row, pSchema, __tags); - } + + vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); } - taosMemoryFreeClear(pSchema); - return 0; } static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; + SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; SSubmitBlk *pBlock; SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; int32_t nRows; + int32_t tsize, ret; + SEncoder encoder = {0}; pRsp->code = 0; @@ -564,12 +575,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in goto _exit; } - for (;;) { + submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp)); + for (int i = 0;;) { tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; + SSubmitBlkRsp submitBlkRsp = {0}; + // create table for auto create table mode if (msgIter.schemaLen > 0) { + submitBlkRsp.hashMeta = 1; + tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { pRsp->code = TSDB_CODE_INVALID_MSG; @@ -585,6 +601,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in } } + submitBlkRsp.uid = createTbReq.uid; + submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); + sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); + msgIter.uid = createTbReq.uid; if (createTbReq.type == TSDB_CHILD_TABLE) { msgIter.suid = createTbReq.ctb.suid; @@ -592,22 +612,33 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in msgIter.suid = 0; } + vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); tDecoderClear(&decoder); } - if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) { + if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { pRsp->code = terrno; goto _exit; } - rsp.affectedRows += nRows; + submitRsp.numOfRows += submitBlkRsp.numOfRows; + submitRsp.affectedRows += submitBlkRsp.affectedRows; + taosArrayPush(submitRsp.pArray, &submitBlkRsp); } _exit: - // encode the response (TODO) - pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); - memcpy(pRsp->pCont, &rsp, sizeof(rsp)); - pRsp->contLen = sizeof(SSubmitRsp); + tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); + pRsp->pCont = rpcMallocCont(tsize); + pRsp->contLen = tsize; + tEncoderInit(&encoder, pRsp->pCont, tsize); + tEncodeSSubmitRsp(&encoder, &submitRsp); + tEncoderClear(&encoder); + + for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) { + taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName); + } + + taosArrayDestroy(submitRsp.pArray); tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index 0627ed1066..e19f54dff3 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -137,7 +137,7 @@ void destroyBlockArrayList(SArray* pDataBlockList); void destroyBlockHashmap(SHashObj* pDataBlockHash); int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); -int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, +int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq); int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks); int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 87dc8b97c8..3d069257c9 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -237,13 +237,8 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con return code; } -static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) { +static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *dbFname, bool isStb) { SParseContext* pBasicCtx = pCxt->pComCxt; - SName name = {0}; - createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg); - - char dbFname[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(&name, dbFname); bool pass = false; CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, @@ -252,22 +247,22 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool return TSDB_CODE_PAR_PERMISSION_DENIED; } if (isStb) { - CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, + CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &pCxt->pTableMeta)); } else { - CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, + CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &pCxt->pTableMeta)); SVgroupInfo vg; CHECK_CODE( - catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); + catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); } return TSDB_CODE_SUCCESS; } -static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, false); } +static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, false); } -static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, true); } +static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, true); } static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { while (start < end) { @@ -841,8 +836,9 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); - pMeta->uid = tGenIdPI64(); + pMeta->uid = 0; pMeta->vgId = vg.vgId; + pMeta->tableType = TSDB_CHILD_TABLE; STableMeta* pBackup = NULL; if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) { @@ -852,11 +848,7 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* } // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) -static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) { - SName name; - createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); - char tbFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(&name, tbFName); +static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) { int32_t len = strlen(tbFName); STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len); if (NULL != pMeta) { @@ -866,11 +858,17 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) SToken sToken; // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); - CHECK_CODE(getSTableMeta(pCxt, &sToken)); + + SName sname; + createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); + char stbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(&sname, stbFName); + + CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } - CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, &name, tbFName, len, pCxt->pTableMeta)); + CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta)); SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); @@ -890,7 +888,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) if (TK_NK_LP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); } - CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name.tname)); + CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname)); NEXT_TOKEN(pCxt->pSql, sToken); if (TK_NK_RP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z); @@ -1065,6 +1063,8 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt) { int32_t tbNum = 0; + char tbFName[TSDB_TABLE_FNAME_LEN]; + bool autoCreateTbl = false; // for each table while (1) { @@ -1102,16 +1102,21 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { SToken tbnameToken = sToken; NEXT_TOKEN(pCxt->pSql, sToken); + SName name; + createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); + tNameExtractFullName(&name, tbFName); + // USING cluase if (TK_USING == sToken.type) { - CHECK_CODE(parseUsingClause(pCxt, &tbnameToken)); + CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); NEXT_TOKEN(pCxt->pSql, sToken); + autoCreateTbl = true; } else { - CHECK_CODE(getTableMeta(pCxt, &tbnameToken)); + CHECK_CODE(getTableMeta(pCxt, &name, tbFName)); } STableDataBlocks* dataBuf = NULL; - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq)); @@ -1153,10 +1158,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); - (*pCxt->pStmtCb->setBindInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags); - memset(&pCxt->tags, 0, sizeof(pCxt->tags)); + (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); - (*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); + memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pCxt->pVgroupsHashObj = NULL; pCxt->pTableBlockHashObj = NULL; pCxt->pTableMeta = NULL; @@ -1193,7 +1197,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { &context.pTableBlockHashObj); } else { context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); - context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); } if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || @@ -1389,7 +1393,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in tdSRowPrint(row, pSTSchema, __func__); taosMemoryFree(pSTSchema); #endif - pDataBlock->size += extendedRowSize; } @@ -1476,6 +1479,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu taosMemoryFree(pSTSchema); } #endif + } if (rowEnd) { @@ -1673,9 +1677,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid); STableDataBlocks* pDataBlock = NULL; - ret = getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), - getTableInfo(pTableMeta).rowSize, pTableMeta, &pDataBlock, NULL, &smlHandle->createTblReq); - if (ret != TSDB_CODE_SUCCESS) { + ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta, + &pDataBlock, NULL, &smlHandle->createTblReq); + if(ret != TSDB_CODE_SUCCESS){ buildInvalidOperationMsg(&pBuf, "create data block error"); return ret; } diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index c074334722..8deaad6091 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -185,11 +185,11 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) return TSDB_CODE_SUCCESS; } -int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, +int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq) { *dataBlocks = NULL; - STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id)); + STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen); if (t1 != NULL) { *dataBlocks = *t1; } @@ -207,7 +207,7 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3 } } - taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); + taosHashPut(pHashList, (const char*)id, idLen, (char*)dataBlocks, POINTER_BYTES); if (pBlockList) { taosArrayPush(pBlockList, dataBlocks); } @@ -445,7 +445,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); - SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); STableDataBlocks** p = taosHashIterate(pHashObj, NULL); @@ -457,7 +457,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p STableDataBlocks* dataBuf = NULL; pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId; // for schemaless, restore origin vgId int32_t ret = - getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, + getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL); if (ret != TSDB_CODE_SUCCESS) { taosHashCleanup(pVnodeDataBlockHashList); @@ -620,7 +620,7 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) { return qResetStmtDataBlock(*pDst, false); } -int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) { +int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) { int32_t code = qCloneStmtDataBlock(pDst, pSrc); if (code) { return code; @@ -633,11 +633,22 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) { return TSDB_CODE_OUT_OF_MEMORY; } + pBlock->vgId = vgId; + + if (pBlock->pTableMeta) { + pBlock->pTableMeta->uid = uid; + pBlock->pTableMeta->vgId = vgId; + } + memset(pBlock->pData, 0, sizeof(SSubmitBlk)); return TSDB_CODE_SUCCESS; } +STableMeta *qGetTableMetaInDataBlock(void* pDataBlock) { + return ((STableDataBlocks*)pDataBlock)->pTableMeta; +} + void qFreeStmtDataBlock(void* pDataBlock) { if (pDataBlock == NULL) { return; diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index eaab8e1f53..039e6693c2 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -260,7 +260,7 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) { comparFn = 20; } else if (optr == OP_TYPE_LIKE) { comparFn = 9; - } else if (optr == OP_TYPE_LIKE) { + } else if (optr == OP_TYPE_NOT_LIKE) { comparFn = 27; } else if (optr == OP_TYPE_IN) { comparFn = 8; @@ -3512,7 +3512,8 @@ void fltConvertToTsValueNode(SFltTreeStat *stat, SValueNode* valueNode) { valueNode->datum.i = 0; } taosMemoryFree(timeStr); - + + valueNode->typeData = valueNode->datum.i; valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 5231890821..28aaafba0a 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -92,8 +92,9 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { } if (IS_VAR_DATA_TYPE(type)) { - len = varDataLen(out.columnData->pData); - buf = varDataVal(out.columnData->pData); + char* data = colDataGetVarData(out.columnData, 0); + len = varDataLen(data); + buf = varDataVal(data); } else { len = tDataTypes[type].bytes; buf = out.columnData->pData; @@ -109,7 +110,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { } if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) { - sclError("taosHashPut failed"); + sclError("taosHashPut to set failed"); SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index b83147bfee..c9fcaeb32e 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -328,9 +328,10 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam* pOut, int32_t rowInd static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIndex) { int32_t len = 0; int32_t inputLen = varDataLen(buf); + int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; - char* t = taosMemoryCalloc(1,(inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); - /*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), pOut->columnData->info.bytes, &len); + char* t = taosMemoryCalloc(1, outputMaxLen); + /*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), outputMaxLen, &len); varDataSetLen(t, len); colDataAppend(pOut->columnData, rowIndex, t, false); @@ -512,7 +513,7 @@ int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int1 if (outType == TSDB_DATA_TYPE_NCHAR) { varToNchar(tmp, pOut, i); } else { - colDataAppend(pOutputCol, i, (char *)&value, false); + colDataAppend(pOutputCol, i, (char *)tmp, false); } } } else if (IS_UNSIGNED_NUMERIC_TYPE(inType)) { @@ -529,7 +530,7 @@ int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int1 if (outType == TSDB_DATA_TYPE_NCHAR) { varToNchar(tmp, pOut, i); } else { - colDataAppend(pOutputCol, i, (char *)&value, false); + colDataAppend(pOutputCol, i, (char *)tmp, false); } } } else if (IS_FLOAT_TYPE(inType)) { @@ -546,7 +547,7 @@ int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int1 if (outType == TSDB_DATA_TYPE_NCHAR) { varToNchar(tmp, pOut, i); } else { - colDataAppend(pOutputCol, i, (char *)&value, false); + colDataAppend(pOutputCol, i, (char *)tmp, false); } } } else { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 5906ee8970..a90fb7fc2e 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -159,6 +159,7 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; + bool needRes; bool syncSchedule; bool queryJob; bool needFlowCtrl; @@ -190,6 +191,7 @@ typedef struct SSchJob { SSchTask *fetchTask; int32_t errCode; SArray *errList; // SArray + SRWLatch resLock; void *resData; //TODO free it or not int32_t resNumOfRows; const char *sql; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 10e4255022..1ce074c49f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * } int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, - int64_t startTs, bool syncSchedule) { + int64_t startTs, bool needRes, bool syncSchedule) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -81,6 +81,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.syncSchedule = syncSchedule; + pJob->attr.needRes = needRes; pJob->transport = transport; pJob->sql = sql; @@ -1133,16 +1134,39 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } case TDMT_VND_SUBMIT_RSP: { - if (msg) { - SSubmitRsp *rsp = (SSubmitRsp *)msg; - SCH_ERR_JRET(rsp->code); - } - SCH_ERR_JRET(rspCode); - SSubmitRsp *rsp = (SSubmitRsp *)msg; - if (rsp) { - pJob->resNumOfRows += rsp->affectedRows; + if (msg) { + SDecoder coder = {0}; + SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); + tDecoderInit(&coder, msg, msgSize); + code = tDecodeSSubmitRsp(&coder, rsp); + if (code) { + SCH_TASK_ELOG("decode submitRsp failed, code:%d", code); + tFreeSSubmitRsp(rsp); + SCH_ERR_JRET(code); + } + + atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); + + if (pJob->attr.needRes) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->resData) { + SSubmitRsp *sum = pJob->resData; + sum->affectedRows += rsp->affectedRows; + sum->nBlocks += rsp->nBlocks; + sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); + memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); + taosMemoryFree(rsp->pBlocks); + taosMemoryFree(rsp); + } else { + pJob->resData = rsp; + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + } else { + tFreeSSubmitRsp(rsp); + } } SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -2350,7 +2374,7 @@ void schFreeJobImpl(void *job) { } static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool syncSchedule) { + int64_t startTs, bool needRes, bool syncSchedule) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { @@ -2359,7 +2383,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule)); + SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule)); SCH_ERR_JRET(schLaunchJob(pJob)); @@ -2473,7 +2497,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - int64_t startTs, SQueryResult *pRes) { + int64_t startTs, bool needRes, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -2481,13 +2505,17 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true)); } SSchJob *job = schAcquireJob(*pJob); pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; + if (needRes) { + pRes->res = job->resData; + job->resData = NULL; + } schReleaseJob(*pJob); @@ -2502,7 +2530,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); } else { - SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false)); + SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index fc0e05aaf1..09ecd9fffd 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -985,7 +985,7 @@ TEST(insertTest, normalCase) { taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); SQueryResult res = {0}; - code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res); + code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20); diff --git a/source/util/src/tuuid.c b/source/util/src/tuuid.c index 69cf7baaad..1f3cbd9573 100644 --- a/source/util/src/tuuid.c +++ b/source/util/src/tuuid.c @@ -48,10 +48,18 @@ int64_t tGenIdPI64(void) { } } - int64_t ts = taosGetTimestampMs(); - uint64_t pid = taosGetPId(); - int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1); + int64_t id; + + while (true) { + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1); + + id = ((tUUIDHashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + if (id) { + break; + } + } - int64_t id = ((tUUIDHashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); return id; } diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 8169e6c503..adf38015ae 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -11,8 +11,8 @@ int32_t shortColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT}; int32_t fullColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL, TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_UINT, TSDB_DATA_TYPE_BIGINT, TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_BINARY, TSDB_DATA_TYPE_NCHAR}; -int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_NCHAR}; -int32_t optrIdxList[] = {0, 1, 2}; +int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_NCHAR}; +int32_t optrIdxList[] = {0, 9}; typedef struct { char* oper; @@ -92,7 +92,8 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos); int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos); int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos); int insertAUTOTest1(TAOS_STMT *stmt, TAOS *taos); -int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos); +int queryColumnTest(TAOS_STMT *stmt, TAOS *taos); +int queryMiscTest(TAOS_STMT *stmt, TAOS *taos); enum { TTYPE_INSERT = 1, @@ -152,7 +153,8 @@ CaseCfg gCase[] = { // 22 {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, - {"query:SUBT-FULL", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, querySUBTTest1, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, }; @@ -184,7 +186,7 @@ typedef struct { int32_t caseRunNum; // total run case num } CaseCtrl; -#if 0 +#if 1 CaseCtrl gCaseCtrl = { // default .bindNullNum = 0, .printCreateTblSql = false, @@ -202,7 +204,7 @@ CaseCtrl gCaseCtrl = { // default .optrIdxListNum = 0, .optrIdxList = NULL, .checkParamNum = false, - .printRes = true, + .printRes = false, .runTimes = 0, .caseIdx = -1, .caseNum = -1, @@ -212,8 +214,8 @@ CaseCtrl gCaseCtrl = { // default #endif -#if 1 -CaseCtrl gCaseCtrl = { // default +#if 0 +CaseCtrl gCaseCtrl = { .bindNullNum = 0, .printCreateTblSql = true, .printQuerySql = true, @@ -223,18 +225,18 @@ CaseCtrl gCaseCtrl = { // default .bindColNum = 0, .bindTagNum = 0, .bindRowNum = 0, - .bindColTypeNum = 0, - .bindColTypeList = NULL, + .bindColTypeNum = tListLen(bindColTypeList), + .bindColTypeList = bindColTypeList, .bindTagTypeNum = 0, .bindTagTypeList = NULL, - .optrIdxListNum = 0, - .optrIdxList = NULL, + .optrIdxListNum = tListLen(optrIdxList), + .optrIdxList = optrIdxList, .checkParamNum = false, - .printRes = true, + .printRes = false, .runTimes = 0, - .caseIdx = -1, + .caseIdx = 23, .caseNum = 1, - .caseRunIdx = 11, + .caseRunIdx = -1, .caseRunNum = 1, }; #endif @@ -253,14 +255,14 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper .optrIdxListNum = 0, .optrIdxList = NULL, .checkParamNum = false, - .printRes = true, + .printRes = false, .runTimes = 0, .caseRunIdx = -1, .optrIdxListNum = 0, .optrIdxList = NULL, .bindColTypeNum = 0, .bindColTypeList = NULL, - .caseIdx = 22, + .caseIdx = 23, .caseNum = 1, .caseRunNum = 1, }; @@ -1749,7 +1751,7 @@ int insertAUTOTest1(TAOS_STMT *stmt, TAOS *taos) { /* select * from table */ -int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { +int queryColumnTest(TAOS_STMT *stmt, TAOS *taos) { BindData data = {0}; for (int32_t t = 0; t< gCurCase->tblNum; ++t) { @@ -1795,7 +1797,7 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { } /* value in query sql */ -int querySUBTTest2(TAOS_STMT *stmt, TAOS *taos) { +int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) { BindData data = {0}; for (int32_t t = 0; t< gCurCase->tblNum; ++t) { @@ -1862,1119 +1864,6 @@ int errorSQLTest1(TAOS_STMT *stmt, TAOS *taos) { return 0; } - -#if 0 - -//1 tables 10 records -int stmt_funcb_autoctb1(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); - exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:0x%x\n", code); - } - - taos_stmt_bind_param_batch(stmt, params + id * 10); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - -//1 tables 10 records -int stmt_funcb_autoctb2(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - -// int one_null = 1; - int one_not_null = 0; - - char* is_null = taosMemoryMalloc(sizeof(char) * 10); - char* no_null = taosMemoryMalloc(sizeof(char) * 10); - - for (int i = 0; i < 10; ++i) { - lb[i] = 40; - no_null[i] = 0; - is_null[i] = (i % 10 == 2) ? 1 : 0; - v.b[i] = (int8_t)(i % 2); - v.v1[i] = (int8_t)((i+1) % 2); - v.v2[i] = (int16_t)i; - v.v4[i] = (int32_t)(i+1); - v.v8[i] = (int64_t)(i+2); - v.f4[i] = (float)(i+3); - v.f8[i] = (double)(i+4); - memset(v.bin[i], '0'+i%10, 40); - } - - for (int i = 0; i < 10; i+=10) { - params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - params[i+0].buffer_length = sizeof(int64_t); - params[i+0].buffer = &v.ts[10*i/10]; - params[i+0].length = NULL; - params[i+0].is_null = no_null; - params[i+0].num = 10; - - params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; - params[i+1].buffer_length = sizeof(int8_t); - params[i+1].buffer = v.b; - params[i+1].length = NULL; - params[i+1].is_null = is_null; - params[i+1].num = 10; - - params[i+2].buffer_type = TSDB_DATA_TYPE_TINYINT; - params[i+2].buffer_length = sizeof(int8_t); - params[i+2].buffer = v.v1; - params[i+2].length = NULL; - params[i+2].is_null = is_null; - params[i+2].num = 10; - - params[i+3].buffer_type = TSDB_DATA_TYPE_SMALLINT; - params[i+3].buffer_length = sizeof(int16_t); - params[i+3].buffer = v.v2; - params[i+3].length = NULL; - params[i+3].is_null = is_null; - params[i+3].num = 10; - - params[i+4].buffer_type = TSDB_DATA_TYPE_INT; - params[i+4].buffer_length = sizeof(int32_t); - params[i+4].buffer = v.v4; - params[i+4].length = NULL; - params[i+4].is_null = is_null; - params[i+4].num = 10; - - params[i+5].buffer_type = TSDB_DATA_TYPE_BIGINT; - params[i+5].buffer_length = sizeof(int64_t); - params[i+5].buffer = v.v8; - params[i+5].length = NULL; - params[i+5].is_null = is_null; - params[i+5].num = 10; - - params[i+6].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[i+6].buffer_length = sizeof(float); - params[i+6].buffer = v.f4; - params[i+6].length = NULL; - params[i+6].is_null = is_null; - params[i+6].num = 10; - - params[i+7].buffer_type = TSDB_DATA_TYPE_DOUBLE; - params[i+7].buffer_length = sizeof(double); - params[i+7].buffer = v.f8; - params[i+7].length = NULL; - params[i+7].is_null = is_null; - params[i+7].num = 10; - - params[i+8].buffer_type = TSDB_DATA_TYPE_BINARY; - params[i+8].buffer_length = 40; - params[i+8].buffer = v.bin; - params[i+8].length = lb; - params[i+8].is_null = is_null; - params[i+8].num = 10; - - params[i+9].buffer_type = TSDB_DATA_TYPE_BINARY; - params[i+9].buffer_length = 40; - params[i+9].buffer = v.bin; - params[i+9].length = lb; - params[i+9].is_null = is_null; - params[i+9].num = 10; - - } - - int64_t tts = 1591060628000; - for (int i = 0; i < 10; ++i) { - v.ts[i] = tts + i; - } - - - for (int i = 0; i < 1; ++i) { - tags[i+0].buffer_type = TSDB_DATA_TYPE_INT; - tags[i+0].buffer = v.v4; - tags[i+0].is_null = &one_not_null; - tags[i+0].length = NULL; - - tags[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; - tags[i+1].buffer = v.b; - tags[i+1].is_null = &one_not_null; - tags[i+1].length = NULL; - - tags[i+2].buffer_type = TSDB_DATA_TYPE_TINYINT; - tags[i+2].buffer = v.v1; - tags[i+2].is_null = &one_not_null; - tags[i+2].length = NULL; - - tags[i+3].buffer_type = TSDB_DATA_TYPE_SMALLINT; - tags[i+3].buffer = v.v2; - tags[i+3].is_null = &one_not_null; - tags[i+3].length = NULL; - - tags[i+4].buffer_type = TSDB_DATA_TYPE_BIGINT; - tags[i+4].buffer = v.v8; - tags[i+4].is_null = &one_not_null; - tags[i+4].length = NULL; - - tags[i+5].buffer_type = TSDB_DATA_TYPE_FLOAT; - tags[i+5].buffer = v.f4; - tags[i+5].is_null = &one_not_null; - tags[i+5].length = NULL; - - tags[i+6].buffer_type = TSDB_DATA_TYPE_DOUBLE; - tags[i+6].buffer = v.f8; - tags[i+6].is_null = &one_not_null; - tags[i+6].length = NULL; - - tags[i+7].buffer_type = TSDB_DATA_TYPE_BINARY; - tags[i+7].buffer = v.bin; - tags[i+7].is_null = &one_not_null; - tags[i+7].length = (uintptr_t *)lb; - - tags[i+8].buffer_type = TSDB_DATA_TYPE_NCHAR; - tags[i+8].buffer = v.bin; - tags[i+8].is_null = &one_not_null; - tags[i+8].length = (uintptr_t *)lb; - } - - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(1,true,2,3,4,5.0,6.0,'a','b') values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); - exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:0x%x\n", code); - } - - taos_stmt_bind_param_batch(stmt, params + id * 10); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - - -//1 tables 10 records -int stmt_funcb_autoctb3(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - -// int one_null = 1; - int one_not_null = 0; - - char* is_null = taosMemoryMalloc(sizeof(char) * 10); - char* no_null = taosMemoryMalloc(sizeof(char) * 10); - - for (int i = 0; i < 10; ++i) { - lb[i] = 40; - no_null[i] = 0; - is_null[i] = (i % 10 == 2) ? 1 : 0; - v.b[i] = (int8_t)(i % 2); - v.v1[i] = (int8_t)((i+1) % 2); - v.v2[i] = (int16_t)i; - v.v4[i] = (int32_t)(i+1); - v.v8[i] = (int64_t)(i+2); - v.f4[i] = (float)(i+3); - v.f8[i] = (double)(i+4); - memset(v.bin[i], '0'+i%10, 40); - } - - for (int i = 0; i < 10; i+=10) { - params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - params[i+0].buffer_length = sizeof(int64_t); - params[i+0].buffer = &v.ts[10*i/10]; - params[i+0].length = NULL; - params[i+0].is_null = no_null; - params[i+0].num = 10; - - params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; - params[i+1].buffer_length = sizeof(int8_t); - params[i+1].buffer = v.b; - params[i+1].length = NULL; - params[i+1].is_null = is_null; - params[i+1].num = 10; - - params[i+2].buffer_type = TSDB_DATA_TYPE_TINYINT; - params[i+2].buffer_length = sizeof(int8_t); - params[i+2].buffer = v.v1; - params[i+2].length = NULL; - params[i+2].is_null = is_null; - params[i+2].num = 10; - - params[i+3].buffer_type = TSDB_DATA_TYPE_SMALLINT; - params[i+3].buffer_length = sizeof(int16_t); - params[i+3].buffer = v.v2; - params[i+3].length = NULL; - params[i+3].is_null = is_null; - params[i+3].num = 10; - - params[i+4].buffer_type = TSDB_DATA_TYPE_INT; - params[i+4].buffer_length = sizeof(int32_t); - params[i+4].buffer = v.v4; - params[i+4].length = NULL; - params[i+4].is_null = is_null; - params[i+4].num = 10; - - params[i+5].buffer_type = TSDB_DATA_TYPE_BIGINT; - params[i+5].buffer_length = sizeof(int64_t); - params[i+5].buffer = v.v8; - params[i+5].length = NULL; - params[i+5].is_null = is_null; - params[i+5].num = 10; - - params[i+6].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[i+6].buffer_length = sizeof(float); - params[i+6].buffer = v.f4; - params[i+6].length = NULL; - params[i+6].is_null = is_null; - params[i+6].num = 10; - - params[i+7].buffer_type = TSDB_DATA_TYPE_DOUBLE; - params[i+7].buffer_length = sizeof(double); - params[i+7].buffer = v.f8; - params[i+7].length = NULL; - params[i+7].is_null = is_null; - params[i+7].num = 10; - - params[i+8].buffer_type = TSDB_DATA_TYPE_BINARY; - params[i+8].buffer_length = 40; - params[i+8].buffer = v.bin; - params[i+8].length = lb; - params[i+8].is_null = is_null; - params[i+8].num = 10; - - params[i+9].buffer_type = TSDB_DATA_TYPE_BINARY; - params[i+9].buffer_length = 40; - params[i+9].buffer = v.bin; - params[i+9].length = lb; - params[i+9].is_null = is_null; - params[i+9].num = 10; - - } - - int64_t tts = 1591060628000; - for (int i = 0; i < 10; ++i) { - v.ts[i] = tts + i; - } - - - for (int i = 0; i < 1; ++i) { - tags[i+0].buffer_type = TSDB_DATA_TYPE_BOOL; - tags[i+0].buffer = v.b; - tags[i+0].is_null = &one_not_null; - tags[i+0].length = NULL; - - tags[i+1].buffer_type = TSDB_DATA_TYPE_SMALLINT; - tags[i+1].buffer = v.v2; - tags[i+1].is_null = &one_not_null; - tags[i+1].length = NULL; - - tags[i+2].buffer_type = TSDB_DATA_TYPE_FLOAT; - tags[i+2].buffer = v.f4; - tags[i+2].is_null = &one_not_null; - tags[i+2].length = NULL; - - tags[i+3].buffer_type = TSDB_DATA_TYPE_BINARY; - tags[i+3].buffer = v.bin; - tags[i+3].is_null = &one_not_null; - tags[i+3].length = (uintptr_t *)lb; - } - - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(1,?,2,?,4,?,6.0,?,'b') values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); - exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:0x%x\n", code); - } - - taos_stmt_bind_param_batch(stmt, params + id * 10); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - - - - -//1 tables 10 records -int stmt_funcb_autoctb4(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*5); - -// int one_null = 1; - int one_not_null = 0; - - char* is_null = taosMemoryMalloc(sizeof(char) * 10); - char* no_null = taosMemoryMalloc(sizeof(char) * 10); - - for (int i = 0; i < 10; ++i) { - lb[i] = 40; - no_null[i] = 0; - is_null[i] = (i % 10 == 2) ? 1 : 0; - v.b[i] = (int8_t)(i % 2); - v.v1[i] = (int8_t)((i+1) % 2); - v.v2[i] = (int16_t)i; - v.v4[i] = (int32_t)(i+1); - v.v8[i] = (int64_t)(i+2); - v.f4[i] = (float)(i+3); - v.f8[i] = (double)(i+4); - memset(v.bin[i], '0'+i%10, 40); - } - - for (int i = 0; i < 5; i+=5) { - params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - params[i+0].buffer_length = sizeof(int64_t); - params[i+0].buffer = &v.ts[10*i/10]; - params[i+0].length = NULL; - params[i+0].is_null = no_null; - params[i+0].num = 10; - - params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; - params[i+1].buffer_length = sizeof(int8_t); - params[i+1].buffer = v.b; - params[i+1].length = NULL; - params[i+1].is_null = is_null; - params[i+1].num = 10; - - params[i+2].buffer_type = TSDB_DATA_TYPE_INT; - params[i+2].buffer_length = sizeof(int32_t); - params[i+2].buffer = v.v4; - params[i+2].length = NULL; - params[i+2].is_null = is_null; - params[i+2].num = 10; - - params[i+3].buffer_type = TSDB_DATA_TYPE_BIGINT; - params[i+3].buffer_length = sizeof(int64_t); - params[i+3].buffer = v.v8; - params[i+3].length = NULL; - params[i+3].is_null = is_null; - params[i+3].num = 10; - - params[i+4].buffer_type = TSDB_DATA_TYPE_DOUBLE; - params[i+4].buffer_length = sizeof(double); - params[i+4].buffer = v.f8; - params[i+4].length = NULL; - params[i+4].is_null = is_null; - params[i+4].num = 10; - } - - int64_t tts = 1591060628000; - for (int i = 0; i < 10; ++i) { - v.ts[i] = tts + i; - } - - - for (int i = 0; i < 1; ++i) { - tags[i+0].buffer_type = TSDB_DATA_TYPE_BOOL; - tags[i+0].buffer = v.b; - tags[i+0].is_null = &one_not_null; - tags[i+0].length = NULL; - - tags[i+1].buffer_type = TSDB_DATA_TYPE_SMALLINT; - tags[i+1].buffer = v.v2; - tags[i+1].is_null = &one_not_null; - tags[i+1].length = NULL; - - tags[i+2].buffer_type = TSDB_DATA_TYPE_FLOAT; - tags[i+2].buffer = v.f4; - tags[i+2].is_null = &one_not_null; - tags[i+2].length = NULL; - - tags[i+3].buffer_type = TSDB_DATA_TYPE_BINARY; - tags[i+3].buffer = v.bin; - tags[i+3].is_null = &one_not_null; - tags[i+3].length = (uintptr_t *)lb; - } - - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(1,?,2,?,4,?,6.0,?,'b') (ts,b,v4,v8,f8) values(?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); - exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:0x%x\n", code); - } - - taos_stmt_bind_param_batch(stmt, params + id * 5); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - - -//1 tables 10 records -int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - -// int one_null = 1; - int one_not_null = 0; - - char* is_null = taosMemoryMalloc(sizeof(char) * 10); - char* no_null = taosMemoryMalloc(sizeof(char) * 10); - - for (int i = 0; i < 10; ++i) { - lb[i] = 40; - no_null[i] = 0; - is_null[i] = (i % 10 == 2) ? 1 : 0; - v.b[i] = (int8_t)(i % 2); - v.v1[i] = (int8_t)((i+1) % 2); - v.v2[i] = (int16_t)i; - v.v4[i] = (int32_t)(i+1); - v.v8[i] = (int64_t)(i+2); - v.f4[i] = (float)(i+3); - v.f8[i] = (double)(i+4); - memset(v.bin[i], '0'+i%10, 40); - } - - for (int i = 0; i < 10; i+=10) { - params[i+0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - params[i+0].buffer_length = sizeof(int64_t); - params[i+0].buffer = &v.ts[10*i/10]; - params[i+0].length = NULL; - params[i+0].is_null = no_null; - params[i+0].num = 10; - - params[i+1].buffer_type = TSDB_DATA_TYPE_BOOL; - params[i+1].buffer_length = sizeof(int8_t); - params[i+1].buffer = v.b; - params[i+1].length = NULL; - params[i+1].is_null = is_null; - params[i+1].num = 10; - - params[i+2].buffer_type = TSDB_DATA_TYPE_TINYINT; - params[i+2].buffer_length = sizeof(int8_t); - params[i+2].buffer = v.v1; - params[i+2].length = NULL; - params[i+2].is_null = is_null; - params[i+2].num = 10; - - params[i+3].buffer_type = TSDB_DATA_TYPE_SMALLINT; - params[i+3].buffer_length = sizeof(int16_t); - params[i+3].buffer = v.v2; - params[i+3].length = NULL; - params[i+3].is_null = is_null; - params[i+3].num = 10; - - params[i+4].buffer_type = TSDB_DATA_TYPE_INT; - params[i+4].buffer_length = sizeof(int32_t); - params[i+4].buffer = v.v4; - params[i+4].length = NULL; - params[i+4].is_null = is_null; - params[i+4].num = 10; - - params[i+5].buffer_type = TSDB_DATA_TYPE_BIGINT; - params[i+5].buffer_length = sizeof(int64_t); - params[i+5].buffer = v.v8; - params[i+5].length = NULL; - params[i+5].is_null = is_null; - params[i+5].num = 10; - - params[i+6].buffer_type = TSDB_DATA_TYPE_FLOAT; - params[i+6].buffer_length = sizeof(float); - params[i+6].buffer = v.f4; - params[i+6].length = NULL; - params[i+6].is_null = is_null; - params[i+6].num = 10; - - params[i+7].buffer_type = TSDB_DATA_TYPE_DOUBLE; - params[i+7].buffer_length = sizeof(double); - params[i+7].buffer = v.f8; - params[i+7].length = NULL; - params[i+7].is_null = is_null; - params[i+7].num = 10; - - params[i+8].buffer_type = TSDB_DATA_TYPE_BINARY; - params[i+8].buffer_length = 40; - params[i+8].buffer = v.bin; - params[i+8].length = lb; - params[i+8].is_null = is_null; - params[i+8].num = 10; - - params[i+9].buffer_type = TSDB_DATA_TYPE_BINARY; - params[i+9].buffer_length = 40; - params[i+9].buffer = v.bin; - params[i+9].length = lb; - params[i+9].is_null = is_null; - params[i+9].num = 10; - - } - - int64_t tts = 1591060628000; - for (int i = 0; i < 10; ++i) { - v.ts[i] = tts + i; - } - - - for (int i = 0; i < 1; ++i) { - tags[i+0].buffer_type = TSDB_DATA_TYPE_BOOL; - tags[i+0].buffer = v.b; - tags[i+0].is_null = &one_not_null; - tags[i+0].length = NULL; - - tags[i+1].buffer_type = TSDB_DATA_TYPE_SMALLINT; - tags[i+1].buffer = v.v2; - tags[i+1].is_null = &one_not_null; - tags[i+1].length = NULL; - - tags[i+2].buffer_type = TSDB_DATA_TYPE_FLOAT; - tags[i+2].buffer = v.f4; - tags[i+2].is_null = &one_not_null; - tags[i+2].length = NULL; - - tags[i+3].buffer_type = TSDB_DATA_TYPE_BINARY; - tags[i+3].buffer = v.bin; - tags[i+3].is_null = &one_not_null; - tags[i+3].length = (uintptr_t *)lb; - } - - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 (id1,id2,id3,id4,id5,id6,id7,id8,id9) tags(1,?,2,?,4,?,6.0,?,'b') values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); - return -1; - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:0x%x\n", code); - } - - taos_stmt_bind_param_batch(stmt, params + id * 10); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - - -//1 tables 10 records -int stmt_funcb_autoctb_e2(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); - exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, NULL); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:%s\n", taos_stmt_errstr(stmt)); - return -1; - } - - taos_stmt_bind_param_batch(stmt, params + id * 10); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - - - - -//1 tables 10 records -int stmt_funcb_autoctb_e3(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 (id1,id2,id3,id4,id5,id6,id7,id8,id9) tags(?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:%s\n", taos_stmt_errstr(stmt)); - return -1; - //exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, NULL); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. code:0x%x\n", code); - return -1; - } - - taos_stmt_bind_param_batch(stmt, params + id * 10); - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - -//1 tables 10 records -int stmt_funcb_autoctb_e4(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(stmt, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:%s\n", taos_stmt_errstr(stmt)); - exit(1); - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. error:%s\n", taos_stmt_errstr(stmt)); - exit(1); - } - - code = taos_stmt_bind_param_batch(stmt, params + id * 10); - if (code != 0) { - printf("failed to execute taos_stmt_bind_param_batch. error:%s\n", taos_stmt_errstr(stmt)); - exit(1); - } - - code = taos_stmt_bind_param_batch(stmt, params + id * 10); - if (code != 0) { - printf("failed to execute taos_stmt_bind_param_batch. error:%s\n", taos_stmt_errstr(stmt)); - return -1; - } - - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - - - - - - -//1 tables 10 records -int stmt_funcb_autoctb_e5(TAOS_STMT *stmt) { - struct { - int64_t *ts; - int8_t b[10]; - int8_t v1[10]; - int16_t v2[10]; - int32_t v4[10]; - int64_t v8[10]; - float f4[10]; - double f8[10]; - char bin[10][40]; - } v = {0}; - - v.ts = taosMemoryMalloc(sizeof(int64_t) * 1 * 10); - - int *lb = taosMemoryMalloc(10 * sizeof(int)); - - TAOS_BIND *tags = taosMemoryCalloc(1, sizeof(TAOS_BIND) * 9 * 1); - TAOS_MULTI_BIND *params = taosMemoryCalloc(1, sizeof(TAOS_MULTI_BIND) * 1*10); - - unsigned long long starttime = taosGetTimestampUs(); - - char *sql = "insert into ? using stb1 tags(?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?)"; - int code = taos_stmt_prepare(NULL, sql, 0); - if (code != 0){ - printf("failed to execute taos_stmt_prepare. code:%s\n", taos_stmt_errstr(NULL)); - return -1; - } - - int id = 0; - for (int zz = 0; zz < 1; zz++) { - char buf[32]; - sprintf(buf, "m%d", zz); - code = taos_stmt_set_tbname_tags(stmt, buf, tags); - if (code != 0){ - printf("failed to execute taos_stmt_set_tbname_tags. error:%s\n", taos_stmt_errstr(stmt)); - exit(1); - } - - code = taos_stmt_bind_param_batch(stmt, params + id * 10); - if (code != 0) { - printf("failed to execute taos_stmt_bind_param_batch. error:%s\n", taos_stmt_errstr(stmt)); - exit(1); - } - - code = taos_stmt_bind_param_batch(stmt, params + id * 10); - if (code != 0) { - printf("failed to execute taos_stmt_bind_param_batch. error:%s\n", taos_stmt_errstr(stmt)); - return -1; - } - - taos_stmt_add_batch(stmt); - } - - if (taos_stmt_execute(stmt) != 0) { - printf("failed to execute insert statement.\n"); - exit(1); - } - - ++id; - - unsigned long long endtime = taosGetTimestampUs(); - printf("insert total %d records, used %u seconds, avg:%u useconds\n", 10, (endtime-starttime)/1000000UL, (endtime-starttime)/(10)); - - taosMemoryFree(v.ts); - taosMemoryFree(lb); - taosMemoryFree(params); - taosMemoryFree(is_null); - taosMemoryFree(no_null); - taosMemoryFree(tags); - - return 0; -} - -#endif - - void prepareCheckResultImpl(TAOS * taos, char *tname, bool printr, int expected, bool silent) { char sql[255] = "SELECT * FROM "; int32_t rows = 0; @@ -3332,6 +2221,7 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) { TAOS_STMT *stmt = NULL; int64_t beginUs, endUs, totalUs; CaseCfg cfg = gCase[caseIdx]; + CaseCfg cfgBk; gCurCase = &cfg; if ((gCaseCtrl.bindColTypeNum || gCaseCtrl.bindColNum) && (gCurCase->colNum != gFullColNum)) { @@ -3402,6 +2292,7 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) { } totalUs = 0; + cfgBk = cfg; for (int32_t n = 0; n < gCurCase->runTimes; ++n) { if (gCurCase->preCaseIdx < 0) { prepare(taos, gCurCase->colNum, gCurCase->colList, gCurCase->autoCreateTbl); @@ -3423,6 +2314,8 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) { totalUs += (endUs - beginUs); prepareCheckResult(taos, silent); + + cfg = cfgBk; } if (!silent) { @@ -3465,16 +2358,20 @@ void* runCaseList(TAOS *taos) { } void runAll(TAOS *taos) { -/* +#if 1 + strcpy(gCaseCtrl.caseCatalog, "Normal Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); runCaseList(taos); -*/ + + strcpy(gCaseCtrl.caseCatalog, "Auto Create Table Test"); gCaseCtrl.autoCreateTbl = true; printf("%s Begin\n", gCaseCtrl.caseCatalog); runCaseList(taos); gCaseCtrl.autoCreateTbl = false; + +#endif /* strcpy(gCaseCtrl.caseCatalog, "Null Test"); @@ -3496,6 +2393,7 @@ void runAll(TAOS *taos) { runCaseList(taos); gCaseCtrl.rowNum = 0; gCaseCtrl.printRes = true; +*/ strcpy(gCaseCtrl.caseCatalog, "Runtimes Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); @@ -3503,12 +2401,15 @@ void runAll(TAOS *taos) { runCaseList(taos); gCaseCtrl.runTimes = 0; +#if 1 strcpy(gCaseCtrl.caseCatalog, "Check Param Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.checkParamNum = true; runCaseList(taos); gCaseCtrl.checkParamNum = false; +#endif +/* strcpy(gCaseCtrl.caseCatalog, "Bind Col Num Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.bindColNum = 6;