fix: add ut cases

This commit is contained in:
dapan1121 2024-12-20 13:52:53 +08:00
parent a24c594746
commit b1702132b4
9 changed files with 228 additions and 11 deletions

View File

@ -11,6 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom nodes
)
#if(${BUILD_TEST})
# ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST})
if(${BUILD_TEST} AND NOT ${TD_WINDOWS})
ADD_SUBDIRECTORY(test)
endif()

View File

@ -83,6 +83,8 @@ int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
return code;
}
#ifdef HASH_JOIN_FULL
int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
bool allFetched = false;
SHJoinCtx* pCtx = &pJoin->ctx;
@ -346,4 +348,5 @@ int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
#endif

View File

@ -89,7 +89,7 @@ int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) {
case JOIN_TYPE_RIGHT: {
switch (pJoin->subType) {
case JOIN_STYPE_OUTER:
pJoin->joinFp = hLeftJoinDo;
//pJoin->joinFp = hLeftJoinDo; TOOPEN
break;
default:
break;

View File

@ -137,6 +137,8 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
}
int32_t initTaskQueue() {
memset(&taskQueue, 0, sizeof(taskQueue));
taskQueue.wrokrerPool.name = "taskWorkPool";
taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;

View File

@ -531,8 +531,8 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x",
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
}
return TSDB_CODE_SUCCESS;
}
@ -545,8 +545,8 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("handle %p is broken", pMsg->handle);
if (head->isHbParam) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0};
@ -1293,6 +1293,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
break;
}
/*
case TDMT_SCH_QUERY_HEARTBEAT: {
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
@ -1320,6 +1321,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle = true;
break;
}
*/
case TDMT_SCH_TASK_NOTIFY: {
ETaskNotifyType* pType = param;
STaskNotifyReq qMsg;

View File

@ -452,6 +452,8 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
#if 0
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
@ -593,6 +595,7 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
#endif
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
@ -869,6 +872,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
#if 0
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
@ -900,6 +904,7 @@ _return:
return code;
}
#endif
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);

View File

@ -57,6 +57,9 @@ namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t sId, int32_t execId, SDataBuf *pMsg,
int32_t rspCode);
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode);
extern "C" int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code);
extern "C" int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code);
extern "C" int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask);
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
@ -316,7 +319,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
scanPlan->execNode.nodeId = 1 + i;
scanPlan->execNode.epSet.inUse = 0;
scanPlan->execNodeStat.tableNum = taosRand() % 30;
scanPlan->execNodeStat.tableNum = taosRand() % 100;
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030);
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep1", 6030);
addEpIntoEpSet(&scanPlan->execNode.epSet, "ep2", 6030);
@ -982,8 +985,157 @@ TEST(queryTest, normalCase) {
schedulerFreeJob(&job, 0);
(void)taosThreadJoin(thread1, NULL);
schMgmt.jobRef = -1;
}
TEST(queryTest, rescheduleCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan *dag = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN, (SNode**)&dag);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
assert(taosArrayPush(qnodeList, &load) != NULL);
code = schedulerInit();
ASSERT_EQ(code, 0);
schtBuildQueryDag(dag);
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
int32_t queryDone = 0;
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = NULL;
code = schAcquireJob(job, &pJob);
ASSERT_EQ(code, 0);
schedulerEnableReSchedule(true);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
task->timeoutUsec = -1;
code = schRescheduleTask(pJob, task);
ASSERT_EQ(code, 0);
task->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SDataBuf msg = {0};
void *rmsg = NULL;
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
task->timeoutUsec = -1;
code = schRescheduleTask(pJob, task);
ASSERT_EQ(code, 0);
task->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
if (JOB_TASK_STATUS_EXEC == task->status) {
SDataBuf msg = {0};
void *rmsg = NULL;
assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg));
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0);
ASSERT_EQ(code, 0);
}
pIter = taosHashIterate(pJob->execTasks, pIter);
}
while (true) {
if (queryDone) {
break;
}
taosUsleep(10000);
}
TdThreadAttr thattr;
assert(0 == taosThreadAttrInit(&thattr));
TdThread thread1;
assert(0 == taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job));
void *data = NULL;
req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
ASSERT_EQ(pRsp->completed, 1);
ASSERT_EQ(pRsp->numOfRows, 10);
taosMemoryFreeClear(data);
(void)schReleaseJob(job);
schedulerDestroy();
schedulerFreeJob(&job, 0);
(void)taosThreadJoin(thread1, NULL);
schMgmt.jobRef = -1;
}
TEST(queryTest, readyFirstCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
@ -1097,6 +1249,7 @@ TEST(queryTest, readyFirstCase) {
schedulerFreeJob(&job, 0);
(void)taosThreadJoin(thread1, NULL);
schMgmt.jobRef = -1;
}
TEST(queryTest, flowCtrlCase) {
@ -1196,6 +1349,9 @@ TEST(queryTest, flowCtrlCase) {
schedulerFreeJob(&job, 0);
(void)taosThreadJoin(thread1, NULL);
schMgmt.jobRef = -1;
cleanupTaskQueue();
}
TEST(insertTest, normalCase) {
@ -1260,6 +1416,7 @@ TEST(insertTest, normalCase) {
schedulerDestroy();
(void)taosThreadJoin(thread1, NULL);
schMgmt.jobRef = -1;
}
TEST(multiThread, forceFree) {
@ -1282,9 +1439,11 @@ TEST(multiThread, forceFree) {
schtTestStop = true;
// taosSsleep(3);
schMgmt.jobRef = -1;
}
TEST(otherTest, otherCase) {
TEST(otherTest, function) {
// excpet test
(void)schReleaseJob(0);
schFreeRpcCtx(NULL);
@ -1293,6 +1452,39 @@ TEST(otherTest, otherCase) {
ASSERT_EQ(schDumpEpSet(NULL, &ep), TSDB_CODE_SUCCESS);
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
SSchTaskCallbackParam param = {0};
SDataBuf dataBuf = {0};
dataBuf.pData = taosMemoryMalloc(1);
dataBuf.pEpSet = (SEpSet*)taosMemoryMalloc(sizeof(*dataBuf.pEpSet));
ASSERT_EQ(schHandleNotifyCallback(&param, &dataBuf, TSDB_CODE_SUCCESS), TSDB_CODE_SUCCESS);
SSchCallbackParamHeader param2 = {0};
dataBuf.pData = taosMemoryMalloc(1);
dataBuf.pEpSet = (SEpSet*)taosMemoryMalloc(sizeof(*dataBuf.pEpSet));
schHandleLinkBrokenCallback(&param2, &dataBuf, TSDB_CODE_SUCCESS);
param2.isHbParam = true;
dataBuf.pData = taosMemoryMalloc(1);
dataBuf.pEpSet = (SEpSet*)taosMemoryMalloc(sizeof(*dataBuf.pEpSet));
schHandleLinkBrokenCallback(&param2, &dataBuf, TSDB_CODE_SUCCESS);
schMgmt.jobRef = -1;
}
void schtReset() {
insertJobRefId = 0;
queryJobRefId = 0;
schtJobDone = false;
schtMergeTemplateId = 0x4;
schtFetchTaskId = 0;
schtQueryId = 1;
schtTestStop = false;
schtTestDeadLoop = false;
schtTestMTRunSec = 1;
schtTestPrintNum = 1000;
schtStartFetch = 0;
}
int main(int argc, char **argv) {
@ -1302,7 +1494,17 @@ int main(int argc, char **argv) {
}
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
int code = 0;
for (int32_t i = 0; i < 10; ++i) {
schtReset();
code = RUN_ALL_TESTS();
if (code) {
break;
}
}
return code;
}
#pragma GCC diagnostic pop

View File

@ -823,6 +823,8 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
int32_t code;
pool->exit = false;
(void)taosThreadMutexInit(&pool->poolLock, NULL);
(void)taosThreadMutexInit(&pool->backupLock, NULL);
(void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);

View File

@ -39,6 +39,7 @@ sql explain analyze verbose true select a.ts from sta a join sta b on a.col1 = b
sql explain analyze verbose true select a.ts from sta a join sta b where a.ts=b.ts;
sql_error explain analyze verbose true select a.ts from sta a ,sta b on a.ts=b.ts;
sql explain analyze verbose true select a.ts from sta a ,sta b where a.ts=b.ts;
sql explain analyze verbose true select a.ts from sta a ,sta b where a.t1 = b.t1 and a.ts=b.ts;
sql explain analyze verbose true select a.ts from sta a ,sta b where a.ts=b.ts and a.col1 + 1 = b.col1;
sql explain analyze verbose true select b.col1 from sta a ,sta b where a.ts=b.ts and a.col1 + 1 = b.col1 order by a.ts;
sql explain analyze verbose true select b.col1 from sta a join sta b join sta c where a.ts=b.ts and b.ts = c.ts order by a.ts;