Merge pull request #24770 from taosdata/fix/TD-28011

fix: scheduler UT issues
This commit is contained in:
dapan1121 2024-02-19 15:36:15 +08:00 committed by GitHub
commit a5434fa92d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 274 additions and 207 deletions

View File

@ -66,7 +66,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
return true;
}
if ((*pJob->chkKillFp)(pJob->chkKillParam)) {
if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) {
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
return true;
}

View File

@ -54,9 +54,8 @@
namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode);
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode);
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode);
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode);
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
@ -67,7 +66,7 @@ uint64_t schtQueryId = 1;
bool schtTestStop = false;
bool schtTestDeadLoop = false;
int32_t schtTestMTRunSec = 10;
int32_t schtTestMTRunSec = 1;
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
@ -85,10 +84,69 @@ void schtInitLogFile() {
}
void schtQueryCb(SExecResult *pResult, void *param, int32_t code) {
assert(TSDB_CODE_SUCCESS == code);
*(int32_t *)param = 1;
}
int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) {
SQueryTableRsp rsp = {0};
rsp.code = 0;
rsp.affectedRows = 0;
rsp.tbVerInfo = NULL;
int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
if (msgSize < 0) {
qError("tSerializeSQueryTableRsp failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
void *pRsp = taosMemoryCalloc(msgSize, 1);
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) {
qError("tSerializeSQueryTableRsp %d failed", msgSize);
return TSDB_CODE_OUT_OF_MEMORY;
}
*rspMsg = pRsp;
*msize = msgSize;
return TSDB_CODE_SUCCESS;
}
int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) {
SRetrieveTableRsp* rsp = (SRetrieveTableRsp*)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1);
rsp->completed = 1;
rsp->numOfRows = 10;
rsp->compLen = 0;
*rspMsg = rsp;
*msize = sizeof(SRetrieveTableRsp);
return TSDB_CODE_SUCCESS;
}
int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) {
SSubmitRsp2 submitRsp = {0};
int32_t msgSize = 0, ret = 0;
SEncoder ec = {0};
tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret);
void* msg = taosMemoryCalloc(1, msgSize);
tEncoderInit(&ec, (uint8_t*)msg, msgSize);
tEncodeSSubmitRsp2(&ec, &submitRsp);
tEncoderClear(&ec);
*rspMsg = msg;
*msize = msgSize;
return TSDB_CODE_SUCCESS;
}
void schtBuildQueryDag(SQueryPlan *dag) {
uint64_t qId = schtQueryId;
@ -98,8 +156,8 @@ void schtBuildQueryDag(SQueryPlan *dag) {
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
scanPlan->id.queryId = qId;
scanPlan->id.groupId = 0x0000000000000002;
@ -113,7 +171,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
scanPlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
scanPlan->msgType = TDMT_SCH_QUERY;
mergePlan->id.queryId = qId;
@ -125,7 +183,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->pChildren = nodesMakeList();
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
mergePlan->msgType = TDMT_SCH_QUERY;
merge->pNodeList = nodesMakeList();
@ -151,8 +209,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
@ -160,29 +217,30 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->pChildren = nodesMakeList();
for (int32_t i = 0; i < scanPlanNum; ++i) {
scanPlan[i].id.queryId = qId;
scanPlan[i].id.groupId = 0x0000000000000002;
scanPlan[i].id.subplanId = 0x0000000000000003 + i;
scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN;
SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
scanPlan->id.queryId = qId;
scanPlan->id.groupId = 0x0000000000000002;
scanPlan->id.subplanId = 0x0000000000000003 + i;
scanPlan->subplanType = SUBPLAN_TYPE_SCAN;
scanPlan[i].execNode.nodeId = 1 + i;
scanPlan[i].execNode.epSet.inUse = 0;
scanPlan[i].execNodeStat.tableNum = taosRand() % 30;
addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep0", 6030);
addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep1", 6030);
addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep2", 6030);
scanPlan[i].execNode.epSet.inUse = taosRand() % 3;
scanPlan->execNode.nodeId = 1 + i;
scanPlan->execNode.epSet.inUse = 0;
scanPlan->execNodeStat.tableNum = taosRand() % 30;
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep1", 6030);
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep2", 6030);
scanPlan->execNode.epSet.inUse = taosRand() % 3;
scanPlan[i].pChildren = NULL;
scanPlan[i].level = 1;
scanPlan[i].pParents = nodesMakeList();
scanPlan[i].pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan[i].msgType = TDMT_SCH_QUERY;
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
scanPlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(scanPlan[i].pParents, (SNode *)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode *)(scanPlan + i));
nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
nodesListAppend(scan->pNodeList, (SNode *)(scanPlan + i));
nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
}
mergePlan->id.queryId = qId;
@ -193,7 +251,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->execNode.epSet.numOfEps = 0;
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
mergePlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
@ -211,45 +269,50 @@ void schtBuildInsertDag(SQueryPlan *dag) {
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan));
insertPlan[0].id.queryId = qId;
insertPlan[0].id.groupId = 0x0000000000000003;
insertPlan[0].id.subplanId = 0x0000000000000004;
insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan[0].level = 0;
insertPlan[0].execNode.nodeId = 1;
insertPlan[0].execNode.epSet.inUse = 0;
addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030);
insertPlan[0].pChildren = NULL;
insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode));
insertPlan[0].msgType = TDMT_VND_SUBMIT;
insertPlan[1].id.queryId = qId;
insertPlan[1].id.groupId = 0x0000000000000003;
insertPlan[1].id.subplanId = 0x0000000000000005;
insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan[1].level = 0;
insertPlan[1].execNode.nodeId = 1;
insertPlan[1].execNode.epSet.inUse = 0;
addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030);
insertPlan[1].pChildren = NULL;
insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode));
insertPlan[1].msgType = TDMT_VND_SUBMIT;
inserta->pNodeList = nodesMakeList();
SSubplan *insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
insertPlan->id.queryId = qId;
insertPlan->id.groupId = 0x0000000000000003;
insertPlan->id.subplanId = 0x0000000000000004;
insertPlan->subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan->level = 0;
insertPlan->execNode.nodeId = 1;
insertPlan->execNode.epSet.inUse = 0;
addEpIntoEpSet(&insertPlan->execNode.epSet, "ep0", 6030);
insertPlan->pChildren = NULL;
insertPlan->pParents = NULL;
insertPlan->pNode = NULL;
insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
((SDataInserterNode*)insertPlan->pDataSink)->size = 1;
((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
insertPlan->msgType = TDMT_VND_SUBMIT;
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
insertPlan += 1;
insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
insertPlan->id.queryId = qId;
insertPlan->id.groupId = 0x0000000000000003;
insertPlan->id.subplanId = 0x0000000000000005;
insertPlan->subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan->level = 0;
insertPlan->execNode.nodeId = 1;
insertPlan->execNode.epSet.inUse = 0;
addEpIntoEpSet(&insertPlan->execNode.epSet, "ep0", 6030);
insertPlan->pChildren = NULL;
insertPlan->pParents = NULL;
insertPlan->pNode = NULL;
insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
((SDataInserterNode*)insertPlan->pDataSink)->size = 1;
((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
insertPlan->msgType = TDMT_VND_SUBMIT;
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
nodesListAppend(dag->pSubplans, (SNode *)inserta);
@ -325,7 +388,7 @@ void schtSetRpcSendRequest() {
}
}
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo) {
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo, bool persistHandle, void* rpcCtx) {
if (pInfo) {
taosMemoryFreeClear(pInfo->param);
taosMemoryFreeClear(pInfo->msgInfo.pData);
@ -336,17 +399,17 @@ int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTr
void schtSetAsyncSendMsgToServer() {
static Stub stub;
stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
stub.set(asyncSendMsgToServerExt, schtAsyncSendMsgToServer);
{
#ifdef WINDOWS
AddrAny any;
std::map<std::string, void *> result;
any.get_func_addr("asyncSendMsgToServer", result);
any.get_func_addr("asyncSendMsgToServerExt", result);
#endif
#ifdef LINUX
AddrAny any("libtransport.so");
std::map<std::string, void *> result;
any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
any.get_global_func_addr_dynsym("^asyncSendMsgToServerExt$", result);
#endif
for (const auto &f : result) {
stub.set(f.second, schtAsyncSendMsgToServer);
@ -374,9 +437,13 @@ void *schtSendRsp(void *param) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SSubmitRsp rsp = {0};
rsp.affectedRows = 10;
schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildSubmitRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_VND_SUBMIT_RSP;
msg.pData = rmsg;
schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
@ -393,11 +460,13 @@ void *schtCreateFetchRspThread(void *param) {
taosSsleep(1);
int32_t code = 0;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
rsp->completed = 1;
rsp->numOfRows = 10;
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildFetchRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
schReleaseJob(job);
@ -414,7 +483,7 @@ void *schtFetchRspThread(void *aa) {
continue;
}
taosUsleep(1);
taosUsleep(100);
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
@ -426,10 +495,11 @@ void *schtFetchRspThread(void *aa) {
rsp->completed = 1;
rsp->numOfRows = 10;
dataBuf.msgType = TDMT_SCH_FETCH_RSP;
dataBuf.pData = rsp;
dataBuf.len = sizeof(*rsp);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0);
code = schHandleCallback(param, &dataBuf, 0);
assert(code == 0 || code);
}
@ -456,7 +526,7 @@ void *schtRunJobThread(void *aa) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SQueryPlan dag;
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
schtInitLogFile();
@ -470,19 +540,19 @@ void *schtRunJobThread(void *aa) {
SSchJob *pJob = NULL;
SSchTaskCallbackParam *param = NULL;
SHashObj *execTasks = NULL;
SDataBuf dataBuf = {0};
uint32_t jobFinished = 0;
int32_t queryDone = 0;
while (!schtTestStop) {
schtBuildQueryDag(&dag);
schtBuildQueryDag(dag);
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
queryDone = 0;
@ -492,7 +562,7 @@ void *schtRunJobThread(void *aa) {
req.syncReq = false;
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.pDag = dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.cbParam = &queryDone;
@ -503,7 +573,7 @@ void *schtRunJobThread(void *aa) {
pJob = schAcquireJob(queryJobRefId);
if (NULL == pJob) {
taosArrayDestroy(qnodeList);
schtFreeQueryDag(&dag);
schtFreeQueryDag(dag);
continue;
}
@ -526,11 +596,14 @@ void *schtRunJobThread(void *aa) {
SSchTask *task = (SSchTask *)pIter;
param->taskId = task->taskId;
SQueryTableRsp rsp = {0};
dataBuf.pData = &rsp;
dataBuf.len = sizeof(rsp);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleCallback(param, &msg, 0);
assert(code == 0 || code);
pIter = taosHashIterate(execTasks, pIter);
@ -545,11 +618,13 @@ void *schtRunJobThread(void *aa) {
SSchTask *task = (SSchTask *)pIter;
param->taskId = task->taskId - 1;
SQueryTableRsp rsp = {0};
dataBuf.pData = &rsp;
dataBuf.len = sizeof(rsp);
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
code = schHandleCallback(param, &msg, 0);
assert(code == 0 || code);
pIter = taosHashIterate(execTasks, pIter);
@ -575,7 +650,6 @@ void *schtRunJobThread(void *aa) {
if (0 == code) {
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
assert(pRsp->completed == 1);
assert(pRsp->numOfRows == 10);
}
data = NULL;
@ -587,7 +661,7 @@ void *schtRunJobThread(void *aa) {
taosHashCleanup(execTasks);
taosArrayDestroy(qnodeList);
schtFreeQueryDag(&dag);
schtFreeQueryDag(dag);
if (++jobFinished % schtTestPrintNum == 0) {
printf("jobFinished:%d\n", jobFinished);
@ -609,6 +683,7 @@ void *schtFreeJobThread(void *aa) {
return NULL;
}
} // namespace
TEST(queryTest, normalCase) {
@ -618,21 +693,20 @@ TEST(queryTest, normalCase) {
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
memset(&dag, 0, sizeof(dag));
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag);
schtBuildQueryDag(dag);
schtSetPlanToString();
schtSetExecNode();
@ -645,7 +719,7 @@ TEST(queryTest, normalCase) {
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.pDag = dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.cbParam = &queryDone;
@ -659,8 +733,13 @@ TEST(queryTest, normalCase) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -669,11 +748,18 @@ TEST(queryTest, normalCase) {
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
if (JOB_TASK_STATUS_EXEC == task->status) {
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
}
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
@ -703,18 +789,12 @@ TEST(queryTest, normalCase) {
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerDestroy();
schedulerFreeJob(&job, 0);
schtFreeQueryDag(&dag);
schedulerDestroy();
}
TEST(queryTest, readyFirstCase) {
@ -724,21 +804,20 @@ TEST(queryTest, readyFirstCase) {
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
memset(&dag, 0, sizeof(dag));
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag);
schtBuildQueryDag(dag);
schtSetPlanToString();
schtSetExecNode();
@ -751,7 +830,7 @@ TEST(queryTest, readyFirstCase) {
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.pDag = dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.cbParam = &queryDone;
@ -764,8 +843,13 @@ TEST(queryTest, readyFirstCase) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -775,10 +859,18 @@ TEST(queryTest, readyFirstCase) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
if (JOB_TASK_STATUS_EXEC == task->status) {
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
}
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
@ -807,18 +899,11 @@ TEST(queryTest, readyFirstCase) {
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(&job, 0);
schtFreeQueryDag(&dag);
schedulerDestroy();
schedulerFreeJob(&job, 0);
}
TEST(queryTest, flowCtrlCase) {
@ -828,35 +913,39 @@ TEST(queryTest, flowCtrlCase) {
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
schtInitLogFile();
taosSeedRand(taosGetTimestampSec());
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
schtBuildQueryFlowCtrlDag(&dag);
schtBuildQueryFlowCtrlDag(dag);
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
initTaskQueue();
int32_t queryDone = 0;
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.pDag = dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.cbParam = &queryDone;
@ -866,41 +955,27 @@ TEST(queryTest, flowCtrlCase) {
SSchJob *pJob = schAcquireJob(job);
bool qDone = false;
while (!qDone) {
while (!queryDone) {
void *pIter = taosHashIterate(pJob->execTasks, NULL);
if (NULL == pIter) {
break;
}
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
taosHashCancelIterate(pJob->execTasks, pIter);
if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) {
SDataBuf msg = {0};
void* rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
if (task->lastMsgType == TDMT_SCH_QUERY) {
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
} else {
qDone = true;
break;
}
pIter = NULL;
pIter = taosHashIterate(pJob->execTasks, pIter);
}
}
while (true) {
if (queryDone) {
break;
}
taosUsleep(10000);
}
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
@ -918,18 +993,11 @@ TEST(queryTest, flowCtrlCase) {
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(&job, 0);
schtFreeQueryDag(&dag);
schedulerDestroy();
schedulerFreeJob(&job, 0);
}
TEST(insertTest, normalCase) {
@ -938,20 +1006,21 @@ TEST(insertTest, normalCase) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SQueryPlan dag;
SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
uint64_t numOfRows = 0;
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
int32_t code = schedulerInit();
ASSERT_EQ(code, 0);
schtBuildInsertDag(&dag);
schtBuildInsertDag(dag);
schtSetPlanToString();
schtSetAsyncSendMsgToServer();
@ -962,21 +1031,19 @@ TEST(insertTest, normalCase) {
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SExecResult res = {0};
int32_t queryDone = 0;
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.pDag = dag;
req.sql = "insert into tb values(now,1)";
req.execFp = schtQueryCb;
req.cbParam = NULL;
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &insertJobRefId);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
schedulerFreeJob(&insertJobRefId, 0);
@ -989,7 +1056,7 @@ TEST(multiThread, forceFree) {
TdThread thread1, thread2, thread3;
taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL);
taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL);
taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL);
while (true) {
@ -1002,7 +1069,7 @@ TEST(multiThread, forceFree) {
}
schtTestStop = true;
taosSsleep(3);
//taosSsleep(3);
}
int main(int argc, char **argv) {