fix: add more UT cases
This commit is contained in:
parent
f502b93f1b
commit
b6863a0eff
|
@ -41,6 +41,7 @@
|
||||||
#include "tvariant.h"
|
#include "tvariant.h"
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
|
#include "hashjoin.h"
|
||||||
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
@ -3766,6 +3767,21 @@ TEST(leftWinJoin, noCondProjectionTest) {
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
TEST(functionsTest, branch) {
|
||||||
|
struct SOperatorInfo op = {0};
|
||||||
|
SHJoinOperatorInfo join;
|
||||||
|
SBufRowInfo bufrow = {0};
|
||||||
|
SSDataBlock blk = {0};
|
||||||
|
|
||||||
|
op.info = &join;
|
||||||
|
memset(&join, 0, sizeof(join));
|
||||||
|
join.ctx.pBuildRow = &bufrow;
|
||||||
|
blk.info.rows = 1;
|
||||||
|
join.finBlk = &blk;
|
||||||
|
hInnerJoinDo(&op);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
|
|
@ -161,6 +161,10 @@ int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
||||||
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); }
|
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); }
|
||||||
|
|
||||||
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
||||||
|
if (NULL == pSubplan) {
|
||||||
|
return terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
|
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
|
||||||
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
|
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
|
||||||
*pLen = insert->size;
|
*pLen = insert->size;
|
||||||
|
|
|
@ -149,11 +149,11 @@ typedef struct SSchedulerMgmt {
|
||||||
bool exit;
|
bool exit;
|
||||||
int32_t jobRef;
|
int32_t jobRef;
|
||||||
int32_t jobNum;
|
int32_t jobNum;
|
||||||
SSchStat stat;
|
|
||||||
void *timer;
|
void *timer;
|
||||||
SRWLatch hbLock;
|
SRWLatch hbLock;
|
||||||
SHashObj *hbConnections;
|
SHashObj *hbConnections;
|
||||||
void *queryMgmt;
|
void *queryMgmt;
|
||||||
|
SSchStat stat;
|
||||||
} SSchedulerMgmt;
|
} SSchedulerMgmt;
|
||||||
|
|
||||||
typedef struct SSchCallbackParamHeader {
|
typedef struct SSchCallbackParamHeader {
|
||||||
|
|
|
@ -50,11 +50,6 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
|
||||||
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
|
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
|
||||||
for (int32_t i = 0; i < taskNum; ++i) {
|
for (int32_t i = 0; i < taskNum; ++i) {
|
||||||
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
|
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
|
||||||
if (NULL == pTask) {
|
|
||||||
SCH_JOB_DLOG("fail to get the %dth task", i);
|
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
sum += pTask->plan->execNodeStat.tableNum;
|
sum += pTask->plan->execNodeStat.tableNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -273,18 +273,18 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
if (taskDone < pTask->level->taskNum) {
|
if (taskDone < pTask->level->taskNum) {
|
||||||
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
|
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (taskDone > pTask->level->taskNum) {
|
|
||||||
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("taskDone number reach level task number, done:%d, total:%d", taskDone, pTask->level->taskNum);
|
||||||
|
|
||||||
if (pTask->level->taskFailed > 0) {
|
if (pTask->level->taskFailed > 0) {
|
||||||
SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
|
SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
|
||||||
} else {
|
|
||||||
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
pJob->resNode = pTask->succeedAddr;
|
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pJob->resNode = pTask->succeedAddr;
|
||||||
|
|
||||||
pJob->fetchTask = pTask;
|
pJob->fetchTask = pTask;
|
||||||
|
|
||||||
|
@ -600,12 +600,12 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
|
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
|
||||||
if (0 != code) {
|
if (0 != code) {
|
||||||
if (HASH_NODE_EXIST(code)) {
|
if (HASH_NODE_EXIST(code)) {
|
||||||
SCH_TASK_DLOG("task already in execTask list, code:%x", code);
|
SCH_TASK_DLOG("task already in execTask list, code:0x%x", code);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:0x%x", errno);
|
SCH_TASK_ELOG("taosHashPut task to execTask list failed, code:0x%x", code);
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
|
SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
|
||||||
|
@ -800,11 +800,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < nodeNum; ++i) {
|
for (int32_t i = 0; i < nodeNum; ++i) {
|
||||||
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
|
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
|
||||||
if (NULL == nload) {
|
|
||||||
SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum);
|
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
SQueryNodeAddr *naddr = &nload->addr;
|
SQueryNodeAddr *naddr = &nload->addr;
|
||||||
|
|
||||||
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
||||||
|
|
|
@ -63,6 +63,15 @@ extern "C" int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask);
|
||||||
extern "C" int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType);
|
extern "C" int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType);
|
||||||
extern "C" int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
|
extern "C" int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
|
||||||
extern "C" int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode);
|
extern "C" int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode);
|
||||||
|
extern "C" void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel);
|
||||||
|
extern "C" int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
extern "C" int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId);
|
||||||
|
extern "C" int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
extern "C" int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
extern "C" int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type);
|
||||||
|
extern "C" int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask);
|
||||||
|
extern "C" int32_t schLaunchTaskImpl(void *param);
|
||||||
|
extern "C" void schHandleTimerEvent(void *param, void *tmrId);
|
||||||
|
|
||||||
|
|
||||||
int64_t insertJobRefId = 0;
|
int64_t insertJobRefId = 0;
|
||||||
|
@ -1480,6 +1489,8 @@ TEST(otherTest, function) {
|
||||||
TEST(otherTest, branch) {
|
TEST(otherTest, branch) {
|
||||||
SSchJob job = {0};
|
SSchJob job = {0};
|
||||||
SSchTask task = {0};
|
SSchTask task = {0};
|
||||||
|
memset(&schMgmt, 0, sizeof(schMgmt));
|
||||||
|
|
||||||
schValidateRspMsgType(&job, &task, TDMT_SCH_MERGE_FETCH_RSP);
|
schValidateRspMsgType(&job, &task, TDMT_SCH_MERGE_FETCH_RSP);
|
||||||
|
|
||||||
task.lastMsgType = TDMT_SCH_MERGE_FETCH_RSP - 1;
|
task.lastMsgType = TDMT_SCH_MERGE_FETCH_RSP - 1;
|
||||||
|
@ -1532,6 +1543,7 @@ TEST(otherTest, branch) {
|
||||||
databuf.pData = taosMemoryMalloc(0);
|
databuf.pData = taosMemoryMalloc(0);
|
||||||
job.status = JOB_TASK_STATUS_FAIL;
|
job.status = JOB_TASK_STATUS_FAIL;
|
||||||
schProcessResponseMsg(&job, &task, &databuf, 0);
|
schProcessResponseMsg(&job, &task, &databuf, 0);
|
||||||
|
job.status = 0;
|
||||||
|
|
||||||
job.attr.explainMode = EXPLAIN_MODE_ANALYZE;
|
job.attr.explainMode = EXPLAIN_MODE_ANALYZE;
|
||||||
databuf.msgType = TDMT_SCH_EXPLAIN_RSP;
|
databuf.msgType = TDMT_SCH_EXPLAIN_RSP;
|
||||||
|
@ -1570,7 +1582,163 @@ TEST(otherTest, branch) {
|
||||||
schBuildAndSendHbMsg(&ep, NULL);
|
schBuildAndSendHbMsg(&ep, NULL);
|
||||||
|
|
||||||
schBuildAndSendMsg(&job, &task, NULL, 0, NULL);
|
schBuildAndSendMsg(&job, &task, NULL, 0, NULL);
|
||||||
|
|
||||||
|
SSchLevel level = {0};
|
||||||
|
SSubplan subplan;
|
||||||
|
memset(&subplan, 0, sizeof(subplan));
|
||||||
|
job.attr.queryJob = true;
|
||||||
|
schMgmt.cfg.schPolicy = SCH_ALL;
|
||||||
|
task.plan = &subplan;
|
||||||
|
schInitTaskRetryTimes(&job, &task, &level);
|
||||||
|
job.attr.queryJob = false;
|
||||||
|
memset(&schMgmt.cfg, 0, sizeof(schMgmt.cfg));
|
||||||
|
memset(&level, 0, sizeof(level));
|
||||||
|
|
||||||
|
schRecordTaskSucceedNode(&job, &task);
|
||||||
|
|
||||||
|
schDropTaskExecNode(&job, &task, NULL, 0);
|
||||||
|
|
||||||
|
task.execNodes = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
schDropTaskExecNode(&job, &task, NULL, 0);
|
||||||
|
|
||||||
|
int32_t execId = 0;
|
||||||
|
task.execId = 1;
|
||||||
|
(void)taosHashPut(task.execNodes, &execId, sizeof(execId), &execId, sizeof(execId));
|
||||||
|
schDropTaskExecNode(&job, &task, NULL, execId);
|
||||||
|
task.execId = 0;
|
||||||
|
taosHashCleanup(task.execNodes);
|
||||||
|
task.execNodes = NULL;
|
||||||
|
|
||||||
|
job.status = JOB_TASK_STATUS_FAIL;
|
||||||
|
schProcessOnTaskFailure(&job, &task, 0);
|
||||||
|
job.status = 0;
|
||||||
|
|
||||||
|
task.status = JOB_TASK_STATUS_FAIL;
|
||||||
|
schProcessOnTaskFailure(&job, &task, 0);
|
||||||
|
task.status = 0;
|
||||||
|
|
||||||
|
task.level = &level;
|
||||||
|
schProcessOnTaskFailure(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR);
|
||||||
|
memset(&level, 0, sizeof(level));
|
||||||
|
task.level = NULL;
|
||||||
|
|
||||||
|
subplan.subplanType = SUBPLAN_TYPE_SCAN;
|
||||||
|
task.plan = &subplan;
|
||||||
|
SEpSet epset = {0};
|
||||||
|
epset.numOfEps = 127;
|
||||||
|
schChkUpdateRedirectCtx(&job, &task, &epset, 0);
|
||||||
|
|
||||||
|
schChkUpdateRedirectCtx(&job, &task, NULL, 0);
|
||||||
|
task.plan = NULL;
|
||||||
|
|
||||||
|
schPushTaskToExecList(&job, &task);
|
||||||
|
|
||||||
|
job.execTasks = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
taosHashPut(job.execTasks, &task.taskId, sizeof(task.taskId), &task, POINTER_BYTES);
|
||||||
|
schPushTaskToExecList(&job, &task);
|
||||||
|
taosHashCleanup(job.execTasks);
|
||||||
|
job.execTasks = NULL;
|
||||||
|
|
||||||
|
bool needRetry = false;
|
||||||
|
task.timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC / 2 + 1;
|
||||||
|
task.retryTimes = 0;
|
||||||
|
task.maxRetryTimes = 0;
|
||||||
|
schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry);
|
||||||
|
|
||||||
|
task.execId = 0;
|
||||||
|
task.retryTimes = 0;
|
||||||
|
task.maxRetryTimes = 100;
|
||||||
|
task.maxExecTimes = 1;
|
||||||
|
schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry);
|
||||||
|
|
||||||
|
|
||||||
|
task.execId = 0;
|
||||||
|
task.retryTimes = 0;
|
||||||
|
task.maxRetryTimes = 100;
|
||||||
|
task.maxExecTimes = 100;
|
||||||
|
task.lastMsgType = TDMT_SCH_LINK_BROKEN;
|
||||||
|
schTaskCheckSetRetry(&job, &task, TSDB_CODE_SCH_TIMEOUT_ERROR, &needRetry);
|
||||||
|
|
||||||
|
schSetAddrsFromNodeList(&job, &task);
|
||||||
|
|
||||||
|
schSwitchTaskCandidateAddr(&job, &task);
|
||||||
|
|
||||||
|
|
||||||
|
task.candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
|
||||||
|
SQueryNodeAddr addr = {0};
|
||||||
|
taosArrayPush(task.candidateAddrs, &addr);
|
||||||
|
taosArrayPush(task.candidateAddrs, &addr);
|
||||||
|
schMgmt.cfg.schPolicy = SCH_LOAD_SEQ;
|
||||||
|
task.candidateIdx = 1;
|
||||||
|
schSwitchTaskCandidateAddr(&job, &task);
|
||||||
|
|
||||||
|
schMgmt.cfg.schPolicy = SCH_RANDOM;
|
||||||
|
schSwitchTaskCandidateAddr(&job, &task);
|
||||||
|
taosArrayDestroy(task.candidateAddrs);
|
||||||
|
task.candidateAddrs = NULL;
|
||||||
|
memset(&schMgmt.cfg, 0, sizeof(schMgmt.cfg));
|
||||||
|
task.candidateIdx = 0;
|
||||||
|
|
||||||
|
schDropTaskOnExecNode(&job, &task);
|
||||||
|
|
||||||
|
schNotifyTaskOnExecNode(&job, &task, TASK_NOTIFY_FINISHED);
|
||||||
|
|
||||||
|
schLaunchRemoteTask(&job, &task);
|
||||||
|
|
||||||
|
SSchTaskCtx* pCtx = (SSchTaskCtx*)taosMemoryCalloc(1, sizeof(SSchTaskCtx));
|
||||||
|
pCtx->jobRid = -1;
|
||||||
|
schLaunchTaskImpl((void*)pCtx);
|
||||||
|
|
||||||
|
task.plan = &subplan;
|
||||||
|
subplan.subplanType = SUBPLAN_TYPE_SCAN;
|
||||||
|
job.attr.needFlowCtrl = true;
|
||||||
|
level.taskNum = 1000;
|
||||||
|
task.level = &level;
|
||||||
|
schLaunchTask(&job, &task);
|
||||||
|
task.plan = NULL;
|
||||||
|
task.level = NULL;
|
||||||
|
job.attr.needFlowCtrl = false;
|
||||||
|
|
||||||
|
SSchTimerParam param = {0};
|
||||||
|
param.rId = -1;
|
||||||
|
schHandleTimerEvent(¶m, NULL);
|
||||||
|
|
||||||
|
job.execTasks = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
task.delayExecMs = 1;
|
||||||
|
schMgmt.timer = NULL;
|
||||||
|
schDelayLaunchTask(&job, &task);
|
||||||
|
task.delayExecMs = 0;
|
||||||
|
taosHashCleanup(job.execTasks);
|
||||||
|
job.execTasks = NULL;
|
||||||
|
|
||||||
|
job.fetchRes = (void*)0x1;
|
||||||
|
schLaunchFetchTask(&job);
|
||||||
|
job.fetchRes = NULL;
|
||||||
|
|
||||||
|
job.fetchTask = &task;
|
||||||
|
job.attr.localExec = true;
|
||||||
|
job.attr.queryJob = true;
|
||||||
|
subplan.subplanType = SUBPLAN_TYPE_MERGE;
|
||||||
|
task.plan = &subplan;
|
||||||
|
void* p = taosMemoryCalloc(1, 1024);
|
||||||
|
schMgmt.queryMgmt = p;
|
||||||
|
schLaunchFetchTask(&job);
|
||||||
|
memset(&job, 0, sizeof(job));
|
||||||
|
memset(&subplan, 0, sizeof(subplan));
|
||||||
|
task.plan = NULL;
|
||||||
|
taosMemoryFreeClear(schMgmt.queryMgmt);
|
||||||
|
|
||||||
|
// flow ctrl
|
||||||
|
|
||||||
|
job.flowCtrl = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
SEp sep = {0};
|
||||||
|
SSchFlowControl nctrl = {0};
|
||||||
|
nctrl.taskList = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
taosHashPut(job.flowCtrl, &sep, sizeof(SEp), &nctrl, sizeof(nctrl));
|
||||||
|
schFreeFlowCtrl(&job);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
schMgmt.jobRef = -1;
|
schMgmt.jobRef = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue