Merge branch 'main' into feature/3_liaohj
This commit is contained in:
commit
8ca981b3f0
|
@ -141,12 +141,12 @@ ELSE ()
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
||||||
|
IF ("${SIMD_SUPPORT}" MATCHES "true")
|
||||||
IF (COMPILER_SUPPORT_FMA)
|
IF (COMPILER_SUPPORT_FMA)
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma")
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
||||||
IF ("${SIMD_SUPPORT}" MATCHES "true")
|
|
||||||
IF (COMPILER_SUPPORT_AVX)
|
IF (COMPILER_SUPPORT_AVX)
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG 261fcca
|
GIT_TAG 11b60a4
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -733,7 +733,7 @@ To prevent system resource from being exhausted by multiple concurrent streams,
|
||||||
| 42 | numOfCommitThreads | Yes | Yes |
|
| 42 | numOfCommitThreads | Yes | Yes |
|
||||||
| 43 | numOfMnodeReadThreads | No | Yes |
|
| 43 | numOfMnodeReadThreads | No | Yes |
|
||||||
| 44 | numOfVnodeQueryThreads | No | Yes |
|
| 44 | numOfVnodeQueryThreads | No | Yes |
|
||||||
| 45 | numOfVnodeStreamThreads | No | Yes |
|
| 45 | ratioOfVnodeStreamThreads | No | Yes |
|
||||||
| 46 | numOfVnodeFetchThreads | No | Yes |
|
| 46 | numOfVnodeFetchThreads | No | Yes |
|
||||||
| 47 | numOfVnodeRsmaThreads | No | Yes |
|
| 47 | numOfVnodeRsmaThreads | No | Yes |
|
||||||
| 48 | numOfQnodeQueryThreads | No | Yes |
|
| 48 | numOfQnodeQueryThreads | No | Yes |
|
||||||
|
|
|
@ -709,7 +709,7 @@ charset 的有效值是 UTF-8。
|
||||||
| 42 | numOfCommitThreads | 是 | 是 | |
|
| 42 | numOfCommitThreads | 是 | 是 | |
|
||||||
| 43 | numOfMnodeReadThreads | 否 | 是 | |
|
| 43 | numOfMnodeReadThreads | 否 | 是 | |
|
||||||
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
|
| 44 | numOfVnodeQueryThreads | 否 | 是 | |
|
||||||
| 45 | numOfVnodeStreamThreads | 否 | 是 | |
|
| 45 | ratioOfVnodeStreamThreads | 否 | 是 | |
|
||||||
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
|
| 46 | numOfVnodeFetchThreads | 否 | 是 | |
|
||||||
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
|
| 47 | numOfVnodeRsmaThreads | 否 | 是 | |
|
||||||
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
|
| 48 | numOfQnodeQueryThreads | 否 | 是 | |
|
||||||
|
|
|
@ -55,7 +55,7 @@ extern int32_t tsNumOfMnodeQueryThreads;
|
||||||
extern int32_t tsNumOfMnodeFetchThreads;
|
extern int32_t tsNumOfMnodeFetchThreads;
|
||||||
extern int32_t tsNumOfMnodeReadThreads;
|
extern int32_t tsNumOfMnodeReadThreads;
|
||||||
extern int32_t tsNumOfVnodeQueryThreads;
|
extern int32_t tsNumOfVnodeQueryThreads;
|
||||||
extern int32_t tsNumOfVnodeStreamThreads;
|
extern float tsRatioOfVnodeStreamThreads;
|
||||||
extern int32_t tsNumOfVnodeFetchThreads;
|
extern int32_t tsNumOfVnodeFetchThreads;
|
||||||
extern int32_t tsNumOfVnodeRsmaThreads;
|
extern int32_t tsNumOfVnodeRsmaThreads;
|
||||||
extern int32_t tsNumOfQnodeQueryThreads;
|
extern int32_t tsNumOfQnodeQueryThreads;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_UTIL_WORKER_H_
|
#define _TD_UTIL_WORKER_H_
|
||||||
|
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -29,7 +30,7 @@ typedef struct SQWorker {
|
||||||
int32_t id; // worker id
|
int32_t id; // worker id
|
||||||
int64_t pid; // thread pid
|
int64_t pid; // thread pid
|
||||||
TdThread thread; // thread id
|
TdThread thread; // thread id
|
||||||
SQWorkerPool *pool;
|
void *pool;
|
||||||
} SQWorker;
|
} SQWorker;
|
||||||
|
|
||||||
typedef struct SQWorkerPool {
|
typedef struct SQWorkerPool {
|
||||||
|
@ -42,6 +43,14 @@ typedef struct SQWorkerPool {
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} SQWorkerPool;
|
} SQWorkerPool;
|
||||||
|
|
||||||
|
typedef struct SAutoQWorkerPool {
|
||||||
|
float ratio;
|
||||||
|
STaosQset *qset;
|
||||||
|
const char *name;
|
||||||
|
SArray *workers;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
} SAutoQWorkerPool;
|
||||||
|
|
||||||
typedef struct SWWorker {
|
typedef struct SWWorker {
|
||||||
int32_t id; // worker id
|
int32_t id; // worker id
|
||||||
int64_t pid; // thread pid
|
int64_t pid; // thread pid
|
||||||
|
@ -65,6 +74,11 @@ void tQWorkerCleanup(SQWorkerPool *pool);
|
||||||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
|
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
|
||||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
||||||
|
|
||||||
|
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool);
|
||||||
|
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool);
|
||||||
|
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp);
|
||||||
|
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue);
|
||||||
|
|
||||||
int32_t tWWorkerInit(SWWorkerPool *pool);
|
int32_t tWWorkerInit(SWWorkerPool *pool);
|
||||||
void tWWorkerCleanup(SWWorkerPool *pool);
|
void tWWorkerCleanup(SWWorkerPool *pool);
|
||||||
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
|
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
|
||||||
|
|
|
@ -47,7 +47,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
int32_t tsNumOfMnodeReadThreads = 1;
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfVnodeStreamThreads = 2;
|
float tsRatioOfVnodeStreamThreads = 1.0;
|
||||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||||
int32_t tsNumOfQnodeQueryThreads = 4;
|
int32_t tsNumOfQnodeQueryThreads = 4;
|
||||||
|
@ -392,9 +392,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
|
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
|
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, 0) != 0) return -1;
|
||||||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
|
|
||||||
|
|
||||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
||||||
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
||||||
|
@ -513,11 +511,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
}
|
}
|
||||||
|
|
||||||
pItem = cfgGetItem(tsCfg, "numOfVnodeStreamThreads");
|
pItem = cfgGetItem(tsCfg, "ratioOfVnodeStreamThreads");
|
||||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||||
tsNumOfVnodeStreamThreads = numOfCores / 4;
|
pItem->fval = tsRatioOfVnodeStreamThreads;
|
||||||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
|
||||||
pItem->i32 = tsNumOfVnodeStreamThreads;
|
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,7 +706,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||||
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
|
||||||
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
|
||||||
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
|
tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval;
|
||||||
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
|
||||||
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
|
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
|
||||||
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
|
||||||
|
|
|
@ -268,6 +268,11 @@ int mainWindows(int argc, char **argv) {
|
||||||
|
|
||||||
if (dmInit() != 0) {
|
if (dmInit() != 0) {
|
||||||
dError("failed to init dnode since %s", terrstr());
|
dError("failed to init dnode since %s", terrstr());
|
||||||
|
|
||||||
|
taosCleanupCfg();
|
||||||
|
taosCloseLog();
|
||||||
|
taosCleanupArgs();
|
||||||
|
taosConvDestroy();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ typedef struct SVnodeMgmt {
|
||||||
const char *path;
|
const char *path;
|
||||||
const char *name;
|
const char *name;
|
||||||
SQWorkerPool queryPool;
|
SQWorkerPool queryPool;
|
||||||
SQWorkerPool streamPool;
|
SAutoQWorkerPool streamPool;
|
||||||
SWWorkerPool fetchPool;
|
SWWorkerPool fetchPool;
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
SHashObj *hash;
|
SHashObj *hash;
|
||||||
|
|
|
@ -318,7 +318,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
(void)tMultiWorkerInit(&pVnode->pApplyW, &acfg);
|
||||||
|
|
||||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||||
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||||
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||||
|
|
||||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
|
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncCtrlW.queue == NULL ||
|
||||||
|
@ -344,7 +344,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
|
|
||||||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||||
pVnode->pQueryQ = NULL;
|
pVnode->pQueryQ = NULL;
|
||||||
pVnode->pStreamQ = NULL;
|
pVnode->pStreamQ = NULL;
|
||||||
|
@ -359,11 +359,10 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
pQPool->max = tsNumOfVnodeQueryThreads;
|
pQPool->max = tsNumOfVnodeQueryThreads;
|
||||||
if (tQWorkerInit(pQPool) != 0) return -1;
|
if (tQWorkerInit(pQPool) != 0) return -1;
|
||||||
|
|
||||||
SQWorkerPool *pStreamPool = &pMgmt->streamPool;
|
SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool;
|
||||||
pStreamPool->name = "vnode-stream";
|
pStreamPool->name = "vnode-stream";
|
||||||
pStreamPool->min = tsNumOfVnodeStreamThreads;
|
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
|
||||||
pStreamPool->max = tsNumOfVnodeStreamThreads;
|
if (tAutoQWorkerInit(pStreamPool) != 0) return -1;
|
||||||
if (tQWorkerInit(pStreamPool) != 0) return -1;
|
|
||||||
|
|
||||||
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||||
pFPool->name = "vnode-fetch";
|
pFPool->name = "vnode-fetch";
|
||||||
|
@ -385,7 +384,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
void vmStopWorker(SVnodeMgmt *pMgmt) {
|
void vmStopWorker(SVnodeMgmt *pMgmt) {
|
||||||
tQWorkerCleanup(&pMgmt->queryPool);
|
tQWorkerCleanup(&pMgmt->queryPool);
|
||||||
tQWorkerCleanup(&pMgmt->streamPool);
|
tAutoQWorkerCleanup(&pMgmt->streamPool);
|
||||||
tWWorkerCleanup(&pMgmt->fetchPool);
|
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||||
dDebug("vnode workers are closed");
|
dDebug("vnode workers are closed");
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,12 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
|
dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType));
|
||||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} /* else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
|
} else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) &&
|
||||||
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
|
(!IsReq(pRpc)) && (pRpc->pCont == NULL)) {
|
||||||
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
|
dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code));
|
||||||
terrno = pRpc->code;
|
terrno = pRpc->code;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}*/
|
}
|
||||||
|
|
||||||
if (pHandle->defaultNtype == NODE_END) {
|
if (pHandle->defaultNtype == NODE_END) {
|
||||||
dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
|
dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));
|
||||||
|
@ -248,9 +248,9 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
|
||||||
|
|
||||||
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
|
||||||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
|
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_NOT_LEADER ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
|
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING ||
|
||||||
code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
|
code == TSDB_CODE_APP_IS_STOPPING) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
|
||||||
msgType == TDMT_SCH_MERGE_FETCH) {
|
msgType == TDMT_SCH_MERGE_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -193,6 +193,7 @@ typedef struct {
|
||||||
int64_t lastAccessTime;
|
int64_t lastAccessTime;
|
||||||
int32_t accessTimes;
|
int32_t accessTimes;
|
||||||
int32_t numOfVnodes;
|
int32_t numOfVnodes;
|
||||||
|
int32_t numOfOtherNodes;
|
||||||
int32_t numOfSupportVnodes;
|
int32_t numOfSupportVnodes;
|
||||||
float numOfCores;
|
float numOfCores;
|
||||||
int64_t memTotal;
|
int64_t memTotal;
|
||||||
|
|
|
@ -397,8 +397,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
|
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
|
||||||
bool needCheck = !online || dnodeChanged || reboot;
|
bool needCheck = !online || dnodeChanged || reboot;
|
||||||
|
|
||||||
pDnode->accessTimes++;
|
|
||||||
pDnode->lastAccessTime = curMs;
|
|
||||||
const STraceId *trace = &pReq->info.traceId;
|
const STraceId *trace = &pReq->info.traceId;
|
||||||
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
|
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
|
||||||
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
|
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
|
||||||
|
@ -534,6 +532,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
pReq->info.rsp = pHead;
|
pReq->info.rsp = pHead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pDnode->accessTimes++;
|
||||||
|
pDnode->lastAccessTime = curMs;
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -425,6 +425,7 @@ void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgOb
|
||||||
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||||
SDnodeObj *pDnode = pObj;
|
SDnodeObj *pDnode = pObj;
|
||||||
pDnode->numOfVnodes = 0;
|
pDnode->numOfVnodes = 0;
|
||||||
|
pDnode->numOfOtherNodes = 0;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -447,7 +448,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
||||||
pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
|
pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
|
||||||
|
|
||||||
if (isMnode) {
|
if (isMnode) {
|
||||||
pDnode->numOfVnodes++;
|
pDnode->numOfOtherNodes++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (online && pDnode->numOfSupportVnodes > 0) {
|
if (online && pDnode->numOfSupportVnodes > 0) {
|
||||||
|
@ -468,14 +469,25 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
|
||||||
|
|
||||||
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
|
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
|
||||||
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL);
|
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL);
|
||||||
|
|
||||||
|
mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray));
|
||||||
|
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
|
mDebug("dnode:%d, vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
|
||||||
|
}
|
||||||
return pArray;
|
return pArray;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) { return *dnode1Id >= *dnode2Id ? 1 : 0; }
|
static int32_t mndCompareDnodeId(int32_t *dnode1Id, int32_t *dnode2Id) { return *dnode1Id >= *dnode2Id ? 1 : 0; }
|
||||||
|
|
||||||
|
static float mndGetDnodeScore(SDnodeObj *pDnode, int32_t additionDnodes, float ratio) {
|
||||||
|
float totalDnodes = pDnode->numOfVnodes + (float)pDnode->numOfOtherNodes * ratio + additionDnodes;
|
||||||
|
return totalDnodes / pDnode->numOfSupportVnodes;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
|
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
|
||||||
float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
|
float d1Score = mndGetDnodeScore(pDnode1, 0, 0.9);
|
||||||
float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;
|
float d2Score = mndGetDnodeScore(pDnode2, 0, 0.9);
|
||||||
return d1Score >= d2Score ? 1 : 0;
|
return d1Score >= d2Score ? 1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -494,7 +506,12 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
|
||||||
int32_t allocedVnodes = 0;
|
int32_t allocedVnodes = 0;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
mDebug("start to sort %d dnodes", (int32_t)taosArrayGetSize(pArray));
|
||||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
|
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
|
mDebug("dnode:%d, score:%f", pDnode->id, mndGetDnodeScore(pDnode, 0, 0.9));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pArray);
|
int32_t size = taosArrayGetSize(pArray);
|
||||||
if (size < pVgroup->replica) {
|
if (size < pVgroup->replica) {
|
||||||
|
@ -875,7 +892,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro
|
||||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
mInfo("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
|
||||||
|
@ -935,7 +952,7 @@ static int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *
|
||||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
mInfo("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -1970,16 +1987,16 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
|
||||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
mInfo("dnode:%d, equivalent vnodes:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
|
mInfo("dnode:%d, equivalent vnodes:%d others:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
|
||||||
pDnode->numOfSupportVnodes, (float)pDnode->numOfVnodes / pDnode->numOfSupportVnodes);
|
pDnode->numOfSupportVnodes, pDnode->numOfOtherNodes, mndGetDnodeScore(pDnode, 0, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
|
SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
|
||||||
SDnodeObj *pDst = taosArrayGet(pArray, 0);
|
SDnodeObj *pDst = taosArrayGet(pArray, 0);
|
||||||
|
|
||||||
float srcScore = (float)(pSrc->numOfVnodes - 1) / pSrc->numOfSupportVnodes;
|
float srcScore = mndGetDnodeScore(pSrc, -1, 1);
|
||||||
float dstScore = (float)(pDst->numOfVnodes + 1) / pDst->numOfSupportVnodes;
|
float dstScore = mndGetDnodeScore(pDst, 1, 1);
|
||||||
mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
|
mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, dstScore,
|
||||||
pDst->id, dstScore);
|
pDst->id, dstScore);
|
||||||
|
|
||||||
if (srcScore > dstScore - 0.000001) {
|
if (srcScore > dstScore - 0.000001) {
|
||||||
|
|
|
@ -2342,6 +2342,17 @@ void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock*
|
||||||
buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
|
buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
||||||
|
TSKEY* primaryKeys, int32_t prevPosition) {
|
||||||
|
int32_t startPos = prevPosition + 1;
|
||||||
|
if (startPos == pDataBlockInfo->rows) {
|
||||||
|
startPos = -1;
|
||||||
|
} else {
|
||||||
|
*pNext = getFinalTimeWindow(primaryKeys[startPos], pInterval);
|
||||||
|
}
|
||||||
|
return startPos;
|
||||||
|
}
|
||||||
|
|
||||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
|
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||||
SHashObj* pUpdatedMap) {
|
SHashObj* pUpdatedMap) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
|
||||||
|
@ -2451,8 +2462,12 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
||||||
}
|
}
|
||||||
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
||||||
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
|
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
|
||||||
|
if (IS_FINAL_OP(pInfo)) {
|
||||||
|
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
|
||||||
|
} else {
|
||||||
startPos =
|
startPos =
|
||||||
getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
|
getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
|
||||||
|
}
|
||||||
if (startPos < 0) {
|
if (startPos < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -841,36 +841,42 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows;
|
numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows;
|
||||||
}
|
}
|
||||||
output->info.rows = numOfRows;
|
|
||||||
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
// create the basic block info structure
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
if ((input+i)->numOfRows < numOfRows) {
|
SColumnInfoData* pInfo = input[i].columnData;
|
||||||
SColumnInfoData* pColInfoData = (input+i)->columnData;
|
SColumnInfoData d = {0};
|
||||||
int32_t startRow = (input+i)->numOfRows;
|
d.info = pInfo->info;
|
||||||
int32_t expandRows = numOfRows - startRow;
|
|
||||||
colInfoDataEnsureCapacity(pColInfoData, numOfRows, false);
|
blockDataAppendColInfo(output, &d);
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(output, numOfRows);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i);
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = input[i].columnData;
|
||||||
|
colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info);
|
||||||
|
|
||||||
|
if (input[i].numOfRows < numOfRows) {
|
||||||
|
int32_t startRow = input[i].numOfRows;
|
||||||
|
int expandRows = numOfRows - startRow;
|
||||||
bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1);
|
bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1);
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
colDataAppendNNULL(pColInfoData, startRow, expandRows);
|
colDataAppendNNULL(pDest, startRow, expandRows);
|
||||||
} else {
|
} else {
|
||||||
char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
|
char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
|
||||||
int32_t bytes = pColInfoData->info.bytes;
|
|
||||||
char* data = taosMemoryMalloc(bytes);
|
|
||||||
memcpy(data, src, bytes);
|
|
||||||
for (int j = 0; j < expandRows; ++j) {
|
for (int j = 0; j < expandRows; ++j) {
|
||||||
colDataAppend(pColInfoData, startRow+j, data, false);
|
colDataAppend(pDest, startRow+j, src, false);
|
||||||
}
|
}
|
||||||
//colDataAppendNItems(pColInfoData, startRow, data, expandRows);
|
//colDataAppendNItems(pColInfoData, startRow, data, expandRows);
|
||||||
taosMemoryFree(data);
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(output->pDataBlock, (input + i)->columnData);
|
output->info.rows = numOfRows;
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE((input + i)->columnData->info.type)) {
|
|
||||||
output->info.hasVarCol = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1825,7 +1831,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
|
||||||
taosArrayDestroy(resultBlock.pDataBlock);
|
taosArrayDestroy(resultBlock.pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(inputBlock.pDataBlock);
|
blockDataFreeRes(&inputBlock);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -196,7 +196,7 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons
|
||||||
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
||||||
SNode* pQuery);
|
SNode* pQuery);
|
||||||
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
||||||
const SToken* pSubDbName, bool withMeta);
|
SToken* pSubDbName, bool withMeta);
|
||||||
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
||||||
SNode* pRealTable, bool withMeta);
|
SNode* pRealTable, bool withMeta);
|
||||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
|
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pTopicName);
|
||||||
|
|
|
@ -1579,8 +1579,11 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName,
|
||||||
const SToken* pSubDbName, bool withMeta) {
|
SToken* pSubDbName, bool withMeta) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
if (!checkDbName(pCxt, pSubDbName, true)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
SCreateTopicStmt* pStmt = (SCreateTopicStmt*)nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
|
SCreateTopicStmt* pStmt = (SCreateTopicStmt*)nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
|
||||||
CHECK_OUT_OF_MEM(pStmt);
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
||||||
|
|
|
@ -5640,7 +5640,8 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCrea
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
||||||
crossTableWithUdaf(pSelect)) {
|
crossTableWithUdaf(pSelect)) {
|
||||||
|
@ -5650,6 +5651,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
"SUBTABLE expression must be of VARCHAR type");
|
"SUBTABLE expression must be of VARCHAR type");
|
||||||
}
|
}
|
||||||
|
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
|
"The trigger mode of non window query can only be AT_ONCE");
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5663,7 +5668,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
||||||
code = translateQuery(pCxt, pStmt->pQuery);
|
code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt->pQuery);
|
code = checkStreamQuery(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||||
|
|
|
@ -668,6 +668,7 @@ void schFreeJobImpl(void *job) {
|
||||||
taosMemoryFreeClear(pJob->userRes.execRes);
|
taosMemoryFreeClear(pJob->userRes.execRes);
|
||||||
taosMemoryFreeClear(pJob->fetchRes);
|
taosMemoryFreeClear(pJob->fetchRes);
|
||||||
taosMemoryFreeClear(pJob->sql);
|
taosMemoryFreeClear(pJob->sql);
|
||||||
|
tsem_destroy(&pJob->rspSem);
|
||||||
taosMemoryFree(pJob);
|
taosMemoryFree(pJob);
|
||||||
|
|
||||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||||
|
@ -748,7 +749,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
||||||
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_init(&pJob->rspSem, 0, 0);
|
if (tsem_init(&pJob->rspSem, 0, 0)) {
|
||||||
|
SCH_JOB_ELOG("tsem_init failed, errno:%d", errno);
|
||||||
|
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
|
pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
|
||||||
if (pJob->refId < 0) {
|
if (pJob->refId < 0) {
|
||||||
|
|
|
@ -487,8 +487,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
||||||
taosMemoryFree(pReqs);
|
taosMemoryFree(pReqs);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -514,7 +512,6 @@ int32_t streamDispatch(SStreamTask* pTask) {
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
|
if (streamDispatchAllBlocks(pTask, pBlock) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
code = -1;
|
code = -1;
|
||||||
streamQueueProcessFail(pTask->outputQueue);
|
streamQueueProcessFail(pTask->outputQueue);
|
||||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
|
|
|
@ -143,6 +143,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
taosArrayDestroy(pTask->checkReqIds);
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
|
pTask->checkReqIds = NULL;
|
||||||
streamTaskLaunchRecover(pTask, version);
|
streamTaskLaunchRecover(pTask, version);
|
||||||
}
|
}
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
|
|
@ -175,6 +175,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
|
pTask->checkReqIds = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->pState) streamStateClose(pTask->pState);
|
if (pTask->pState) streamStateClose(pTask->pState);
|
||||||
|
|
|
@ -45,7 +45,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
|
|
||||||
if (index - pBuf->startIndex >= pBuf->size) {
|
if (index - pBuf->startIndex >= pBuf->size) {
|
||||||
sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index);
|
sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index);
|
||||||
goto _out;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(index == pBuf->endIndex);
|
ASSERT(index == pBuf->endIndex);
|
||||||
|
@ -66,9 +66,8 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_out:
|
_err:
|
||||||
syncLogBufferValidate(pBuf);
|
syncLogBufferValidate(pBuf);
|
||||||
syncEntryDestroy(pEntry);
|
|
||||||
taosThreadMutexUnlock(&pBuf->mutex);
|
taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1001,6 +1001,13 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
uv_loop_init(srv->loop);
|
uv_loop_init(srv->loop);
|
||||||
|
|
||||||
char pipeName[PATH_MAX];
|
char pipeName[PATH_MAX];
|
||||||
|
|
||||||
|
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
#if defined(WINDOWS) || defined(DARWIN)
|
#if defined(WINDOWS) || defined(DARWIN)
|
||||||
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
|
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -1087,12 +1094,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
|
|
||||||
goto End;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (false == addHandleToAcceptloop(srv)) {
|
if (false == addHandleToAcceptloop(srv)) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
@ -1185,8 +1186,8 @@ void transCloseServer(void* arg) {
|
||||||
// impl later
|
// impl later
|
||||||
SServerObj* srv = arg;
|
SServerObj* srv = arg;
|
||||||
|
|
||||||
tDebug("send quit msg to accept thread");
|
|
||||||
if (srv->inited) {
|
if (srv->inited) {
|
||||||
|
tDebug("send quit msg to accept thread");
|
||||||
uv_async_send(srv->pAcceptAsync);
|
uv_async_send(srv->pAcceptAsync);
|
||||||
taosThreadJoin(srv->thread, NULL);
|
taosThreadJoin(srv->thread, NULL);
|
||||||
SRV_RELEASE_UV(srv->loop);
|
SRV_RELEASE_UV(srv->loop);
|
||||||
|
|
|
@ -643,13 +643,10 @@ const char* tstrerror(int32_t err) {
|
||||||
// this is a system errno
|
// this is a system errno
|
||||||
if ((err & 0x00ff0000) == 0x00ff0000) {
|
if ((err & 0x00ff0000) == 0x00ff0000) {
|
||||||
int32_t code = err & 0x0000ffff;
|
int32_t code = err & 0x0000ffff;
|
||||||
if (code >= 0 && code < 36) {
|
// strerror can handle any invalid code
|
||||||
|
// invalid code return Unknown error
|
||||||
return strerror(code);
|
return strerror(code);
|
||||||
} else {
|
|
||||||
return "unknown err";
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
int32_t s = 0;
|
int32_t s = 0;
|
||||||
int32_t e = sizeof(errors) / sizeof(errors[0]);
|
int32_t e = sizeof(errors) / sizeof(errors[0]);
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
|
||||||
worker->pool = pool;
|
worker->pool = pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,8 +51,10 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
|
||||||
for (int32_t i = 0; i < pool->max; ++i) {
|
for (int32_t i = 0; i < pool->max; ++i) {
|
||||||
SQWorker *worker = pool->workers + i;
|
SQWorker *worker = pool->workers + i;
|
||||||
if (taosCheckPthreadValid(worker->thread)) {
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
taosThreadJoin(worker->thread, NULL);
|
taosThreadJoin(worker->thread, NULL);
|
||||||
taosThreadClear(&worker->thread);
|
taosThreadClear(&worker->thread);
|
||||||
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +62,7 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
|
||||||
taosCloseQset(pool->qset);
|
taosCloseQset(pool->qset);
|
||||||
taosThreadMutexDestroy(&pool->mutex);
|
taosThreadMutexDestroy(&pool->mutex);
|
||||||
|
|
||||||
uDebug("worker:%s is closed", pool->name);
|
uInfo("worker:%s is closed", pool->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tQWorkerThreadFp(SQWorker *worker) {
|
static void *tQWorkerThreadFp(SQWorker *worker) {
|
||||||
|
@ -119,7 +121,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||||
|
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
pool->num++;
|
pool->num++;
|
||||||
uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
|
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
|
||||||
} while (pool->num < pool->min);
|
} while (pool->num < pool->min);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,7 +132,134 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
||||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
|
||||||
|
pool->qset = taosOpenQset();
|
||||||
|
pool->workers = taosArrayInit(2, sizeof(SQWorker *));
|
||||||
|
if (pool->workers == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)taosThreadMutexInit(&pool->mutex, NULL);
|
||||||
|
|
||||||
|
uInfo("worker:%s is initialized as auto", pool->name);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
|
||||||
|
int32_t size = taosArrayGetSize(pool->workers);
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SQWorker *worker = taosArrayGetP(pool->workers, i);
|
||||||
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
taosQsetThreadResume(pool->qset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SQWorker *worker = taosArrayGetP(pool->workers, i);
|
||||||
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
|
taosThreadJoin(worker->thread, NULL);
|
||||||
|
taosThreadClear(&worker->thread);
|
||||||
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
|
}
|
||||||
|
taosMemoryFree(worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pool->workers);
|
||||||
|
taosCloseQset(pool->qset);
|
||||||
|
taosThreadMutexDestroy(&pool->mutex);
|
||||||
|
|
||||||
|
uInfo("worker:%s is closed", pool->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tAutoQWorkerThreadFp(SQWorker *worker) {
|
||||||
|
SAutoQWorkerPool *pool = worker->pool;
|
||||||
|
SQueueInfo qinfo = {0};
|
||||||
|
void *msg = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
taosBlockSIGPIPE();
|
||||||
|
setThreadName(pool->name);
|
||||||
|
worker->pid = taosGetSelfPthreadId();
|
||||||
|
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
|
||||||
|
uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
|
||||||
|
worker->pid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (qinfo.fp != NULL) {
|
||||||
|
qinfo.workerId = worker->id;
|
||||||
|
qinfo.threadNum = taosArrayGetSize(pool->workers);
|
||||||
|
(*((FItem)qinfo.fp))(&qinfo, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosUpdateItemSize(qinfo.queue, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||||
|
STaosQueue *queue = taosOpenQueue();
|
||||||
|
if (queue == NULL) return NULL;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pool->mutex);
|
||||||
|
taosSetQueueFp(queue, fp, NULL);
|
||||||
|
taosAddIntoQset(pool->qset, queue, ahandle);
|
||||||
|
|
||||||
|
int32_t queueNum = taosGetQueueNumber(pool->qset);
|
||||||
|
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
|
||||||
|
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
|
||||||
|
if (dstWorkerNum < 1) dstWorkerNum = 1;
|
||||||
|
|
||||||
|
// spawn a thread to process queue
|
||||||
|
while (curWorkerNum < dstWorkerNum) {
|
||||||
|
SQWorker *worker = taosMemoryCalloc(1, sizeof(SQWorker));
|
||||||
|
if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
|
||||||
|
uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
|
||||||
|
taosMemoryFree(worker);
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
worker->id = curWorkerNum;
|
||||||
|
worker->pool = pool;
|
||||||
|
|
||||||
|
TdThreadAttr thAttr;
|
||||||
|
taosThreadAttrInit(&thAttr);
|
||||||
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
|
||||||
|
uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
|
||||||
|
(void)taosArrayPop(pool->workers);
|
||||||
|
taosMemoryFree(worker);
|
||||||
|
taosCloseQueue(queue);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, (int32_t)taosArrayGetSize(pool->workers));
|
||||||
|
|
||||||
|
curWorkerNum++;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pool->mutex);
|
||||||
|
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
|
||||||
|
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +281,7 @@ int32_t tWWorkerInit(SWWorkerPool *pool) {
|
||||||
worker->pool = pool;
|
worker->pool = pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("worker:%s is initialized, max:%d", pool->name, pool->max);
|
uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,17 +298,19 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
|
||||||
for (int32_t i = 0; i < pool->max; ++i) {
|
for (int32_t i = 0; i < pool->max; ++i) {
|
||||||
SWWorker *worker = pool->workers + i;
|
SWWorker *worker = pool->workers + i;
|
||||||
if (taosCheckPthreadValid(worker->thread)) {
|
if (taosCheckPthreadValid(worker->thread)) {
|
||||||
|
uInfo("worker:%s:%d is stopping", pool->name, worker->id);
|
||||||
taosThreadJoin(worker->thread, NULL);
|
taosThreadJoin(worker->thread, NULL);
|
||||||
taosThreadClear(&worker->thread);
|
taosThreadClear(&worker->thread);
|
||||||
taosFreeQall(worker->qall);
|
taosFreeQall(worker->qall);
|
||||||
taosCloseQset(worker->qset);
|
taosCloseQset(worker->qset);
|
||||||
|
uInfo("worker:%s:%d is stopped", pool->name, worker->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pool->workers);
|
taosMemoryFreeClear(pool->workers);
|
||||||
taosThreadMutexDestroy(&pool->mutex);
|
taosThreadMutexDestroy(&pool->mutex);
|
||||||
|
|
||||||
uDebug("worker:%s is closed", pool->name);
|
uInfo("worker:%s is closed", pool->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *tWWorkerThreadFp(SWWorker *worker) {
|
static void *tWWorkerThreadFp(SWWorker *worker) {
|
||||||
|
@ -235,7 +366,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
|
||||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
|
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker) != 0) goto _OVER;
|
||||||
|
|
||||||
uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
|
uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
|
||||||
pool->nextId = (pool->nextId + 1) % pool->max;
|
pool->nextId = (pool->nextId + 1) % pool->max;
|
||||||
|
|
||||||
taosThreadAttrDestroy(&thAttr);
|
taosThreadAttrDestroy(&thAttr);
|
||||||
|
@ -259,13 +390,14 @@ _OVER:
|
||||||
} else {
|
} else {
|
||||||
while (worker->pid <= 0) taosMsleep(10);
|
while (worker->pid <= 0) taosMsleep(10);
|
||||||
queue->threadId = worker->pid;
|
queue->threadId = worker->pid;
|
||||||
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId);
|
uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle,
|
||||||
|
queue->threadId);
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
|
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
|
||||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
uInfo("worker:%s, queue:%p is freed", pool->name, queue);
|
||||||
taosCloseQueue(queue);
|
taosCloseQueue(queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -677,8 +677,8 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/basic5.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb.py -N 3 -n 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb1.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb2.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb3.py
|
||||||
|
|
|
@ -74,10 +74,10 @@ sql insert into d2.t2 values(now+5s, 21)
|
||||||
sql select * from information_schema.ins_dnodes
|
sql select * from information_schema.ins_dnodes
|
||||||
print dnode1 openVnodes $data(1)[2]
|
print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(2)[2]
|
print dnode2 openVnodes $data(2)[2]
|
||||||
if $data(1)[2] != 0 then
|
if $data(1)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(2)[2] != 2 then
|
if $data(2)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -161,13 +161,13 @@ print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode3 openVnodes $data(3)[2]
|
print dnode3 openVnodes $data(3)[2]
|
||||||
print dnode4 openVnodes $data(4)[2]
|
print dnode4 openVnodes $data(4)[2]
|
||||||
print dnode5 openVnodes $data(5)[2]
|
print dnode5 openVnodes $data(5)[2]
|
||||||
if $data(1)[2] != 2 then
|
if $data(1)[2] != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != 3 then
|
if $data(3)[2] != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(4)[2] != 4 then
|
if $data(4)[2] != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(5)[2] != 3 then
|
if $data(5)[2] != 3 then
|
||||||
|
|
|
@ -127,10 +127,10 @@ print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(2)[2]
|
print dnode2 openVnodes $data(2)[2]
|
||||||
print dnode3 openVnodes $data(3)[2]
|
print dnode3 openVnodes $data(3)[2]
|
||||||
print dnode4 openVnodes $data(4)[2]
|
print dnode4 openVnodes $data(4)[2]
|
||||||
if $data(1)[2] != 0 then
|
if $data(1)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(2)[2] != 2 then
|
if $data(2)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != 2 then
|
if $data(3)[2] != 2 then
|
||||||
|
@ -228,10 +228,10 @@ print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode3 openVnodes $data(3)[2]
|
print dnode3 openVnodes $data(3)[2]
|
||||||
print dnode4 openVnodes $data(4)[2]
|
print dnode4 openVnodes $data(4)[2]
|
||||||
print dnode5 openVnodes $data(5)[2]
|
print dnode5 openVnodes $data(5)[2]
|
||||||
if $data(1)[2] != 1 then
|
if $data(1)[2] != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != 3 then
|
if $data(3)[2] != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(4)[2] != 3 then
|
if $data(4)[2] != 3 then
|
||||||
|
|
|
@ -142,10 +142,10 @@ print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(2)[2]
|
print dnode2 openVnodes $data(2)[2]
|
||||||
print dnode2 openVnodes $data(3)[2]
|
print dnode2 openVnodes $data(3)[2]
|
||||||
print dnode2 openVnodes $data(4)[2]
|
print dnode2 openVnodes $data(4)[2]
|
||||||
if $data(1)[2] != 0 then
|
if $data(1)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(2)[2] != 2 then
|
if $data(2)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != 2 then
|
if $data(3)[2] != 2 then
|
||||||
|
|
|
@ -71,10 +71,10 @@ sql insert into d2.t2 values(now+5s, 21)
|
||||||
sql select * from information_schema.ins_dnodes
|
sql select * from information_schema.ins_dnodes
|
||||||
print dnode1 openVnodes $data(1)[2]
|
print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(2)[2]
|
print dnode2 openVnodes $data(2)[2]
|
||||||
if $data(1)[2] != 0 then
|
if $data(1)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(2)[2] != 2 then
|
if $data(2)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
@ -181,10 +181,10 @@ sql select * from information_schema.ins_dnodes
|
||||||
print dnode1 openVnodes $data(1)[2]
|
print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(3)[2]
|
print dnode2 openVnodes $data(3)[2]
|
||||||
print dnode2 openVnodes $data(4)[2]
|
print dnode2 openVnodes $data(4)[2]
|
||||||
if $data(1)[2] != 0 then
|
if $data(1)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != 2 then
|
if $data(3)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(4)[2] != 1 then
|
if $data(4)[2] != 1 then
|
||||||
|
@ -204,10 +204,10 @@ sql select * from information_schema.ins_dnodes
|
||||||
print dnode1 openVnodes $data(1)[2]
|
print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(3)[2]
|
print dnode2 openVnodes $data(3)[2]
|
||||||
print dnode2 openVnodes $data(4)[2]
|
print dnode2 openVnodes $data(4)[2]
|
||||||
if $data(1)[2] != 0 then
|
if $data(1)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != 2 then
|
if $data(3)[2] != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(4)[2] != 2 then
|
if $data(4)[2] != 2 then
|
||||||
|
@ -220,13 +220,13 @@ sql select * from information_schema.ins_dnodes
|
||||||
print dnode1 openVnodes $data(1)[2]
|
print dnode1 openVnodes $data(1)[2]
|
||||||
print dnode2 openVnodes $data(3)[2]
|
print dnode2 openVnodes $data(3)[2]
|
||||||
print dnode2 openVnodes $data(4)[2]
|
print dnode2 openVnodes $data(4)[2]
|
||||||
if $data(1)[2] != 1 then
|
if $data(1)[2] != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(3)[2] != null then
|
if $data(3)[2] != null then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data(4)[2] != 3 then
|
if $data(4)[2] != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
system sh/deploy.sh -n dnode1 -i 1
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
system sh/cfg.sh -n dnode1 -c debugflag -v 131
|
system sh/cfg.sh -n dnode1 -c debugflag 131
|
||||||
system sh/exec.sh -n dnode1 -s start -v
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
sleep 5000
|
sleep 5000
|
||||||
|
|
||||||
|
|
|
@ -672,6 +672,123 @@ if $data61 != 1 then
|
||||||
goto loop5
|
goto loop5
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 8
|
||||||
|
|
||||||
|
sql drop stream IF EXISTS streams4;
|
||||||
|
sql drop database IF EXISTS test4;
|
||||||
|
|
||||||
|
sql create database test4 vgroups 6;
|
||||||
|
sql use test4;
|
||||||
|
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql create stream streams4 trigger at_once into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791243000,2,1,1,1.0);
|
||||||
|
|
||||||
|
sql insert into t2 values(1648791273000,3,1,1,1.0);
|
||||||
|
sql insert into t2 values(1648791313000,4,1,1,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop6:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt4 order by 1;
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 8 then
|
||||||
|
print ====loop6=rows=$rows
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 1 then
|
||||||
|
print ====loop6=data01=$data01
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print ====loop6=data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 1 then
|
||||||
|
print ====loop6=data11=$data11
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 1 then
|
||||||
|
print ====loop6=data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 1 then
|
||||||
|
print ====loop6=data21=$data21
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data22 != 2 then
|
||||||
|
print ====loop6=data22=$data22
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 1 then
|
||||||
|
print ====loop6=data31=$data31
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != 2 then
|
||||||
|
print ====loop6=data32=$data32
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data41 != 1 then
|
||||||
|
print ====loop6=data41=$data41
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data42 != 3 then
|
||||||
|
print ====loop6=data42=$data42
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data51 != 1 then
|
||||||
|
print ====loop6=data51=$data51
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data52 != 3 then
|
||||||
|
print ====loop6=data52=$data52
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data61 != 1 then
|
||||||
|
print ====loop6=data61=$data61
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data62 != 4 then
|
||||||
|
print ====loop6=data62=$data62
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data71 != 1 then
|
||||||
|
print ====loop6=data71=$data71
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data72 != 4 then
|
||||||
|
print ====loop6=data72=$data72
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
$loop_all = $loop_all + 1
|
$loop_all = $loop_all + 1
|
||||||
print ============loop_all=$loop_all
|
print ============loop_all=$loop_all
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ class TDTestCase:
|
||||||
|
|
||||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
sql += "(now + %ds, %d, '%s', '%s', %d, %d, %d)"%(consumerId, consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||||
tdLog.info("consume info sql: %s"%sql)
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
@ -174,12 +174,13 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
self.initConsumerTable()
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d replica %d" %(parameterDict['dbName'], parameterDict['vgroups'], parameterDict['replica']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
prepareEnvThread.start()
|
prepareEnvThread.start()
|
||||||
|
@ -271,12 +272,13 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
self.initConsumerTable()
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d replica %d" %(parameterDict['dbName'], parameterDict['vgroups'], parameterDict['replica']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
prepareEnvThread.start()
|
prepareEnvThread.start()
|
||||||
|
@ -337,6 +339,7 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
@ -406,12 +409,13 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
self.initConsumerTable()
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d replica %d" %(parameterDict['dbName'], parameterDict['vgroups'], parameterDict['replica']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
prepareEnvThread.start()
|
prepareEnvThread.start()
|
||||||
|
|
|
@ -174,12 +174,13 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
self.initConsumerTable()
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d replica %d" %(parameterDict['dbName'], parameterDict['vgroups'], parameterDict['replica']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
prepareEnvThread.start()
|
prepareEnvThread.start()
|
||||||
|
@ -191,6 +192,7 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
@ -254,12 +256,13 @@ class TDTestCase:
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 5000, \
|
'rowsPerTbl': 5000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
|
'replica': self.replicaVar, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
self.initConsumerTable()
|
self.initConsumerTable()
|
||||||
|
|
||||||
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
tdSql.execute("create database if not exists %s vgroups %d replica %d" %(parameterDict['dbName'], parameterDict['vgroups'], parameterDict['replica']))
|
||||||
|
|
||||||
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
prepareEnvThread.start()
|
prepareEnvThread.start()
|
||||||
|
|
|
@ -60,7 +60,7 @@ class TMQCom:
|
||||||
|
|
||||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
sql += "(now + %ds, %d, '%s', '%s', %d, %d, %d)"%(consumerId, consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||||
tdLog.info("consume info sql: %s"%sql)
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
#!/bin/bash
|
||||||
|
ulimit -c unlimited
|
||||||
|
#======================p1-insert===============
|
||||||
|
|
||||||
|
python3 ./test.py -f 0-others/taosShell.py
|
||||||
|
python3 ./test.py -f 0-others/taosShellError.py
|
||||||
|
python3 ./test.py -f 0-others/taosShellNetChk.py
|
||||||
|
python3 ./test.py -f 1-insert/alter_database.py
|
||||||
|
python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
||||||
|
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
|
||||||
|
python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||||
|
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||||
|
python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
|
||||||
|
python3 ./test.py -f 1-insert/alter_stable.py
|
||||||
|
python3 ./test.py -f 1-insert/alter_table.py
|
||||||
|
python3 ./test.py -f 1-insert/boundary.py
|
||||||
|
python3 ./test.py -f 2-query/top.py
|
||||||
|
python3 ./test.py -f 2-query/top.py -R
|
||||||
|
python3 ./test.py -f 2-query/tsbsQuery.py
|
||||||
|
python3 ./test.py -f 2-query/tsbsQuery.py -R
|
||||||
|
python3 ./test.py -f 2-query/ttl_comment.py
|
||||||
|
python3 ./test.py -f 2-query/ttl_comment.py -R
|
||||||
|
python3 ./test.py -f 2-query/twa.py
|
||||||
|
python3 ./test.py -f 2-query/twa.py -R
|
||||||
|
python3 ./test.py -f 2-query/union.py
|
||||||
|
python3 ./test.py -f 2-query/union.py -R
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 -i False
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 -i False
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 6 -M 3
|
||||||
|
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 6 -M 3 -n 3
|
||||||
|
python3 ./test.py -f 7-tmq/subscribeStb4.py
|
||||||
|
python3 ./test.py -f 7-tmq/db.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqError.py
|
||||||
|
python3 ./test.py -f 7-tmq/schema.py
|
||||||
|
python3 ./test.py -f 7-tmq/stbFilter.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqCheckData.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqCheckData1.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
||||||
|
python3 ./test.py -f 99-TDcase/TD-20582.py
|
Loading…
Reference in New Issue