fix: scheduler UT issues

This commit is contained in:
dapan1121 2024-07-17 09:12:42 +08:00
parent 352abe0b5c
commit a62c0fda83
1 changed files with 159 additions and 31 deletions

View File

@ -75,9 +75,7 @@ int32_t schtStartFetch = 0;
void schtInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
rpcInit();
tsAsyncLog = 0;
rpcInit();
qDebugFlag = 159;
strcpy(tsLogDir, TD_LOG_DIR_PATH);
@ -136,8 +134,13 @@ int32_t schtBuildSubmitRspMsg(uint32_t *msize, void **rspMsg) {
tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret);
void *msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return terrno;
}
tEncoderInit(&ec, (uint8_t *)msg, msgSize);
tEncodeSSubmitRsp2(&ec, &submitRsp);
if (tEncodeSSubmitRsp2(&ec, &submitRsp) < 0) {
return -1;
}
tEncoderClear(&ec);
*rspMsg = msg;
@ -152,11 +155,26 @@ void schtBuildQueryDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
if (NULL == dag->pSubplans) {
return;
}
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == scan) {
return;
}
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == merge) {
return;
}
SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == scanPlan) {
return;
}
SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == mergePlan) {
return;
}
scanPlan->id.queryId = qId;
scanPlan->id.groupId = 0x0000000000000002;
@ -170,7 +188,13 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
if (NULL == scanPlan->pParents) {
return;
}
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
if (NULL == scanPlan->pNode) {
return;
}
scanPlan->msgType = TDMT_SCH_QUERY;
mergePlan->id.queryId = qId;
@ -181,21 +205,33 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->execNode.epSet.numOfEps = 0;
mergePlan->pChildren = nodesMakeList();
if (NULL == mergePlan->pChildren) {
return;
}
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == mergePlan->pNode) {
return;
}
mergePlan->msgType = TDMT_SCH_QUERY;
merge->pNodeList = nodesMakeList();
if (NULL == merge->pNodeList) {
return;
}
scan->pNodeList = nodesMakeList();
if (NULL == scan->pNodeList) {
return;
}
nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
nodesListAppend(dag->pSubplans, (SNode *)merge);
nodesListAppend(dag->pSubplans, (SNode *)scan);
(void)nodesListAppend(dag->pSubplans, (SNode *)merge);
(void)nodesListAppend(dag->pSubplans, (SNode *)scan);
}
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
@ -205,18 +241,42 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
if (NULL == dag->pSubplans) {
return;
}
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == scan) {
return;
}
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == merge) {
return;
}
SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == mergePlan) {
return;
}
merge->pNodeList = nodesMakeList();
if (NULL == merge->pNodeList) {
return;
}
scan->pNodeList = nodesMakeList();
if (NULL == scan->pNodeList) {
return;
}
mergePlan->pChildren = nodesMakeList();
if (NULL == mergePlan->pChildren) {
return;
}
for (int32_t i = 0; i < scanPlanNum; ++i) {
SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == scanPlan) {
return;
}
scanPlan->id.queryId = qId;
scanPlan->id.groupId = 0x0000000000000002;
scanPlan->id.subplanId = 0x0000000000000003 + i;
@ -233,13 +293,19 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
if (NULL == scanPlan->pParents) {
return;
}
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
if (NULL == scanPlan->pNode) {
return;
}
scanPlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
}
mergePlan->id.queryId = qId;
@ -251,12 +317,15 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == mergePlan->pNode) {
return;
}
mergePlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
nodesListAppend(dag->pSubplans, (SNode *)merge);
nodesListAppend(dag->pSubplans, (SNode *)scan);
(void)nodesListAppend(dag->pSubplans, (SNode *)merge);
(void)nodesListAppend(dag->pSubplans, (SNode *)scan);
}
void schtFreeQueryDag(SQueryPlan *dag) {}
@ -267,10 +336,22 @@ void schtBuildInsertDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
if (NULL == dag->pSubplans) {
return;
}
SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == inserta) {
return;
}
inserta->pNodeList = nodesMakeList();
if (NULL == inserta->pNodeList) {
return;
}
SSubplan *insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == insertPlan) {
return;
}
insertPlan->id.queryId = qId;
insertPlan->id.groupId = 0x0000000000000003;
@ -286,13 +367,22 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan->pParents = NULL;
insertPlan->pNode = NULL;
insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
if (NULL == insertPlan->pDataSink) {
return;
}
((SDataInserterNode *)insertPlan->pDataSink)->size = 1;
((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) {
return;
}
insertPlan->msgType = TDMT_VND_SUBMIT;
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
(void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == insertPlan) {
return;
}
insertPlan->id.queryId = qId;
insertPlan->id.groupId = 0x0000000000000003;
@ -308,22 +398,31 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan->pParents = NULL;
insertPlan->pNode = NULL;
insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
if (NULL == insertPlan->pDataSink) {
return;
}
((SDataInserterNode *)insertPlan->pDataSink)->size = 1;
((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) {
return;
}
insertPlan->msgType = TDMT_VND_SUBMIT;
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
(void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
nodesListAppend(dag->pSubplans, (SNode *)inserta);
(void)nodesListAppend(dag->pSubplans, (SNode *)inserta);
}
int32_t schtPlanToString(const SSubplan *subplan, char **str, int32_t *len) {
*str = (char *)taosMemoryCalloc(1, 20);
if (NULL == *str) {
return -1;
}
*len = 20;
return 0;
}
void schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) {}
int32_t schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) { return 0; }
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {}
@ -442,16 +541,16 @@ void *schtSendRsp(void *param) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildSubmitRspMsg(&msg.len, &rmsg);
(void)schtBuildSubmitRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_VND_SUBMIT_RSP;
msg.pData = rmsg;
schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
(void)schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
schReleaseJob(job);
(void)schReleaseJob(job);
schtJobDone = true;
@ -472,13 +571,13 @@ void *schtCreateFetchRspThread(void *param) {
int32_t code = 0;
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildFetchRspMsg(&msg.len, &rmsg);
(void)schtBuildFetchRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
schReleaseJob(job);
(void)schReleaseJob(job);
assert(code == 0);
return NULL;
@ -496,12 +595,17 @@ void *schtFetchRspThread(void *aa) {
taosUsleep(100);
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
if (NULL == param) {
return NULL;
}
param->queryId = schtQueryId;
param->taskId = schtFetchTaskId;
int32_t code = 0;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
if (NULL == rsp) {
return NULL;
}
rsp->completed = 1;
rsp->numOfRows = 10;
@ -557,12 +661,17 @@ void *schtRunJobThread(void *aa) {
schtBuildQueryDag(dag);
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
if (NULL == qnodeList) {
assert(0);
}
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
if (NULL == taosArrayPush(qnodeList, &load)) {
assert(0);
}
queryDone = 0;
@ -590,16 +699,24 @@ void *schtRunJobThread(void *aa) {
}
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == execTasks) {
assert(0);
}
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
schtFetchTaskId = task->taskId - 1;
taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
if (taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task))) {
assert(0);
}
pIter = taosHashIterate(pJob->execTasks, pIter);
}
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
if (NULL == param) {
assert(0);
}
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
@ -611,7 +728,9 @@ void *schtRunJobThread(void *aa) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
if (schtBuildQueryRspMsg(&msg.len, &rmsg)) {
assert(0);
}
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -622,6 +741,9 @@ void *schtRunJobThread(void *aa) {
}
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
if (NULL == param) {
assert(0);
}
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
@ -632,7 +754,9 @@ void *schtRunJobThread(void *aa) {
param->taskId = task->taskId - 1;
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
if (schtBuildQueryRspMsg(&msg.len, &rmsg)) {
assert(0);
}
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -1110,13 +1234,17 @@ TEST(otherTest, otherCase) {
schReleaseJob(0);
schFreeRpcCtx(NULL);
ASSERT_EQ(schDumpEpSet(NULL, NULL), TSDB_CODE_SUCCESS);
char* ep = NULL;
ASSERT_EQ(schDumpEpSet(NULL, &ep), TSDB_CODE_SUCCESS);
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
}
int main(int argc, char **argv) {
schtInitLogFile();
if (rpcInit()) {
assert(0);
}
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();