diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 6dd42c3a93..efbe1fcc83 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -41,6 +41,7 @@ #include "tvariant.h" #include "stub.h" #include "querytask.h" +#include "hashjoin.h" namespace { @@ -3766,6 +3767,21 @@ TEST(leftWinJoin, noCondProjectionTest) { #endif +#if 1 +TEST(functionsTest, branch) { + struct SOperatorInfo op = {0}; + SHJoinOperatorInfo join; + SBufRowInfo bufrow = {0}; + SSDataBlock blk = {0}; + + op.info = &join; + memset(&join, 0, sizeof(join)); + join.ctx.pBuildRow = &bufrow; + blk.info.rows = 1; + join.finBlk = &blk; + hInnerJoinDo(&op); +} +#endif int main(int argc, char** argv) { diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index c3aa95f5b7..ee460e2610 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -161,6 +161,10 @@ int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); } int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { + if (NULL == pSubplan) { + return terrno = TSDB_CODE_INVALID_PARA; + } + if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) { SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink; *pLen = insert->size; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 607f43a06f..cdbeb2a2e0 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -149,11 +149,11 @@ typedef struct SSchedulerMgmt { bool exit; int32_t jobRef; int32_t jobNum; - SSchStat stat; void *timer; SRWLatch hbLock; SHashObj *hbConnections; void *queryMgmt; + SSchStat stat; } SSchedulerMgmt; typedef struct SSchCallbackParamHeader { diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index d9a0b88e40..c66f1dafe6 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -50,11 +50,6 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks); for (int32_t i = 0; i < taskNum; ++i) { SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i); - if (NULL == pTask) { - SCH_JOB_DLOG("fail to get the %dth task", i); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - sum += pTask->plan->execNodeStat.tableNum; } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index b31353e97f..0e76e21aff 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -273,18 +273,18 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { if (taskDone < pTask->level->taskNum) { SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; - } else if (taskDone > pTask->level->taskNum) { - SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum); } + + SCH_TASK_DLOG("taskDone number reach level task number, done:%d, total:%d", taskDone, pTask->level->taskNum); if (pTask->level->taskFailed > 0) { SCH_RET(schHandleJobFailure(pJob, pJob->errCode)); - } else { - SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } - } else { - pJob->resNode = pTask->succeedAddr; + + SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } + + pJob->resNode = pTask->succeedAddr; pJob->fetchTask = pTask; @@ -600,12 +600,12 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - SCH_TASK_DLOG("task already in execTask list, code:%x", code); + SCH_TASK_DLOG("task already in execTask list, code:0x%x", code); return TSDB_CODE_SUCCESS; } - SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:0x%x", errno); - SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code); + SCH_ERR_RET(code); } SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); @@ -800,11 +800,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < nodeNum; ++i) { SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); - if (NULL == nload) { - SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - SQueryNodeAddr *naddr = &nload->addr; if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 50b2527802..135c151743 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -63,6 +63,15 @@ extern "C" int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask); extern "C" int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType); extern "C" int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); extern "C" int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode); +extern "C" void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel); +extern "C" int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask); +extern "C" int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId); +extern "C" int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask); +extern "C" int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask); +extern "C" int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type); +extern "C" int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask); +extern "C" int32_t schLaunchTaskImpl(void *param); +extern "C" void schHandleTimerEvent(void *param, void *tmrId); int64_t insertJobRefId = 0; @@ -1480,6 +1489,8 @@ TEST(otherTest, function) { TEST(otherTest, branch) { SSchJob job = {0}; SSchTask task = {0}; + memset(&schMgmt, 0, sizeof(schMgmt)); + schValidateRspMsgType(&job, &task, TDMT_SCH_MERGE_FETCH_RSP); task.lastMsgType = TDMT_SCH_MERGE_FETCH_RSP - 1; @@ -1532,6 +1543,7 @@ TEST(otherTest, branch) { databuf.pData = taosMemoryMalloc(0); job.status = JOB_TASK_STATUS_FAIL; schProcessResponseMsg(&job, &task, &databuf, 0); + job.status = 0; job.attr.explainMode = EXPLAIN_MODE_ANALYZE; databuf.msgType = TDMT_SCH_EXPLAIN_RSP; @@ -1570,7 +1582,163 @@ TEST(otherTest, branch) { schBuildAndSendHbMsg(&ep, NULL); schBuildAndSendMsg(&job, &task, NULL, 0, NULL); - + + SSchLevel level = {0}; + SSubplan subplan; + memset(&subplan, 0, sizeof(subplan)); + job.attr.queryJob = true; + schMgmt.cfg.schPolicy = SCH_ALL; + task.plan = &subplan; + schInitTaskRetryTimes(&job, &task, &level); + job.attr.queryJob = false; + memset(&schMgmt.cfg, 0, sizeof(schMgmt.cfg)); + memset(&level, 0, sizeof(level)); + + schRecordTaskSucceedNode(&job, &task); + + schDropTaskExecNode(&job, &task, NULL, 0); + + task.execNodes = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + schDropTaskExecNode(&job, &task, NULL, 0); + + int32_t execId = 0; + task.execId = 1; + (void)taosHashPut(task.execNodes, &execId, sizeof(execId), &execId, sizeof(execId)); + schDropTaskExecNode(&job, &task, NULL, execId); + task.execId = 0; + taosHashCleanup(task.execNodes); + task.execNodes = NULL; + + job.status = JOB_TASK_STATUS_FAIL; + schProcessOnTaskFailure(&job, &task, 0); + job.status = 0; + + task.status = JOB_TASK_STATUS_FAIL; + schProcessOnTaskFailure(&job, &task, 0); + task.status = 0; + + task.level = &level; + schProcessOnTaskFailure(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR); + memset(&level, 0, sizeof(level)); + task.level = NULL; + + subplan.subplanType = SUBPLAN_TYPE_SCAN; + task.plan = &subplan; + SEpSet epset = {0}; + epset.numOfEps = 127; + schChkUpdateRedirectCtx(&job, &task, &epset, 0); + + schChkUpdateRedirectCtx(&job, &task, NULL, 0); + task.plan = NULL; + + schPushTaskToExecList(&job, &task); + + job.execTasks = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + taosHashPut(job.execTasks, &task.taskId, sizeof(task.taskId), &task, POINTER_BYTES); + schPushTaskToExecList(&job, &task); + taosHashCleanup(job.execTasks); + job.execTasks = NULL; + + bool needRetry = false; + task.timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC / 2 + 1; + task.retryTimes = 0; + task.maxRetryTimes = 0; + schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry); + + task.execId = 0; + task.retryTimes = 0; + task.maxRetryTimes = 100; + task.maxExecTimes = 1; + schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry); + + + task.execId = 0; + task.retryTimes = 0; + task.maxRetryTimes = 100; + task.maxExecTimes = 100; + task.lastMsgType = TDMT_SCH_LINK_BROKEN; + schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry); + + schSetAddrsFromNodeList(&job, &task); + + schSwitchTaskCandidateAddr(&job, &task); + + + task.candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr)); + SQueryNodeAddr addr = {0}; + taosArrayPush(task.candidateAddrs, &addr); + taosArrayPush(task.candidateAddrs, &addr); + schMgmt.cfg.schPolicy = SCH_LOAD_SEQ; + task.candidateIdx = 1; + schSwitchTaskCandidateAddr(&job, &task); + + schMgmt.cfg.schPolicy = SCH_RANDOM; + schSwitchTaskCandidateAddr(&job, &task); + taosArrayDestroy(task.candidateAddrs); + task.candidateAddrs = NULL; + memset(&schMgmt.cfg, 0, sizeof(schMgmt.cfg)); + task.candidateIdx = 0; + + schDropTaskOnExecNode(&job, &task); + + schNotifyTaskOnExecNode(&job, &task, TASK_NOTIFY_FINISHED); + + schLaunchRemoteTask(&job, &task); + + SSchTaskCtx* pCtx = (SSchTaskCtx*)taosMemoryCalloc(1, sizeof(SSchTaskCtx)); + pCtx->jobRid = -1; + schLaunchTaskImpl((void*)pCtx); + + task.plan = &subplan; + subplan.subplanType = SUBPLAN_TYPE_SCAN; + job.attr.needFlowCtrl = true; + level.taskNum = 1000; + task.level = &level; + schLaunchTask(&job, &task); + task.plan = NULL; + task.level = NULL; + job.attr.needFlowCtrl = false; + + SSchTimerParam param = {0}; + param.rId = -1; + schHandleTimerEvent(¶m, NULL); + + job.execTasks = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + task.delayExecMs = 1; + schMgmt.timer = NULL; + schDelayLaunchTask(&job, &task); + task.delayExecMs = 0; + taosHashCleanup(job.execTasks); + job.execTasks = NULL; + + job.fetchRes = (void*)0x1; + schLaunchFetchTask(&job); + job.fetchRes = NULL; + + job.fetchTask = &task; + job.attr.localExec = true; + job.attr.queryJob = true; + subplan.subplanType = SUBPLAN_TYPE_MERGE; + task.plan = &subplan; + void* p = taosMemoryCalloc(1, 1024); + schMgmt.queryMgmt = p; + schLaunchFetchTask(&job); + memset(&job, 0, sizeof(job)); + memset(&subplan, 0, sizeof(subplan)); + task.plan = NULL; + taosMemoryFreeClear(schMgmt.queryMgmt); + + // flow ctrl + + job.flowCtrl = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + SEp sep = {0}; + SSchFlowControl nctrl = {0}; + nctrl.taskList = taosArrayInit(1, POINTER_BYTES); + taosHashPut(job.flowCtrl, &sep, sizeof(SEp), &nctrl, sizeof(nctrl)); + schFreeFlowCtrl(&job); + + + schMgmt.jobRef = -1; }