commit
1c774f021f
|
@ -36,9 +36,9 @@
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
#include "tmisce.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tvariant.h"
|
#include "tvariant.h"
|
||||||
#include "tmisce.h"
|
|
||||||
|
|
||||||
#pragma GCC diagnostic push
|
#pragma GCC diagnostic push
|
||||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
@ -54,7 +54,8 @@
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode);
|
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg,
|
||||||
|
int32_t rspCode);
|
||||||
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode);
|
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode);
|
||||||
|
|
||||||
int64_t insertJobRefId = 0;
|
int64_t insertJobRefId = 0;
|
||||||
|
@ -74,8 +75,9 @@ int32_t schtStartFetch = 0;
|
||||||
void schtInitLogFile() {
|
void schtInitLogFile() {
|
||||||
const char *defaultLogFileNamePrefix = "taoslog";
|
const char *defaultLogFileNamePrefix = "taoslog";
|
||||||
const int32_t maxLogFileNum = 10;
|
const int32_t maxLogFileNum = 10;
|
||||||
|
rpcInit();
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
|
rpcInit();
|
||||||
qDebugFlag = 159;
|
qDebugFlag = 159;
|
||||||
strcpy(tsLogDir, TD_LOG_DIR_PATH);
|
strcpy(tsLogDir, TD_LOG_DIR_PATH);
|
||||||
|
|
||||||
|
@ -84,12 +86,10 @@ void schtInitLogFile() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void schtQueryCb(SExecResult *pResult, void *param, int32_t code) {
|
void schtQueryCb(SExecResult *pResult, void *param, int32_t code) { *(int32_t *)param = 1; }
|
||||||
*(int32_t *)param = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) {
|
int32_t schtBuildQueryRspMsg(uint32_t *msize, void **rspMsg) {
|
||||||
SQueryTableRsp rsp = {0};
|
SQueryTableRsp rsp = {0};
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
rsp.affectedRows = 0;
|
rsp.affectedRows = 0;
|
||||||
rsp.tbVerInfo = NULL;
|
rsp.tbVerInfo = NULL;
|
||||||
|
@ -99,7 +99,7 @@ int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) {
|
||||||
qError("tSerializeSQueryTableRsp failed");
|
qError("tSerializeSQueryTableRsp failed");
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pRsp = taosMemoryCalloc(msgSize, 1);
|
void *pRsp = taosMemoryCalloc(msgSize, 1);
|
||||||
if (NULL == pRsp) {
|
if (NULL == pRsp) {
|
||||||
qError("rpcMallocCont %d failed", msgSize);
|
qError("rpcMallocCont %d failed", msgSize);
|
||||||
|
@ -117,9 +117,8 @@ int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schtBuildFetchRspMsg(uint32_t *msize, void **rspMsg) {
|
||||||
int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) {
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1);
|
||||||
SRetrieveTableRsp* rsp = (SRetrieveTableRsp*)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1);
|
|
||||||
rsp->completed = 1;
|
rsp->completed = 1;
|
||||||
rsp->numOfRows = 10;
|
rsp->numOfRows = 10;
|
||||||
rsp->compLen = 0;
|
rsp->compLen = 0;
|
||||||
|
@ -130,14 +129,14 @@ int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) {
|
int32_t schtBuildSubmitRspMsg(uint32_t *msize, void **rspMsg) {
|
||||||
SSubmitRsp2 submitRsp = {0};
|
SSubmitRsp2 submitRsp = {0};
|
||||||
int32_t msgSize = 0, ret = 0;
|
int32_t msgSize = 0, ret = 0;
|
||||||
SEncoder ec = {0};
|
SEncoder ec = {0};
|
||||||
|
|
||||||
tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret);
|
tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret);
|
||||||
void* msg = taosMemoryCalloc(1, msgSize);
|
void *msg = taosMemoryCalloc(1, msgSize);
|
||||||
tEncoderInit(&ec, (uint8_t*)msg, msgSize);
|
tEncoderInit(&ec, (uint8_t *)msg, msgSize);
|
||||||
tEncodeSSubmitRsp2(&ec, &submitRsp);
|
tEncodeSSubmitRsp2(&ec, &submitRsp);
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
|
|
||||||
|
@ -147,7 +146,6 @@ int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void schtBuildQueryDag(SQueryPlan *dag) {
|
void schtBuildQueryDag(SQueryPlan *dag) {
|
||||||
uint64_t qId = schtQueryId;
|
uint64_t qId = schtQueryId;
|
||||||
|
|
||||||
|
@ -157,8 +155,8 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
||||||
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||||
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||||
|
|
||||||
SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||||
SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||||
|
|
||||||
scanPlan->id.queryId = qId;
|
scanPlan->id.queryId = qId;
|
||||||
scanPlan->id.groupId = 0x0000000000000002;
|
scanPlan->id.groupId = 0x0000000000000002;
|
||||||
|
@ -210,7 +208,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
||||||
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||||
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||||
|
|
||||||
SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||||
|
|
||||||
merge->pNodeList = nodesMakeList();
|
merge->pNodeList = nodesMakeList();
|
||||||
scan->pNodeList = nodesMakeList();
|
scan->pNodeList = nodesMakeList();
|
||||||
|
@ -218,7 +216,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
||||||
mergePlan->pChildren = nodesMakeList();
|
mergePlan->pChildren = nodesMakeList();
|
||||||
|
|
||||||
for (int32_t i = 0; i < scanPlanNum; ++i) {
|
for (int32_t i = 0; i < scanPlanNum; ++i) {
|
||||||
SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||||
scanPlan->id.queryId = qId;
|
scanPlan->id.queryId = qId;
|
||||||
scanPlan->id.groupId = 0x0000000000000002;
|
scanPlan->id.groupId = 0x0000000000000002;
|
||||||
scanPlan->id.subplanId = 0x0000000000000003 + i;
|
scanPlan->id.subplanId = 0x0000000000000003 + i;
|
||||||
|
@ -272,7 +270,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
|
||||||
SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||||
inserta->pNodeList = nodesMakeList();
|
inserta->pNodeList = nodesMakeList();
|
||||||
|
|
||||||
SSubplan *insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
SSubplan *insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||||
|
|
||||||
insertPlan->id.queryId = qId;
|
insertPlan->id.queryId = qId;
|
||||||
insertPlan->id.groupId = 0x0000000000000003;
|
insertPlan->id.groupId = 0x0000000000000003;
|
||||||
|
@ -287,14 +285,14 @@ void schtBuildInsertDag(SQueryPlan *dag) {
|
||||||
insertPlan->pChildren = NULL;
|
insertPlan->pChildren = NULL;
|
||||||
insertPlan->pParents = NULL;
|
insertPlan->pParents = NULL;
|
||||||
insertPlan->pNode = NULL;
|
insertPlan->pNode = NULL;
|
||||||
insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
|
insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
|
||||||
((SDataInserterNode*)insertPlan->pDataSink)->size = 1;
|
((SDataInserterNode *)insertPlan->pDataSink)->size = 1;
|
||||||
((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
|
((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
|
||||||
insertPlan->msgType = TDMT_VND_SUBMIT;
|
insertPlan->msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
|
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
|
||||||
|
|
||||||
insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
|
||||||
|
|
||||||
insertPlan->id.queryId = qId;
|
insertPlan->id.queryId = qId;
|
||||||
insertPlan->id.groupId = 0x0000000000000003;
|
insertPlan->id.groupId = 0x0000000000000003;
|
||||||
|
@ -309,9 +307,9 @@ void schtBuildInsertDag(SQueryPlan *dag) {
|
||||||
insertPlan->pChildren = NULL;
|
insertPlan->pChildren = NULL;
|
||||||
insertPlan->pParents = NULL;
|
insertPlan->pParents = NULL;
|
||||||
insertPlan->pNode = NULL;
|
insertPlan->pNode = NULL;
|
||||||
insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
|
insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
|
||||||
((SDataInserterNode*)insertPlan->pDataSink)->size = 1;
|
((SDataInserterNode *)insertPlan->pDataSink)->size = 1;
|
||||||
((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
|
((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
|
||||||
insertPlan->msgType = TDMT_VND_SUBMIT;
|
insertPlan->msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
|
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
|
||||||
|
@ -389,7 +387,8 @@ void schtSetRpcSendRequest() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo, bool persistHandle, void* rpcCtx) {
|
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo,
|
||||||
|
bool persistHandle, void *rpcCtx) {
|
||||||
if (pInfo) {
|
if (pInfo) {
|
||||||
taosMemoryFreeClear(pInfo->param);
|
taosMemoryFreeClear(pInfo->param);
|
||||||
taosMemoryFreeClear(pInfo->msgInfo.pData);
|
taosMemoryFreeClear(pInfo->msgInfo.pData);
|
||||||
|
@ -439,11 +438,11 @@ void *schtSendRsp(void *param) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildSubmitRspMsg(&msg.len, &rmsg);
|
schtBuildSubmitRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_VND_SUBMIT_RSP;
|
msg.msgType = TDMT_VND_SUBMIT_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||||
|
|
||||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||||
|
@ -452,7 +451,7 @@ void *schtSendRsp(void *param) {
|
||||||
schReleaseJob(job);
|
schReleaseJob(job);
|
||||||
|
|
||||||
schtJobDone = true;
|
schtJobDone = true;
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -462,13 +461,13 @@ void *schtCreateFetchRspThread(void *param) {
|
||||||
|
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildFetchRspMsg(&msg.len, &rmsg);
|
schtBuildFetchRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
|
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
|
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
|
||||||
|
|
||||||
schReleaseJob(job);
|
schReleaseJob(job);
|
||||||
|
@ -529,7 +528,7 @@ void *schtRunJobThread(void *aa) {
|
||||||
char *dbname = "1.db1";
|
char *dbname = "1.db1";
|
||||||
char *tablename = "table1";
|
char *tablename = "table1";
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||||
|
|
||||||
schtInitLogFile();
|
schtInitLogFile();
|
||||||
|
|
||||||
|
@ -601,7 +600,7 @@ void *schtRunJobThread(void *aa) {
|
||||||
param->taskId = task->taskId;
|
param->taskId = task->taskId;
|
||||||
|
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
@ -622,7 +621,7 @@ void *schtRunJobThread(void *aa) {
|
||||||
|
|
||||||
param->taskId = task->taskId - 1;
|
param->taskId = task->taskId - 1;
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
@ -686,7 +685,6 @@ void *schtFreeJobThread(void *aa) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
TEST(queryTest, normalCase) {
|
TEST(queryTest, normalCase) {
|
||||||
|
@ -696,7 +694,7 @@ TEST(queryTest, normalCase) {
|
||||||
char *tablename = "table1";
|
char *tablename = "table1";
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
int64_t job = 0;
|
int64_t job = 0;
|
||||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||||
|
|
||||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||||
|
|
||||||
|
@ -737,13 +735,13 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(pJob->execTasks, pIter);
|
pIter = taosHashIterate(pJob->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
@ -753,13 +751,13 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
if (JOB_TASK_STATUS_EXEC == task->status) {
|
if (JOB_TASK_STATUS_EXEC == task->status) {
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -793,7 +791,7 @@ TEST(queryTest, normalCase) {
|
||||||
taosMemoryFreeClear(data);
|
taosMemoryFreeClear(data);
|
||||||
|
|
||||||
schReleaseJob(job);
|
schReleaseJob(job);
|
||||||
|
|
||||||
schedulerDestroy();
|
schedulerDestroy();
|
||||||
|
|
||||||
schedulerFreeJob(&job, 0);
|
schedulerFreeJob(&job, 0);
|
||||||
|
@ -808,7 +806,7 @@ TEST(queryTest, readyFirstCase) {
|
||||||
char *tablename = "table1";
|
char *tablename = "table1";
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
int64_t job = 0;
|
int64_t job = 0;
|
||||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||||
|
|
||||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||||
|
|
||||||
|
@ -816,7 +814,7 @@ TEST(queryTest, readyFirstCase) {
|
||||||
load.addr.epSet.numOfEps = 1;
|
load.addr.epSet.numOfEps = 1;
|
||||||
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
|
||||||
load.addr.epSet.eps[0].port = 6031;
|
load.addr.epSet.eps[0].port = 6031;
|
||||||
taosArrayPush(qnodeList, &load);
|
taosArrayPush(qnodeList, &load);
|
||||||
|
|
||||||
int32_t code = schedulerInit();
|
int32_t code = schedulerInit();
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
@ -848,11 +846,11 @@ TEST(queryTest, readyFirstCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
@ -865,13 +863,13 @@ TEST(queryTest, readyFirstCase) {
|
||||||
|
|
||||||
if (JOB_TASK_STATUS_EXEC == task->status) {
|
if (JOB_TASK_STATUS_EXEC == task->status) {
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -919,7 +917,7 @@ TEST(queryTest, flowCtrlCase) {
|
||||||
char *tablename = "table1";
|
char *tablename = "table1";
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
int64_t job = 0;
|
int64_t job = 0;
|
||||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||||
|
|
||||||
schtInitLogFile();
|
schtInitLogFile();
|
||||||
|
|
||||||
|
@ -933,7 +931,6 @@ TEST(queryTest, flowCtrlCase) {
|
||||||
load.addr.epSet.eps[0].port = 6031;
|
load.addr.epSet.eps[0].port = 6031;
|
||||||
taosArrayPush(qnodeList, &load);
|
taosArrayPush(qnodeList, &load);
|
||||||
|
|
||||||
|
|
||||||
int32_t code = schedulerInit();
|
int32_t code = schedulerInit();
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
@ -968,13 +965,13 @@ TEST(queryTest, flowCtrlCase) {
|
||||||
|
|
||||||
if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) {
|
if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) {
|
||||||
SDataBuf msg = {0};
|
SDataBuf msg = {0};
|
||||||
void* rmsg = NULL;
|
void *rmsg = NULL;
|
||||||
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
schtBuildQueryRspMsg(&msg.len, &rmsg);
|
||||||
msg.msgType = TDMT_SCH_QUERY_RSP;
|
msg.msgType = TDMT_SCH_QUERY_RSP;
|
||||||
msg.pData = rmsg;
|
msg.pData = rmsg;
|
||||||
|
|
||||||
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1005,7 +1002,7 @@ TEST(queryTest, flowCtrlCase) {
|
||||||
|
|
||||||
schedulerFreeJob(&job, 0);
|
schedulerFreeJob(&job, 0);
|
||||||
|
|
||||||
taosThreadJoin(thread1, NULL);
|
taosThreadJoin(thread1, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(insertTest, normalCase) {
|
TEST(insertTest, normalCase) {
|
||||||
|
@ -1014,7 +1011,7 @@ TEST(insertTest, normalCase) {
|
||||||
char *dbname = "1.db1";
|
char *dbname = "1.db1";
|
||||||
char *tablename = "table1";
|
char *tablename = "table1";
|
||||||
SVgroupInfo vgInfo = {0};
|
SVgroupInfo vgInfo = {0};
|
||||||
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
|
||||||
uint64_t numOfRows = 0;
|
uint64_t numOfRows = 0;
|
||||||
|
|
||||||
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
|
||||||
|
@ -1067,7 +1064,7 @@ TEST(insertTest, normalCase) {
|
||||||
|
|
||||||
schedulerDestroy();
|
schedulerDestroy();
|
||||||
|
|
||||||
taosThreadJoin(thread1, NULL);
|
taosThreadJoin(thread1, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(multiThread, forceFree) {
|
TEST(multiThread, forceFree) {
|
||||||
|
@ -1076,7 +1073,7 @@ TEST(multiThread, forceFree) {
|
||||||
|
|
||||||
TdThread thread1, thread2, thread3;
|
TdThread thread1, thread2, thread3;
|
||||||
taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
|
taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
|
||||||
// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
|
// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
|
||||||
taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
|
taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -1089,7 +1086,7 @@ TEST(multiThread, forceFree) {
|
||||||
}
|
}
|
||||||
|
|
||||||
schtTestStop = true;
|
schtTestStop = true;
|
||||||
//taosSsleep(3);
|
// taosSsleep(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(otherTest, otherCase) {
|
TEST(otherTest, otherCase) {
|
||||||
|
@ -1097,12 +1094,13 @@ TEST(otherTest, otherCase) {
|
||||||
schReleaseJob(0);
|
schReleaseJob(0);
|
||||||
schFreeRpcCtx(NULL);
|
schFreeRpcCtx(NULL);
|
||||||
|
|
||||||
ASSERT_EQ(schDumpEpSet(NULL), (char*)NULL);
|
ASSERT_EQ(schDumpEpSet(NULL), (char *)NULL);
|
||||||
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
|
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
|
||||||
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
|
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
|
schtInitLogFile();
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
testing::InitGoogleTest(&argc, argv);
|
testing::InitGoogleTest(&argc, argv);
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
|
|
|
@ -832,6 +832,9 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
|
||||||
taosInitRWLatch(&exh->latch);
|
taosInitRWLatch(&exh->latch);
|
||||||
|
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||||
|
SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId);
|
||||||
|
ASSERT(exh == self);
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
taosInitRWLatch(&exh->latch);
|
taosInitRWLatch(&exh->latch);
|
||||||
|
|
||||||
|
@ -2829,10 +2832,11 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||||
|
|
||||||
int64_t transAllocHandle() {
|
int64_t transAllocHandle() {
|
||||||
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
|
||||||
QUEUE_INIT(&exh->q);
|
|
||||||
taosInitRWLatch(&exh->latch);
|
|
||||||
|
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
||||||
|
SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId);
|
||||||
|
ASSERT(exh == self);
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
taosInitRWLatch(&exh->latch);
|
taosInitRWLatch(&exh->latch);
|
||||||
tDebug("pre alloc refId %" PRId64 "", exh->refId);
|
tDebug("pre alloc refId %" PRId64 "", exh->refId);
|
||||||
|
|
Loading…
Reference in New Issue