Merge pull request #17710 from taosdata/fix/TD-19815
fix: add query heartbeat configuration
This commit is contained in:
commit
b6326c7f14
|
@ -91,6 +91,7 @@ extern bool tsQueryPlannerTrace;
|
||||||
extern int32_t tsQueryNodeChunkSize;
|
extern int32_t tsQueryNodeChunkSize;
|
||||||
extern bool tsQueryUseNodeAllocator;
|
extern bool tsQueryUseNodeAllocator;
|
||||||
extern bool tsKeepColumnName;
|
extern bool tsKeepColumnName;
|
||||||
|
extern bool tsEnableQueryHb;
|
||||||
|
|
||||||
// client
|
// client
|
||||||
extern int32_t tsMinSlidingTime;
|
extern int32_t tsMinSlidingTime;
|
||||||
|
|
|
@ -82,6 +82,7 @@ bool tsSmlDataFormat = false;
|
||||||
// query
|
// query
|
||||||
int32_t tsQueryPolicy = 1;
|
int32_t tsQueryPolicy = 1;
|
||||||
int32_t tsQueryRspPolicy = 0;
|
int32_t tsQueryRspPolicy = 0;
|
||||||
|
bool tsEnableQueryHb = false;
|
||||||
int32_t tsQuerySmaOptimize = 0;
|
int32_t tsQuerySmaOptimize = 0;
|
||||||
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
|
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
|
||||||
bool tsQueryPlannerTrace = false;
|
bool tsQueryPlannerTrace = false;
|
||||||
|
@ -284,6 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1;
|
||||||
|
if (cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, false) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
|
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
|
||||||
|
@ -644,6 +646,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
|
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
|
||||||
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
|
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
|
||||||
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
|
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
|
||||||
|
tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval;
|
||||||
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
|
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
|
||||||
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
|
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
|
||||||
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
|
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
|
||||||
|
@ -780,6 +783,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
||||||
if (strcasecmp("enableCoreFile", name) == 0) {
|
if (strcasecmp("enableCoreFile", name) == 0) {
|
||||||
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
|
bool enableCore = cfgGetItem(pCfg, "enableCoreFile")->bval;
|
||||||
taosSetCoreDump(enableCore);
|
taosSetCoreDump(enableCore);
|
||||||
|
} else if (strcasecmp("enableQueryHb", name) == 0) {
|
||||||
|
tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
|
||||||
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_CANCEL_TASK:
|
case TDMT_SCH_CANCEL_TASK:
|
||||||
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
//code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_DROP_TASK:
|
case TDMT_SCH_DROP_TASK:
|
||||||
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
||||||
|
|
|
@ -369,8 +369,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_SCH_FETCH_RSP:
|
case TDMT_SCH_FETCH_RSP:
|
||||||
return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_SCH_CANCEL_TASK:
|
//case TDMT_SCH_CANCEL_TASK:
|
||||||
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
// return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_SCH_DROP_TASK:
|
case TDMT_SCH_DROP_TASK:
|
||||||
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_SCH_QUERY_HEARTBEAT:
|
case TDMT_SCH_QUERY_HEARTBEAT:
|
||||||
|
|
|
@ -621,14 +621,18 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
||||||
node->op = operation;
|
node->op = operation;
|
||||||
|
|
||||||
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||||
|
|
||||||
if (gCtgMgmt.queue.stopQueue) {
|
if (gCtgMgmt.queue.stopQueue) {
|
||||||
ctgFreeQNode(node);
|
ctgFreeQNode(node);
|
||||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||||
CTG_RET(TSDB_CODE_CTG_EXIT);
|
CTG_RET(TSDB_CODE_CTG_EXIT);
|
||||||
}
|
}
|
||||||
gCtgMgmt.queue.stopQueue = operation->stopQueue;
|
|
||||||
gCtgMgmt.queue.tail->next = node;
|
gCtgMgmt.queue.tail->next = node;
|
||||||
gCtgMgmt.queue.tail = node;
|
gCtgMgmt.queue.tail = node;
|
||||||
|
|
||||||
|
gCtgMgmt.queue.stopQueue = operation->stopQueue;
|
||||||
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||||
|
|
||||||
ctgDebug("action [%s] added into queue", opName);
|
ctgDebug("action [%s] added into queue", opName);
|
||||||
|
@ -1997,6 +2001,59 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
|
||||||
|
if (NULL == op || NULL == op->data) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (op->opId) {
|
||||||
|
case CTG_OP_UPDATE_VGROUP: {
|
||||||
|
SCtgUpdateVgMsg *msg = op->data;
|
||||||
|
ctgFreeVgInfo(msg->dbInfo);
|
||||||
|
taosMemoryFreeClear(op->data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CTG_OP_UPDATE_TB_META: {
|
||||||
|
SCtgUpdateTbMetaMsg *msg = op->data;
|
||||||
|
taosMemoryFreeClear(msg->pMeta->tbMeta);
|
||||||
|
taosMemoryFreeClear(msg->pMeta);
|
||||||
|
taosMemoryFreeClear(op->data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CTG_OP_DROP_DB_CACHE:
|
||||||
|
case CTG_OP_DROP_DB_VGROUP:
|
||||||
|
case CTG_OP_DROP_STB_META:
|
||||||
|
case CTG_OP_DROP_TB_META:
|
||||||
|
case CTG_OP_UPDATE_VG_EPSET:
|
||||||
|
case CTG_OP_DROP_TB_INDEX:
|
||||||
|
case CTG_OP_CLEAR_CACHE: {
|
||||||
|
taosMemoryFreeClear(op->data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CTG_OP_UPDATE_USER: {
|
||||||
|
SCtgUpdateUserMsg *msg = op->data;
|
||||||
|
taosHashCleanup(msg->userAuth.createdDbs);
|
||||||
|
taosHashCleanup(msg->userAuth.readDbs);
|
||||||
|
taosHashCleanup(msg->userAuth.writeDbs);
|
||||||
|
taosMemoryFreeClear(op->data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CTG_OP_UPDATE_TB_INDEX: {
|
||||||
|
SCtgUpdateTbIndexMsg *msg = op->data;
|
||||||
|
if (msg->pIndex) {
|
||||||
|
taosArrayDestroyEx(msg->pIndex->pIndex, tFreeSTableIndexInfo);
|
||||||
|
taosMemoryFreeClear(msg->pIndex);
|
||||||
|
}
|
||||||
|
taosMemoryFreeClear(op->data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
qError("invalid cache op id:%d", op->opId);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void ctgCleanupCacheQueue(void) {
|
void ctgCleanupCacheQueue(void) {
|
||||||
SCtgQNode *node = NULL;
|
SCtgQNode *node = NULL;
|
||||||
SCtgQNode *nodeNext = NULL;
|
SCtgQNode *nodeNext = NULL;
|
||||||
|
@ -2015,7 +2072,7 @@ void ctgCleanupCacheQueue(void) {
|
||||||
stopQueue = true;
|
stopQueue = true;
|
||||||
CTG_RT_STAT_INC(numOfOpDequeue, 1);
|
CTG_RT_STAT_INC(numOfOpDequeue, 1);
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(op->data);
|
ctgFreeCacheOperationData(op);
|
||||||
CTG_RT_STAT_INC(numOfOpAbort, 1);
|
CTG_RT_STAT_INC(numOfOpAbort, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2053,7 +2110,7 @@ void *ctgUpdateThreadFunc(void *param) {
|
||||||
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_load_8((int8_t *)&gCtgMgmt.exit)) {
|
if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
|
||||||
ctgCleanupCacheQueue();
|
ctgCleanupCacheQueue();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,8 @@ IF(NOT TD_DARWIN)
|
||||||
PRIVATE "${TD_SOURCE_DIR}/source/libs/catalog/inc"
|
PRIVATE "${TD_SOURCE_DIR}/source/libs/catalog/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
#add_test(
|
add_test(
|
||||||
# NAME catalogTest
|
NAME catalogTest
|
||||||
# COMMAND catalogTest
|
COMMAND catalogTest
|
||||||
#)
|
)
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
|
@ -27,8 +27,8 @@
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
#define TD_USE_WINSOCK
|
#define TD_USE_WINSOCK
|
||||||
#endif
|
#endif
|
||||||
#include "catalog.h"
|
|
||||||
#include "catalogInt.h"
|
#include "catalogInt.h"
|
||||||
|
#include "catalog.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
@ -49,14 +49,13 @@ void ctgTestSetRspCTableMeta();
|
||||||
void ctgTestSetRspSTableMeta();
|
void ctgTestSetRspSTableMeta();
|
||||||
void ctgTestSetRspMultiSTableMeta();
|
void ctgTestSetRspMultiSTableMeta();
|
||||||
|
|
||||||
// extern "C" SCatalogMgmt gCtgMgmt;
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
CTGT_RSP_VGINFO = 1,
|
CTGT_RSP_VGINFO = 1,
|
||||||
CTGT_RSP_TBMETA,
|
CTGT_RSP_TBMETA,
|
||||||
CTGT_RSP_CTBMETA,
|
CTGT_RSP_CTBMETA,
|
||||||
CTGT_RSP_STBMETA,
|
CTGT_RSP_STBMETA,
|
||||||
CTGT_RSP_MSTBMETA,
|
CTGT_RSP_MSTBMETA,
|
||||||
|
CTGT_RSP_INDEXINFO_E,
|
||||||
CTGT_RSP_TBMETA_NOT_EXIST,
|
CTGT_RSP_TBMETA_NOT_EXIST,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -284,6 +283,8 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
SUseDbRsp usedbRsp = {0};
|
SUseDbRsp usedbRsp = {0};
|
||||||
strcpy(usedbRsp.db, ctgTestDbname);
|
strcpy(usedbRsp.db, ctgTestDbname);
|
||||||
usedbRsp.vgVersion = ctgTestVgVersion;
|
usedbRsp.vgVersion = ctgTestVgVersion;
|
||||||
|
@ -321,9 +322,13 @@ void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *
|
||||||
pRsp->code = 0;
|
pRsp->code = 0;
|
||||||
pRsp->contLen = contLen;
|
pRsp->contLen = contLen;
|
||||||
pRsp->pCont = pReq;
|
pRsp->pCont = pReq;
|
||||||
|
|
||||||
|
taosArrayDestroy(usedbRsp.pVgroupInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
strcpy(metaRsp.dbFName, ctgTestDbname);
|
strcpy(metaRsp.dbFName, ctgTestDbname);
|
||||||
strcpy(metaRsp.tbName, ctgTestTablename);
|
strcpy(metaRsp.tbName, ctgTestTablename);
|
||||||
|
@ -363,10 +368,14 @@ void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgTestRspTableMetaNotExist(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspTableMetaNotExist(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
pRsp->code = CTG_ERR_CODE_TABLE_NOT_EXIST;
|
pRsp->code = CTG_ERR_CODE_TABLE_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
strcpy(metaRsp.dbFName, ctgTestDbname);
|
strcpy(metaRsp.dbFName, ctgTestDbname);
|
||||||
strcpy(metaRsp.tbName, ctgTestCurrentCTableName ? ctgTestCurrentCTableName : ctgTestCTablename);
|
strcpy(metaRsp.tbName, ctgTestCurrentCTableName ? ctgTestCurrentCTableName : ctgTestCTablename);
|
||||||
|
@ -413,6 +422,8 @@ void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
strcpy(metaRsp.dbFName, ctgTestDbname);
|
strcpy(metaRsp.dbFName, ctgTestDbname);
|
||||||
strcpy(metaRsp.tbName, ctgTestCurrentSTableName ? ctgTestCurrentSTableName : ctgTestSTablename);
|
strcpy(metaRsp.tbName, ctgTestCurrentSTableName ? ctgTestCurrentSTableName : ctgTestSTablename);
|
||||||
|
@ -459,6 +470,8 @@ void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgTestRspMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
static int32_t idx = 1;
|
static int32_t idx = 1;
|
||||||
|
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
|
@ -508,6 +521,16 @@ void ctgTestRspMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRp
|
||||||
tFreeSTableMetaRsp(&metaRsp);
|
tFreeSTableMetaRsp(&metaRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ctgTestRspErrIndexInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
|
pRsp->code = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
|
||||||
|
pRsp->contLen = 0;
|
||||||
|
pRsp->pCont = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
switch (ctgTestRspFunc[ctgTestRspIdx]) {
|
switch (ctgTestRspFunc[ctgTestRspIdx]) {
|
||||||
case CTGT_RSP_VGINFO:
|
case CTGT_RSP_VGINFO:
|
||||||
|
@ -525,6 +548,9 @@ void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp
|
||||||
case CTGT_RSP_MSTBMETA:
|
case CTGT_RSP_MSTBMETA:
|
||||||
ctgTestRspMultiSTableMeta(shandle, pEpSet, pMsg, pRsp);
|
ctgTestRspMultiSTableMeta(shandle, pEpSet, pMsg, pRsp);
|
||||||
break;
|
break;
|
||||||
|
case CTGT_RSP_INDEXINFO_E:
|
||||||
|
ctgTestRspErrIndexInfo(shandle, pEpSet, pMsg, pRsp);
|
||||||
|
break;
|
||||||
case CTGT_RSP_TBMETA_NOT_EXIST:
|
case CTGT_RSP_TBMETA_NOT_EXIST:
|
||||||
ctgTestRspTableMetaNotExist(shandle, pEpSet, pMsg, pRsp);
|
ctgTestRspTableMetaNotExist(shandle, pEpSet, pMsg, pRsp);
|
||||||
break;
|
break;
|
||||||
|
@ -969,6 +995,8 @@ TEST(tableMeta, normalTable) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
||||||
if (0 == n) {
|
if (0 == n) {
|
||||||
|
@ -990,6 +1018,8 @@ TEST(tableMeta, normalTable) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbVgVersion *dbs = NULL;
|
||||||
SSTableVersion *stb = NULL;
|
SSTableVersion *stb = NULL;
|
||||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||||
|
@ -1026,7 +1056,6 @@ TEST(tableMeta, normalTable) {
|
||||||
ASSERT_EQ(allStbNum, 0);
|
ASSERT_EQ(allStbNum, 0);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tableMeta, childTableCase) {
|
TEST(tableMeta, childTableCase) {
|
||||||
|
@ -1064,6 +1093,8 @@ TEST(tableMeta, childTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
||||||
if (0 == n) {
|
if (0 == n) {
|
||||||
|
@ -1099,6 +1130,8 @@ TEST(tableMeta, childTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbVgVersion *dbs = NULL;
|
||||||
SSTableVersion *stb = NULL;
|
SSTableVersion *stb = NULL;
|
||||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||||
|
@ -1135,7 +1168,6 @@ TEST(tableMeta, childTableCase) {
|
||||||
ASSERT_EQ(allStbNum, 1);
|
ASSERT_EQ(allStbNum, 1);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tableMeta, superTableCase) {
|
TEST(tableMeta, superTableCase) {
|
||||||
|
@ -1173,6 +1205,8 @@ TEST(tableMeta, superTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
||||||
if (0 == n) {
|
if (0 == n) {
|
||||||
|
@ -1199,6 +1233,8 @@ TEST(tableMeta, superTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
||||||
if (2 != n) {
|
if (2 != n) {
|
||||||
|
@ -1220,6 +1256,8 @@ TEST(tableMeta, superTableCase) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
SDbVgVersion *dbs = NULL;
|
SDbVgVersion *dbs = NULL;
|
||||||
SSTableVersion *stb = NULL;
|
SSTableVersion *stb = NULL;
|
||||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||||
|
@ -1257,7 +1295,6 @@ TEST(tableMeta, superTableCase) {
|
||||||
ASSERT_EQ(allStbNum, 1);
|
ASSERT_EQ(allStbNum, 1);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tableMeta, rmStbMeta) {
|
TEST(tableMeta, rmStbMeta) {
|
||||||
|
@ -1297,6 +1334,8 @@ TEST(tableMeta, rmStbMeta) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
uint32_t n = ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM);
|
||||||
if (0 == n) {
|
if (0 == n) {
|
||||||
|
@ -1326,7 +1365,6 @@ TEST(tableMeta, rmStbMeta) {
|
||||||
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
|
ASSERT_EQ(ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tableMeta, updateStbMeta) {
|
TEST(tableMeta, updateStbMeta) {
|
||||||
|
@ -1416,7 +1454,36 @@ TEST(tableMeta, updateStbMeta) {
|
||||||
taosMemoryFreeClear(tableMeta);
|
taosMemoryFreeClear(tableMeta);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt.stat, 0, sizeof(gCtgMgmt.stat));
|
}
|
||||||
|
|
||||||
|
TEST(getIndexInfo, notExists) {
|
||||||
|
struct SCatalog *pCtg = NULL;
|
||||||
|
SRequestConnInfo connInfo = {0};
|
||||||
|
SRequestConnInfo *mockPointer = (SRequestConnInfo *)&connInfo;
|
||||||
|
SVgroupInfo vgInfo = {0};
|
||||||
|
SArray *vgList = NULL;
|
||||||
|
|
||||||
|
ctgTestInitLogFile();
|
||||||
|
|
||||||
|
memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
|
||||||
|
ctgTestRspIdx = 0;
|
||||||
|
ctgTestRspFunc[0] = CTGT_RSP_INDEXINFO_E;
|
||||||
|
|
||||||
|
ctgTestSetRspByIdx();
|
||||||
|
|
||||||
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
|
int32_t code = catalogInit(NULL);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
SIndexInfo info;
|
||||||
|
code = catalogGetIndexMeta(pCtg, mockPointer, "index1", &info);
|
||||||
|
ASSERT_TRUE(code != 0);
|
||||||
|
|
||||||
|
catalogDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(refreshGetMeta, normal2normal) {
|
TEST(refreshGetMeta, normal2normal) {
|
||||||
|
@ -1496,7 +1563,6 @@ TEST(refreshGetMeta, normal2normal) {
|
||||||
taosMemoryFreeClear(tableMeta);
|
taosMemoryFreeClear(tableMeta);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(refreshGetMeta, normal2notexist) {
|
TEST(refreshGetMeta, normal2notexist) {
|
||||||
|
@ -1567,7 +1633,6 @@ TEST(refreshGetMeta, normal2notexist) {
|
||||||
ASSERT_TRUE(tableMeta == NULL);
|
ASSERT_TRUE(tableMeta == NULL);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(refreshGetMeta, normal2child) {
|
TEST(refreshGetMeta, normal2child) {
|
||||||
|
@ -1649,7 +1714,6 @@ TEST(refreshGetMeta, normal2child) {
|
||||||
taosMemoryFreeClear(tableMeta);
|
taosMemoryFreeClear(tableMeta);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
ctgTestCurrentCTableName = NULL;
|
ctgTestCurrentCTableName = NULL;
|
||||||
ctgTestCurrentSTableName = NULL;
|
ctgTestCurrentSTableName = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1737,7 +1801,6 @@ TEST(refreshGetMeta, stable2child) {
|
||||||
taosMemoryFreeClear(tableMeta);
|
taosMemoryFreeClear(tableMeta);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
ctgTestCurrentCTableName = NULL;
|
ctgTestCurrentCTableName = NULL;
|
||||||
ctgTestCurrentSTableName = NULL;
|
ctgTestCurrentSTableName = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1824,7 +1887,6 @@ TEST(refreshGetMeta, stable2stable) {
|
||||||
taosMemoryFreeClear(tableMeta);
|
taosMemoryFreeClear(tableMeta);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
ctgTestCurrentCTableName = NULL;
|
ctgTestCurrentCTableName = NULL;
|
||||||
ctgTestCurrentSTableName = NULL;
|
ctgTestCurrentSTableName = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1911,7 +1973,6 @@ TEST(refreshGetMeta, child2stable) {
|
||||||
taosMemoryFreeClear(tableMeta);
|
taosMemoryFreeClear(tableMeta);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
ctgTestCurrentCTableName = NULL;
|
ctgTestCurrentCTableName = NULL;
|
||||||
ctgTestCurrentSTableName = NULL;
|
ctgTestCurrentSTableName = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1951,7 +2012,6 @@ TEST(tableDistVgroup, normalTable) {
|
||||||
ASSERT_TRUE(code != 0);
|
ASSERT_TRUE(code != 0);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tableDistVgroup, childTableCase) {
|
TEST(tableDistVgroup, childTableCase) {
|
||||||
|
@ -1990,7 +2050,6 @@ TEST(tableDistVgroup, childTableCase) {
|
||||||
ASSERT_TRUE(code != 0);
|
ASSERT_TRUE(code != 0);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tableDistVgroup, superTableCase) {
|
TEST(tableDistVgroup, superTableCase) {
|
||||||
|
@ -2037,8 +2096,9 @@ TEST(tableDistVgroup, superTableCase) {
|
||||||
ASSERT_EQ(vgInfo->vgId, 3);
|
ASSERT_EQ(vgInfo->vgId, 3);
|
||||||
ASSERT_EQ(vgInfo->epSet.numOfEps, 3);
|
ASSERT_EQ(vgInfo->epSet.numOfEps, 3);
|
||||||
|
|
||||||
|
taosArrayDestroy(vgList);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(dbVgroup, getSetDbVgroupCase) {
|
TEST(dbVgroup, getSetDbVgroupCase) {
|
||||||
|
@ -2077,6 +2137,8 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
|
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
|
||||||
|
|
||||||
|
taosArrayDestroy(vgList);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
uint64_t n = 0;
|
uint64_t n = 0;
|
||||||
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
|
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
|
||||||
|
@ -2117,7 +2179,6 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
||||||
ASSERT_TRUE(code != 0);
|
ASSERT_TRUE(code != 0);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(multiThread, getSetRmSameDbVgroup) {
|
TEST(multiThread, getSetRmSameDbVgroup) {
|
||||||
|
@ -2170,7 +2231,6 @@ TEST(multiThread, getSetRmSameDbVgroup) {
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(multiThread, getSetRmDiffDbVgroup) {
|
TEST(multiThread, getSetRmDiffDbVgroup) {
|
||||||
|
@ -2223,7 +2283,6 @@ TEST(multiThread, getSetRmDiffDbVgroup) {
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(multiThread, ctableMeta) {
|
TEST(multiThread, ctableMeta) {
|
||||||
|
@ -2275,7 +2334,6 @@ TEST(multiThread, ctableMeta) {
|
||||||
taosSsleep(2);
|
taosSsleep(2);
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(rentTest, allRent) {
|
TEST(rentTest, allRent) {
|
||||||
|
@ -2323,6 +2381,8 @@ TEST(rentTest, allRent) {
|
||||||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
taosMemoryFree(tableMeta);
|
||||||
|
|
||||||
while (ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) {
|
while (ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) {
|
||||||
taosMsleep(50);
|
taosMsleep(50);
|
||||||
}
|
}
|
||||||
|
@ -2353,7 +2413,6 @@ TEST(rentTest, allRent) {
|
||||||
}
|
}
|
||||||
|
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -202,6 +202,7 @@ char* jobTaskStatusStr(int32_t status) {
|
||||||
return "UNKNOWN";
|
return "UNKNOWN";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
|
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
|
||||||
SSchema s = {0};
|
SSchema s = {0};
|
||||||
s.type = type;
|
s.type = type;
|
||||||
|
@ -211,6 +212,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
|
||||||
tstrncpy(s.name, name, tListLen(s.name));
|
tstrncpy(s.name, name, tListLen(s.name));
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void freeSTableMetaRspPointer(void *p) {
|
void freeSTableMetaRspPointer(void *p) {
|
||||||
tFreeSTableMetaRsp(*(void**)p);
|
tFreeSTableMetaRsp(*(void**)p);
|
||||||
|
|
|
@ -253,7 +253,7 @@ int32_t qwDbgEnableDebug(char *option) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == strcasecmp(option, "dead")) {
|
if (0 == strcasecmp(option, "dead")) {
|
||||||
gQWDebug.sleepSimulate = true;
|
gQWDebug.deadSimulate = true;
|
||||||
qError("qw dead debug enabled");
|
qError("qw dead debug enabled");
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,6 +146,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
@ -177,6 +178,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||||
tmsgSendRsp(&rpcRsp);
|
tmsgSendRsp(&rpcRsp);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
|
||||||
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
|
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
|
||||||
|
@ -490,6 +492,7 @@ int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -530,6 +533,7 @@ _return:
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||||
|
|
|
@ -796,7 +796,7 @@ void *fetchQueueThread(void *param) {
|
||||||
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
|
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_CANCEL_TASK:
|
case TDMT_SCH_CANCEL_TASK:
|
||||||
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
|
//qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_DROP_TASK:
|
case TDMT_SCH_DROP_TASK:
|
||||||
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
|
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
|
||||||
|
|
|
@ -915,7 +915,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
|
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
|
||||||
|
|
||||||
if (SCH_IS_QUERY_JOB(pJob)) {
|
if (SCH_IS_QUERY_JOB(pJob)) {
|
||||||
// SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
|
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
|
SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "command.h"
|
#include "command.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "schInt.h"
|
#include "schInt.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
@ -184,6 +185,10 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
|
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
if (!tsEnableQueryHb) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
SQueryNodeEpId epId = {0};
|
SQueryNodeEpId epId = {0};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue