Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-29367-3.0
This commit is contained in:
commit
169bb122e6
|
@ -2,7 +2,7 @@
|
||||||
# taosadapter
|
# taosadapter
|
||||||
ExternalProject_Add(taosadapter
|
ExternalProject_Add(taosadapter
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||||
GIT_TAG main
|
GIT_TAG 3.0
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG main
|
GIT_TAG 3.0
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# taosws-rs
|
# taosws-rs
|
||||||
ExternalProject_Add(taosws-rs
|
ExternalProject_Add(taosws-rs
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
|
||||||
GIT_TAG main
|
GIT_TAG 3.0
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -85,7 +85,7 @@ extern int32_t tsQueryConcurrentTaskNum;
|
||||||
extern int32_t tsSingleQueryMaxMemorySize;
|
extern int32_t tsSingleQueryMaxMemorySize;
|
||||||
extern int8_t tsQueryUseMemoryPool;
|
extern int8_t tsQueryUseMemoryPool;
|
||||||
extern int8_t tsMemPoolFullFunc;
|
extern int8_t tsMemPoolFullFunc;
|
||||||
//extern int32_t tsQueryBufferPoolSize;
|
// extern int32_t tsQueryBufferPoolSize;
|
||||||
extern int32_t tsMinReservedMemorySize;
|
extern int32_t tsMinReservedMemorySize;
|
||||||
extern int64_t tsCurrentAvailMemorySize;
|
extern int64_t tsCurrentAvailMemorySize;
|
||||||
extern int8_t tsNeedTrim;
|
extern int8_t tsNeedTrim;
|
||||||
|
@ -283,7 +283,7 @@ extern int32_t tsS3MigrateIntervalSec;
|
||||||
extern bool tsS3MigrateEnabled;
|
extern bool tsS3MigrateEnabled;
|
||||||
extern int32_t tsGrantHBInterval;
|
extern int32_t tsGrantHBInterval;
|
||||||
extern int32_t tsUptimeInterval;
|
extern int32_t tsUptimeInterval;
|
||||||
|
extern bool tsUpdateCacheBatch;
|
||||||
extern bool tsDisableStream;
|
extern bool tsDisableStream;
|
||||||
extern int64_t tsStreamBufferSize;
|
extern int64_t tsStreamBufferSize;
|
||||||
extern int tsStreamAggCnt;
|
extern int tsStreamAggCnt;
|
||||||
|
|
|
@ -1112,6 +1112,7 @@ static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIEL
|
||||||
STMT_ERR_RET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
|
STMT_ERR_RET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
|
||||||
if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
|
if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
|
||||||
pStmt->bInfo.needParse = true;
|
pStmt->bInfo.needParse = true;
|
||||||
|
qDestroyStmtDataBlock(*pDataBlock);
|
||||||
if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
|
if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
|
||||||
tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
|
tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
|
||||||
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
|
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||||
|
|
|
@ -329,6 +329,8 @@ bool tsFilterScalarMode = false;
|
||||||
int tsResolveFQDNRetryTime = 100; // seconds
|
int tsResolveFQDNRetryTime = 100; // seconds
|
||||||
int tsStreamAggCnt = 100000;
|
int tsStreamAggCnt = 100000;
|
||||||
|
|
||||||
|
bool tsUpdateCacheBatch = true;
|
||||||
|
|
||||||
int8_t tsS3EpNum = 0;
|
int8_t tsS3EpNum = 0;
|
||||||
char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<endpoint>"};
|
char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<endpoint>"};
|
||||||
char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<accesskey>"};
|
char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<accesskey>"};
|
||||||
|
|
|
@ -893,7 +893,7 @@ static void mndCompactPullup(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
|
#ifdef TD_ENTERPRISE
|
||||||
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
|
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
|
||||||
if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
|
if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -995,6 +995,7 @@ static int32_t mndCompactDispatch(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
|
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
|
|
|
@ -108,30 +108,94 @@ static bool checkStatusForEachReplica(SVgObj *pVgroup) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
|
||||||
|
SSnodeObj *pObj = NULL;
|
||||||
|
void *pIter = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeEntry entry = {.nodeId = SNODE_HANDLE};
|
||||||
|
code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||||
|
if (code) {
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
char buf[256] = {0};
|
||||||
|
code = epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
if (code != 0) { // print error and continue
|
||||||
|
mError("failed to convert epset to str, code:%s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
void *p = taosArrayPush(pVgroupList, &entry);
|
||||||
|
if (p == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
ESdbStatus objStatus;
|
||||||
|
void *pIter = NULL;
|
||||||
|
SMnodeObj *pObj = NULL;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, true);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pObj->syncState != TAOS_SYNC_STATE_LEADER && pObj->syncState != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
mDebug("mnode sync state:%d not leader/follower", pObj->syncState);
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (objStatus != SDB_STATUS_READY) {
|
||||||
|
mWarn("mnode status:%d not ready", objStatus);
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SArray *pVgroupList = NULL;
|
|
||||||
SHashObj *pHash = NULL;
|
SHashObj *pHash = NULL;
|
||||||
|
|
||||||
pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
|
|
||||||
if (pVgroupList == NULL) {
|
|
||||||
mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
|
|
||||||
code = terrno;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
if (pHash == NULL) {
|
if (pHash == NULL) {
|
||||||
mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
|
mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
|
||||||
code = terrno;
|
return terrno;
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*allReady = true;
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
|
@ -148,7 +212,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
|
mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto _err; // take snapshot failed, and not all ready
|
goto _end; // take snapshot failed, and not all ready
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (*pReplica != pVgroup->replica) {
|
if (*pReplica != pVgroup->replica) {
|
||||||
|
@ -158,7 +222,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not all ready till now, no need to check the remaining vgroups.
|
// if not all ready till now, no need to check the remaining vgroups,
|
||||||
// but still we need to put the info of the existed vgroups into the snapshot list
|
// but still we need to put the info of the existed vgroups into the snapshot list
|
||||||
if (*allReady) {
|
if (*allReady) {
|
||||||
*allReady = checkStatusForEachReplica(pVgroup);
|
*allReady = checkStatusForEachReplica(pVgroup);
|
||||||
|
@ -176,7 +240,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto _err;
|
goto _end;
|
||||||
} else {
|
} else {
|
||||||
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
}
|
}
|
||||||
|
@ -184,51 +248,49 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnodeObj *pObj = NULL;
|
_end:
|
||||||
while (1) {
|
taosHashCleanup(pHash);
|
||||||
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
return code;
|
||||||
if (pIter == NULL) {
|
}
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SNodeEntry entry = {.nodeId = SNODE_HANDLE};
|
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
int32_t code = 0;
|
||||||
if (code) {
|
SArray *pVgroupList = NULL;
|
||||||
sdbRelease(pSdb, pObj);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
char buf[256] = {0};
|
*pList = NULL;
|
||||||
code = epsetToStr(&entry.epset, buf, tListLen(buf));
|
*allReady = true;
|
||||||
if (code != 0) { // print error and continue
|
|
||||||
mError("failed to convert epset to str, code:%s", tstrerror(code));
|
|
||||||
}
|
|
||||||
|
|
||||||
void *p = taosArrayPush(pVgroupList, &entry);
|
pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||||
if (p == NULL) {
|
if (pVgroupList == NULL) {
|
||||||
code = terrno;
|
mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
|
||||||
sdbRelease(pSdb, pObj);
|
code = terrno;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
goto _err;
|
||||||
mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
|
}
|
||||||
goto _err;
|
|
||||||
} else {
|
|
||||||
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pObj);
|
// 1. check for all vnodes status
|
||||||
|
code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. add snode info
|
||||||
|
code = mndAddSnodeInfo(pMnode, pVgroupList);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. check for mnode status
|
||||||
|
code = mndCheckMnodeStatus(pMnode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
*allReady = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pList = pVgroupList;
|
*pList = pVgroupList;
|
||||||
taosHashCleanup(pHash);
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
*allReady = false;
|
*allReady = false;
|
||||||
taosArrayDestroy(pVgroupList);
|
taosArrayDestroy(pVgroupList);
|
||||||
taosHashCleanup(pHash);
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -250,6 +250,7 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||||
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
|
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
|
||||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||||
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
||||||
|
int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func);
|
||||||
|
|
||||||
// STbData
|
// STbData
|
||||||
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
||||||
|
@ -335,7 +336,6 @@ struct STsdbFS {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
rocksdb_t *db;
|
rocksdb_t *db;
|
||||||
rocksdb_comparator_t *my_comparator;
|
rocksdb_comparator_t *my_comparator;
|
||||||
rocksdb_cache_t *blockcache;
|
|
||||||
rocksdb_block_based_table_options_t *tableoptions;
|
rocksdb_block_based_table_options_t *tableoptions;
|
||||||
rocksdb_options_t *options;
|
rocksdb_options_t *options;
|
||||||
rocksdb_flushoptions_t *flushoptions;
|
rocksdb_flushoptions_t *flushoptions;
|
||||||
|
@ -347,6 +347,7 @@ typedef struct {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
STSchema *pTSchema;
|
STSchema *pTSchema;
|
||||||
|
SArray *ctxArray;
|
||||||
} SRocksCache;
|
} SRocksCache;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -325,6 +325,27 @@ void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *
|
||||||
taosRUnLockLatch(&pMemTable->latch);
|
taosRUnLockLatch(&pMemTable->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef int32_t (*__tsdb_cache_update)(SMemTable *imem, int64_t suid, int64_t uid);
|
||||||
|
|
||||||
|
int32_t tsdbMemTableSaveToCache(SMemTable *pMemTable, void *func) {
|
||||||
|
int32_t code = 0;
|
||||||
|
__tsdb_cache_update cb = (__tsdb_cache_update)func;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
|
||||||
|
STbData *pTbData = pMemTable->aBucket[i];
|
||||||
|
while (pTbData) {
|
||||||
|
code = (*cb)(pMemTable, pTbData->suid, pTbData->uid);
|
||||||
|
if (code) {
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
pTbData = pTbData->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -659,7 +680,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
pTbData->maxKey = key.key.ts;
|
pTbData->maxKey = key.key.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
|
||||||
if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
|
if (tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData) != 0) {
|
||||||
tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
|
tsdbError("vgId:%d, failed to update cache data from table suid:%" PRId64 " uid:%" PRId64 " at version %" PRId64,
|
||||||
TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
|
TD_VID(pMemTable->pTsdb->pVnode), pTbData->suid, pTbData->uid, version);
|
||||||
|
@ -721,7 +742,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
|
||||||
if (key.key.ts >= pTbData->maxKey) {
|
if (key.key.ts >= pTbData->maxKey) {
|
||||||
pTbData->maxKey = key.key.ts;
|
pTbData->maxKey = key.key.ts;
|
||||||
}
|
}
|
||||||
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) {
|
if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config) && !tsUpdateCacheBatch) {
|
||||||
TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
|
TAOS_UNUSED(tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1849,7 +1849,6 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
||||||
tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname));
|
tstrncpy(pStbRowsCxt->ctbName.tname, tbName, sizeof(pStbRowsCxt->ctbName.tname));
|
||||||
tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname));
|
tstrncpy(pStmt->usingTableName.tname, pStmt->targetTableName.tname, sizeof(pStmt->usingTableName.tname));
|
||||||
tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname));
|
tstrncpy(pStmt->targetTableName.tname, tbName, sizeof(pStmt->targetTableName.tname));
|
||||||
|
|
||||||
tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname));
|
tstrncpy(pStmt->usingTableName.dbname, pStmt->targetTableName.dbname, sizeof(pStmt->usingTableName.dbname));
|
||||||
pStmt->usingTableName.type = 1;
|
pStmt->usingTableName.type = 1;
|
||||||
pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE; // set the table type to child table for parse cache
|
pStmt->pTableMeta->tableType = TSDB_CHILD_TABLE; // set the table type to child table for parse cache
|
||||||
|
@ -2060,9 +2059,7 @@ static int32_t parseStbBoundInfo(SVnodeModifyOpStmt* pStmt, SStbRowsDataContext*
|
||||||
|
|
||||||
insDestroyBoundColInfo(&((*ppTableDataCxt)->boundColsInfo));
|
insDestroyBoundColInfo(&((*ppTableDataCxt)->boundColsInfo));
|
||||||
(*ppTableDataCxt)->boundColsInfo = pStbRowsCxt->boundColsInfo;
|
(*ppTableDataCxt)->boundColsInfo = pStbRowsCxt->boundColsInfo;
|
||||||
(*ppTableDataCxt)->boundColsInfo.numOfCols = pStbRowsCxt->boundColsInfo.numOfBound;
|
|
||||||
(*ppTableDataCxt)->boundColsInfo.numOfBound = pStbRowsCxt->boundColsInfo.numOfBound;
|
|
||||||
(*ppTableDataCxt)->boundColsInfo.hasBoundCols = pStbRowsCxt->boundColsInfo.hasBoundCols;
|
|
||||||
(*ppTableDataCxt)->boundColsInfo.pColIndex = taosMemoryCalloc(pStbRowsCxt->boundColsInfo.numOfBound, sizeof(int16_t));
|
(*ppTableDataCxt)->boundColsInfo.pColIndex = taosMemoryCalloc(pStbRowsCxt->boundColsInfo.numOfBound, sizeof(int16_t));
|
||||||
if (NULL == (*ppTableDataCxt)->boundColsInfo.pColIndex) {
|
if (NULL == (*ppTableDataCxt)->boundColsInfo.pColIndex) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -3175,9 +3172,8 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
||||||
.isStmtBind = pCxt->isStmtBind};
|
.isStmtBind = pCxt->isStmtBind};
|
||||||
|
|
||||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||||
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)((*pQuery)->pRoot);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = parseInsertSqlImpl(&context, pStmt);
|
code = parseInsertSqlImpl(&context, (SVnodeModifyOpStmt*)((*pQuery)->pRoot));
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setNextStageInfo(&context, *pQuery, pCatalogReq);
|
code = setNextStageInfo(&context, *pQuery, pCatalogReq);
|
||||||
|
|
|
@ -606,6 +606,13 @@ int32_t qBindStmtTagsValue2(void* pBlock, void* boundTags, int64_t suid, const c
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
SVCreateTbReq* tmp = pDataBlock->pData->pCreateTbReq;
|
||||||
|
taosMemoryFreeClear(tmp->name);
|
||||||
|
taosMemoryFreeClear(tmp->ctb.pTag);
|
||||||
|
taosMemoryFreeClear(tmp->ctb.stbName);
|
||||||
|
taosArrayDestroy(tmp->ctb.tagName);
|
||||||
|
tmp->ctb.tagName = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
|
code = insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
|
||||||
|
|
|
@ -475,7 +475,13 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy
|
||||||
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND);
|
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(cfgSetItemVal(pItem, name, value, stype));
|
code = cfgSetItemVal(pItem, name, value, stype);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (lock) {
|
||||||
|
(void)taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
}
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
if (lock) {
|
if (lock) {
|
||||||
(void)taosThreadMutexUnlock(&pCfg->lock);
|
(void)taosThreadMutexUnlock(&pCfg->lock);
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
[10/28 19:12:21.666563] SUCC: created database (db_sub)
|
|
||||||
[10/28 19:12:21.694603] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 19:12:21.823202] SUCC: Spent 0.1290 seconds to create 1000 table(s) with 8 thread(s) speed: 7752 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 19:12:22.127442] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 441047.79 records/second
|
|
||||||
[10/28 19:12:22.128649] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 440895.33 records/second
|
|
||||||
[10/28 19:12:22.129478] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 440151.69 records/second
|
|
||||||
[10/28 19:12:22.133756] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 433268.05 records/second
|
|
||||||
[10/28 19:12:22.135211] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 430329.63 records/second
|
|
||||||
[10/28 19:12:22.137335] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 425800.08 records/second
|
|
||||||
[10/28 19:12:22.138252] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 426330.15 records/second
|
|
||||||
[10/28 19:12:22.141351] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 422778.64 records/second
|
|
||||||
[10/28 19:12:22.141585] SUCC: Spent 0.311648 (real 0.289041) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3208748.33 (real 3459716.79) records/second
|
|
||||||
[10/28 19:12:22.141590] SUCC: insert delay, min: 0.9600ms, avg: 2.3123ms, p90: 3.1790ms, p95: 3.5080ms, p99: 4.2230ms, max: 4.9040ms
|
|
||||||
[10/28 19:28:50.798427] SUCC: created database (db_sub)
|
|
||||||
[10/28 19:28:50.828326] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 19:28:50.936429] SUCC: Spent 0.1080 seconds to create 1000 table(s) with 8 thread(s) speed: 9259 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 19:28:51.187235] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 539204.48 records/second
|
|
||||||
[10/28 19:28:51.189941] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 532329.43 records/second
|
|
||||||
[10/28 19:28:51.191551] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 530954.66 records/second
|
|
||||||
[10/28 19:28:51.191858] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 529259.59 records/second
|
|
||||||
[10/28 19:28:51.192459] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 530229.44 records/second
|
|
||||||
[10/28 19:28:51.195372] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 522099.42 records/second
|
|
||||||
[10/28 19:28:51.197727] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 516620.72 records/second
|
|
||||||
[10/28 19:28:51.197883] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 517125.12 records/second
|
|
||||||
[10/28 19:28:51.198123] SUCC: Spent 0.255536 (real 0.237135) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3913342.93 (real 4217007.19) records/second
|
|
||||||
[10/28 19:28:51.198130] SUCC: insert delay, min: 0.9200ms, avg: 1.8971ms, p90: 2.6870ms, p95: 2.9520ms, p99: 3.5880ms, max: 4.0710ms
|
|
||||||
[10/28 19:31:44.377691] SUCC: created database (db_sub)
|
|
||||||
[10/28 19:31:44.392998] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 19:31:44.696768] SUCC: Spent 0.3040 seconds to create 1000 table(s) with 8 thread(s) speed: 3289 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 19:31:45.126910] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 304775.47 records/second
|
|
||||||
[10/28 19:31:45.131979] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 301117.75 records/second
|
|
||||||
[10/28 19:31:45.135106] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 299854.39 records/second
|
|
||||||
[10/28 19:31:45.135675] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 298322.24 records/second
|
|
||||||
[10/28 19:31:45.137069] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 297733.89 records/second
|
|
||||||
[10/28 19:31:45.137952] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 296900.13 records/second
|
|
||||||
[10/28 19:31:45.138834] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 295170.54 records/second
|
|
||||||
[10/28 19:31:45.145048] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 291966.71 records/second
|
|
||||||
[10/28 19:31:45.145369] SUCC: Spent 0.442506 (real 0.419200) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2259856.36 (real 2385496.18) records/second
|
|
||||||
[10/28 19:31:45.145377] SUCC: insert delay, min: 1.0400ms, avg: 3.3536ms, p90: 5.3120ms, p95: 7.9660ms, p99: 13.1570ms, max: 19.1410ms
|
|
||||||
[10/28 19:44:19.873056] SUCC: created database (db_sub)
|
|
||||||
[10/28 19:44:19.904701] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 19:44:20.053846] SUCC: Spent 0.1490 seconds to create 1000 table(s) with 8 thread(s) speed: 6711 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 19:44:20.328698] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 485742.49 records/second
|
|
||||||
[10/28 19:44:20.330777] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 481686.29 records/second
|
|
||||||
[10/28 19:44:20.331290] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 480911.65 records/second
|
|
||||||
[10/28 19:44:20.331665] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 481043.06 records/second
|
|
||||||
[10/28 19:44:20.333451] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 477172.09 records/second
|
|
||||||
[10/28 19:44:20.334745] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 475675.84 records/second
|
|
||||||
[10/28 19:44:20.335056] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 474158.37 records/second
|
|
||||||
[10/28 19:44:20.337919] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 470816.89 records/second
|
|
||||||
[10/28 19:44:20.338144] SUCC: Spent 0.277921 (real 0.261310) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3598144.80 (real 3826872.30) records/second
|
|
||||||
[10/28 19:44:20.338153] SUCC: insert delay, min: 0.9180ms, avg: 2.0905ms, p90: 2.6490ms, p95: 3.0620ms, p99: 4.1480ms, max: 4.7840ms
|
|
||||||
[10/28 19:58:27.100989] SUCC: created database (db_sub)
|
|
||||||
[10/28 19:58:27.115572] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 19:58:27.362948] SUCC: Spent 0.2470 seconds to create 1000 table(s) with 8 thread(s) speed: 4049 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 19:58:27.807669] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 291891.03 records/second
|
|
||||||
[10/28 19:58:27.818785] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 285413.54 records/second
|
|
||||||
[10/28 19:58:27.819649] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 284193.61 records/second
|
|
||||||
[10/28 19:58:27.819844] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 284352.64 records/second
|
|
||||||
[10/28 19:58:27.820170] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 284576.63 records/second
|
|
||||||
[10/28 19:58:27.821489] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 283781.33 records/second
|
|
||||||
[10/28 19:58:27.822061] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 283112.24 records/second
|
|
||||||
[10/28 19:58:27.823513] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 282730.59 records/second
|
|
||||||
[10/28 19:58:27.823779] SUCC: Spent 0.455783 (real 0.438625) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2194026.54 (real 2279851.81) records/second
|
|
||||||
[10/28 19:58:27.823786] SUCC: insert delay, min: 0.9780ms, avg: 3.5090ms, p90: 5.5650ms, p95: 6.8600ms, p99: 10.6010ms, max: 13.4400ms
|
|
||||||
[10/28 20:00:06.417182] SUCC: created database (db_sub)
|
|
||||||
[10/28 20:00:06.448202] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 20:00:06.596961] SUCC: Spent 0.1480 seconds to create 1000 table(s) with 8 thread(s) speed: 6757 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 20:00:06.895455] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 443978.76 records/second
|
|
||||||
[10/28 20:00:06.896986] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 442549.94 records/second
|
|
||||||
[10/28 20:00:06.897536] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 440927.99 records/second
|
|
||||||
[10/28 20:00:06.898905] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 439131.15 records/second
|
|
||||||
[10/28 20:00:06.899024] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 439628.46 records/second
|
|
||||||
[10/28 20:00:06.901861] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 435197.37 records/second
|
|
||||||
[10/28 20:00:06.902305] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 434812.86 records/second
|
|
||||||
[10/28 20:00:06.904698] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 433406.26 records/second
|
|
||||||
[10/28 20:00:06.904905] SUCC: Spent 0.301788 (real 0.284949) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3313584.37 (real 3509399.93) records/second
|
|
||||||
[10/28 20:00:06.904912] SUCC: insert delay, min: 0.8770ms, avg: 2.2796ms, p90: 3.1340ms, p95: 3.6480ms, p99: 4.8280ms, max: 6.0880ms
|
|
||||||
[10/28 20:05:34.756207] SUCC: created database (db_sub)
|
|
||||||
[10/28 20:05:34.784793] INFO: start creating 1000 table(s) with 8 thread(s)
|
|
||||||
[10/28 20:05:34.927068] SUCC: Spent 0.1430 seconds to create 1000 table(s) with 8 thread(s) speed: 6993 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
|
||||||
[10/28 20:05:35.213741] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 466952.82 records/second
|
|
||||||
[10/28 20:05:35.215403] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 463804.68 records/second
|
|
||||||
[10/28 20:05:35.221132] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 453322.31 records/second
|
|
||||||
[10/28 20:05:35.221224] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 453671.11 records/second
|
|
||||||
[10/28 20:05:35.222003] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 452641.07 records/second
|
|
||||||
[10/28 20:05:35.222536] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 451796.89 records/second
|
|
||||||
[10/28 20:05:35.223663] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 449643.52 records/second
|
|
||||||
[10/28 20:05:35.225246] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 447768.68 records/second
|
|
||||||
[10/28 20:05:35.225659] SUCC: Spent 0.290871 (real 0.274808) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3437950.16 (real 3638904.25) records/second
|
|
||||||
[10/28 20:05:35.225666] SUCC: insert delay, min: 0.9360ms, avg: 2.1985ms, p90: 2.9290ms, p95: 3.4580ms, p99: 4.6030ms, max: 6.2660ms
|
|
|
@ -0,0 +1,212 @@
|
||||||
|
// g++ --std=c++17 -o multiQueryLastrow multiQueryLastrow.cpp -ltaos -lpthread -ltaosws
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <chrono>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <iostream>
|
||||||
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
|
#include "taos.h"
|
||||||
|
#include "taosws.h"
|
||||||
|
|
||||||
|
int numThreads = 5;
|
||||||
|
int numQuerys = 100;
|
||||||
|
int queryType = 0;
|
||||||
|
int numConnections = 1;
|
||||||
|
bool useWebSocket = 0;
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
const std::string dbName = "iot";
|
||||||
|
const std::string sTableName = "m";
|
||||||
|
int maxTableIndex = 50000;
|
||||||
|
|
||||||
|
std::mutex mtx;
|
||||||
|
std::condition_variable cv;
|
||||||
|
vector<TAOS*> taosArray;
|
||||||
|
vector<WS_TAOS*> wtaosArray;
|
||||||
|
|
||||||
|
std::atomic<int> finishCounter;
|
||||||
|
std::chrono::system_clock::time_point startTime;
|
||||||
|
std::chrono::system_clock::time_point stopTime;
|
||||||
|
unordered_map<int, chrono::nanoseconds> consumeHash;
|
||||||
|
|
||||||
|
static void query(int numQuerys, int id, int type);
|
||||||
|
|
||||||
|
void threadFunction(int id) {
|
||||||
|
// std::unique_lock<std::mutex> lock(mtx);
|
||||||
|
// cv.wait(lock);
|
||||||
|
|
||||||
|
// lock.unlock();
|
||||||
|
|
||||||
|
//auto startQueryTime = std::chrono::system_clock::now();
|
||||||
|
|
||||||
|
query(numQuerys, id, queryType);
|
||||||
|
|
||||||
|
//consumeHash[id] = std::chrono::system_clock::now() - startQueryTime;
|
||||||
|
|
||||||
|
// int counter = finishCounter.fetch_add(1);
|
||||||
|
// if (counter == numThreads - 1) {
|
||||||
|
// stopTime = std::chrono::system_clock::now();
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
void createThreads(const int numThreads, std::vector<std::thread>* pThreads) {
|
||||||
|
for (int i = 0; i < numThreads; ++i) {
|
||||||
|
pThreads->emplace_back(threadFunction, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "2. Threads created\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
void connect() {
|
||||||
|
void* res = NULL;
|
||||||
|
|
||||||
|
for (auto i = 0; i < numConnections; i++) {
|
||||||
|
if (useWebSocket) {
|
||||||
|
const char* dsn = "taos+ws://localhost:6041";
|
||||||
|
WS_TAOS* wtaos = ws_connect(dsn);
|
||||||
|
int32_t code = 0;
|
||||||
|
if (wtaos == NULL) {
|
||||||
|
code = ws_errno(NULL);
|
||||||
|
const char* errstr = ws_errstr(NULL);
|
||||||
|
std::cout << "Connection failed[" << code << "]: " << errstr << "\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
code = ws_select_db(wtaos, dbName.c_str());
|
||||||
|
const char* errstr = ws_errstr(wtaos);
|
||||||
|
if (code) {
|
||||||
|
std::cout << "Connection failed on select db[" << code << "]: " << errstr << "\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
wtaosArray.push_back(wtaos);
|
||||||
|
} else {
|
||||||
|
TAOS* taos = taos_connect("127.0.0.1", "root", "taosdata", dbName.c_str(), 0);
|
||||||
|
if (!taos) {
|
||||||
|
std::cerr << "Failed to connect to TDengine\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosArray.push_back(taos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "1. Success to connect to TDengine\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
void query(int numQuerys, int id, int type) {
|
||||||
|
int connIdx = id % numConnections;
|
||||||
|
|
||||||
|
for (int i = 0; i < numQuerys; i++) {
|
||||||
|
std::string sql;
|
||||||
|
if (type == 0) {
|
||||||
|
sql = "select last_row(ts) from " + sTableName + std::to_string((i * numThreads + id) % maxTableIndex);
|
||||||
|
} else {
|
||||||
|
sql = "select first(ts) from " + sTableName + std::to_string((i * numThreads + id) % maxTableIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!useWebSocket) {
|
||||||
|
TAOS* taos = taosArray[connIdx];
|
||||||
|
|
||||||
|
TAOS_RES* res = taos_query(taos, sql.c_str());
|
||||||
|
if (!res) {
|
||||||
|
std::cerr << "Failed to query TDengine\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taos_errno(res) != 0) {
|
||||||
|
std::cerr << "Failed to query TDengine since: " << taos_errstr(res) << "\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
} else {
|
||||||
|
WS_TAOS* wtaos = wtaosArray[connIdx];
|
||||||
|
|
||||||
|
WS_RES* wres = ws_query(wtaos, sql.c_str());
|
||||||
|
if (!wres) {
|
||||||
|
std::cerr << "Failed to query TDengine\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = ws_errno(wres);
|
||||||
|
if (code != 0) {
|
||||||
|
std::cerr << "Failed to query TDengine since: " << ws_errstr(wres) << "\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ws_free_result(wres);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void printHelp() {
|
||||||
|
std::cout << "./multiQueryLastrow {numThreads} {numQuerys} {queryType} {numConnections} {useWebSocket}\n";
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
if (argc != 6) {
|
||||||
|
printHelp();
|
||||||
|
}
|
||||||
|
|
||||||
|
numThreads = atoi(argv[1]);
|
||||||
|
numQuerys = atoi(argv[2]);
|
||||||
|
queryType = atoi(argv[3]);
|
||||||
|
numConnections = atoi(argv[4]);
|
||||||
|
useWebSocket = atoi(argv[5]);
|
||||||
|
|
||||||
|
std::string queryTypeStr = (queryType == 0) ? "last_row(ts)" : "first(ts)";
|
||||||
|
std::cout << "numThreads:" << numThreads << ", queryTimes:" << numQuerys << ", queryType:" << queryTypeStr
|
||||||
|
<< ", numConnections:" << numConnections << ", useWebSocket:" << useWebSocket << "\n";
|
||||||
|
|
||||||
|
finishCounter.store(0);
|
||||||
|
|
||||||
|
connect();
|
||||||
|
|
||||||
|
//startTime = std::chrono::system_clock::now();
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
createThreads(numThreads, &threads);
|
||||||
|
|
||||||
|
//std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
|
||||||
|
std::cout << "3. Start quering\n";
|
||||||
|
|
||||||
|
startTime = std::chrono::system_clock::now();
|
||||||
|
|
||||||
|
//cv.notify_all();
|
||||||
|
|
||||||
|
for (auto& t : threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
stopTime = std::chrono::system_clock::now();
|
||||||
|
|
||||||
|
for (auto& taos : taosArray) {
|
||||||
|
taos_close(taos);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& wtaos : wtaosArray) {
|
||||||
|
ws_close(wtaos);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "4. All job done\n";
|
||||||
|
|
||||||
|
int64_t totalQueryConsumeMs = 0;
|
||||||
|
for (auto& res : consumeHash) {
|
||||||
|
totalQueryConsumeMs += res.second.count() /1000000;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::nanoseconds elp = stopTime - startTime;
|
||||||
|
int64_t elpMs = elp.count() / 1000000;
|
||||||
|
int64_t totalQueryCount = numThreads * numQuerys;
|
||||||
|
|
||||||
|
std::cout << totalQueryCount << " queries finished in " << elpMs << " ms\n";
|
||||||
|
std::cout << (float)totalQueryCount * 1000 / elpMs << "q/s\n";
|
||||||
|
std::cout << "avg cost:" << totalQueryConsumeMs / totalQueryCount << " ms/q\n";
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -33,7 +33,7 @@ void do_stmt(TAOS* taos) {
|
||||||
|
|
||||||
char* tbs[2] = {"tb", "tb2"};
|
char* tbs[2] = {"tb", "tb2"};
|
||||||
int t1_val[2] = {0, 1};
|
int t1_val[2] = {0, 1};
|
||||||
int t2_len[2] = {10, 10};
|
int t2_len[2] = {5, 5};
|
||||||
int t3_len[2] = {sizeof(int), sizeof(int)};
|
int t3_len[2] = {sizeof(int), sizeof(int)};
|
||||||
TAOS_STMT2_BIND tags[2][2] = {{{0, &t1_val[0], &t3_len[0], NULL, 0}, {0, "after1", &t2_len[0], NULL, 0}},
|
TAOS_STMT2_BIND tags[2][2] = {{{0, &t1_val[0], &t3_len[0], NULL, 0}, {0, "after1", &t2_len[0], NULL, 0}},
|
||||||
{{0, &t1_val[1], &t3_len[1], NULL, 0}, {0, "after2", &t2_len[1], NULL, 0}}};
|
{{0, &t1_val[1], &t3_len[1], NULL, 0}, {0, "after2", &t2_len[1], NULL, 0}}};
|
||||||
|
@ -87,7 +87,7 @@ void do_stmt(TAOS* taos) {
|
||||||
taos_stmt2_close(stmt);
|
taos_stmt2_close(stmt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
taos_stmt2_free_stb_fields(stmt, pFields);
|
||||||
taos_stmt2_close(stmt);
|
taos_stmt2_close(stmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue