diff --git a/source/libs/catalog/CMakeLists.txt b/source/libs/catalog/CMakeLists.txt index 3bdb0a9b1d..dd7220da15 100644 --- a/source/libs/catalog/CMakeLists.txt +++ b/source/libs/catalog/CMakeLists.txt @@ -11,6 +11,6 @@ target_link_libraries( PRIVATE os util transport qcom nodes ) -#if(${BUILD_TEST}) -# ADD_SUBDIRECTORY(test) -#endif(${BUILD_TEST}) +if(${BUILD_TEST} AND NOT ${TD_WINDOWS}) + ADD_SUBDIRECTORY(test) +endif() diff --git a/source/libs/executor/src/hashjoin.c b/source/libs/executor/src/hashjoin.c index f63b4093db..da7686cce6 100755 --- a/source/libs/executor/src/hashjoin.c +++ b/source/libs/executor/src/hashjoin.c @@ -83,6 +83,8 @@ int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) { return code; } +#ifdef HASH_JOIN_FULL + int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) { bool allFetched = false; SHJoinCtx* pCtx = &pJoin->ctx; @@ -346,4 +348,5 @@ int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +#endif diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 64ce62cb66..73a5139e43 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -89,7 +89,7 @@ int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) { case JOIN_TYPE_RIGHT: { switch (pJoin->subType) { case JOIN_STYPE_OUTER: - pJoin->joinFp = hLeftJoinDo; + //pJoin->joinFp = hLeftJoinDo; TOOPEN break; default: break; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 690d38aac0..6d637bee98 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -137,6 +137,8 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { } int32_t initTaskQueue() { + memset(&taskQueue, 0, sizeof(taskQueue)); + taskQueue.wrokrerPool.name = "taskWorkPool"; taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads; taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index d15ac7a791..a031bc08de 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -531,8 +531,8 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code); if (pMsg) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); } return TSDB_CODE_SUCCESS; } @@ -545,8 +545,8 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { qDebug("handle %p is broken", pMsg->handle); if (head->isHbParam) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0}; @@ -1293,6 +1293,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } break; } +/* case TDMT_SCH_QUERY_HEARTBEAT: { SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx)); @@ -1320,6 +1321,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, persistHandle = true; break; } +*/ case TDMT_SCH_TASK_NOTIFY: { ETaskNotifyType* pType = param; STaskNotifyReq qMsg; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index cb8a68fe4f..cabaa65f19 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -452,6 +452,8 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } +#if 0 + int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; @@ -593,6 +595,7 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +#endif int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); @@ -869,6 +872,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +#if 0 int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { @@ -900,6 +904,7 @@ _return: return code; } +#endif int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index a9878ec9a9..f112376299 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -57,6 +57,9 @@ namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, uint64_t sId, int32_t execId, SDataBuf *pMsg, int32_t rspCode); extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode); +extern "C" int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code); +extern "C" int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code); +extern "C" int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask); int64_t insertJobRefId = 0; int64_t queryJobRefId = 0; @@ -316,7 +319,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { scanPlan->execNode.nodeId = 1 + i; scanPlan->execNode.epSet.inUse = 0; - scanPlan->execNodeStat.tableNum = taosRand() % 30; + scanPlan->execNodeStat.tableNum = taosRand() % 100; addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030); addEpIntoEpSet(&scanPlan->execNode.epSet, "ep1", 6030); addEpIntoEpSet(&scanPlan->execNode.epSet, "ep2", 6030); @@ -982,8 +985,157 @@ TEST(queryTest, normalCase) { schedulerFreeJob(&job, 0); (void)taosThreadJoin(thread1, NULL); + + schMgmt.jobRef = -1; } +TEST(queryTest, rescheduleCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + int64_t job = 0; + SQueryPlan *dag = NULL; + int32_t code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN, (SNode**)&dag); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); + + SQueryNodeLoad load = {0}; + load.addr.epSet.numOfEps = 1; + TAOS_STRCPY(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); + load.addr.epSet.eps[0].port = 6031; + assert(taosArrayPush(qnodeList, &load) != NULL); + + code = schedulerInit(); + ASSERT_EQ(code, 0); + + schtBuildQueryDag(dag); + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + int32_t queryDone = 0; + + SRequestConnInfo conn = {0}; + conn.pTrans = mockPointer; + SSchedulerReq req = {0}; + req.pConn = &conn; + req.pNodeList = qnodeList; + req.pDag = dag; + req.sql = "select * from tb"; + req.execFp = schtQueryCb; + req.cbParam = &queryDone; + + code = schedulerExecJob(&req, &job); + ASSERT_EQ(code, 0); + + SSchJob *pJob = NULL; + code = schAcquireJob(job, &pJob); + ASSERT_EQ(code, 0); + + schedulerEnableReSchedule(true); + + void *pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + task->timeoutUsec = -1; + + code = schRescheduleTask(pJob, task); + ASSERT_EQ(code, 0); + + task->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SDataBuf msg = {0}; + void *rmsg = NULL; + assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg)); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + task->timeoutUsec = -1; + + code = schRescheduleTask(pJob, task); + ASSERT_EQ(code, 0); + + task->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + if (JOB_TASK_STATUS_EXEC == task->status) { + SDataBuf msg = {0}; + void *rmsg = NULL; + assert(0 == schtBuildQueryRspMsg(&msg.len, &rmsg)); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->seriousId, task->execId, &msg, 0); + + ASSERT_EQ(code, 0); + } + + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + while (true) { + if (queryDone) { + break; + } + + taosUsleep(10000); + } + + TdThreadAttr thattr; + assert(0 == taosThreadAttrInit(&thattr)); + + TdThread thread1; + assert(0 == taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job)); + + void *data = NULL; + req.syncReq = true; + req.pFetchRes = &data; + + code = schedulerFetchRows(job, &req); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + taosMemoryFreeClear(data); + + (void)schReleaseJob(job); + + schedulerDestroy(); + + schedulerFreeJob(&job, 0); + + (void)taosThreadJoin(thread1, NULL); + + schMgmt.jobRef = -1; +} + + TEST(queryTest, readyFirstCase) { void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; @@ -1097,6 +1249,7 @@ TEST(queryTest, readyFirstCase) { schedulerFreeJob(&job, 0); (void)taosThreadJoin(thread1, NULL); + schMgmt.jobRef = -1; } TEST(queryTest, flowCtrlCase) { @@ -1196,6 +1349,9 @@ TEST(queryTest, flowCtrlCase) { schedulerFreeJob(&job, 0); (void)taosThreadJoin(thread1, NULL); + schMgmt.jobRef = -1; + + cleanupTaskQueue(); } TEST(insertTest, normalCase) { @@ -1260,6 +1416,7 @@ TEST(insertTest, normalCase) { schedulerDestroy(); (void)taosThreadJoin(thread1, NULL); + schMgmt.jobRef = -1; } TEST(multiThread, forceFree) { @@ -1282,9 +1439,11 @@ TEST(multiThread, forceFree) { schtTestStop = true; // taosSsleep(3); + + schMgmt.jobRef = -1; } -TEST(otherTest, otherCase) { +TEST(otherTest, function) { // excpet test (void)schReleaseJob(0); schFreeRpcCtx(NULL); @@ -1293,6 +1452,39 @@ TEST(otherTest, otherCase) { ASSERT_EQ(schDumpEpSet(NULL, &ep), TSDB_CODE_SUCCESS); ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0); ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0); + + SSchTaskCallbackParam param = {0}; + SDataBuf dataBuf = {0}; + dataBuf.pData = taosMemoryMalloc(1); + dataBuf.pEpSet = (SEpSet*)taosMemoryMalloc(sizeof(*dataBuf.pEpSet)); + ASSERT_EQ(schHandleNotifyCallback(¶m, &dataBuf, TSDB_CODE_SUCCESS), TSDB_CODE_SUCCESS); + + SSchCallbackParamHeader param2 = {0}; + dataBuf.pData = taosMemoryMalloc(1); + dataBuf.pEpSet = (SEpSet*)taosMemoryMalloc(sizeof(*dataBuf.pEpSet)); + schHandleLinkBrokenCallback(¶m2, &dataBuf, TSDB_CODE_SUCCESS); + param2.isHbParam = true; + dataBuf.pData = taosMemoryMalloc(1); + dataBuf.pEpSet = (SEpSet*)taosMemoryMalloc(sizeof(*dataBuf.pEpSet)); + schHandleLinkBrokenCallback(¶m2, &dataBuf, TSDB_CODE_SUCCESS); + + schMgmt.jobRef = -1; +} + +void schtReset() { + insertJobRefId = 0; + queryJobRefId = 0; + + schtJobDone = false; + schtMergeTemplateId = 0x4; + schtFetchTaskId = 0; + schtQueryId = 1; + + schtTestStop = false; + schtTestDeadLoop = false; + schtTestMTRunSec = 1; + schtTestPrintNum = 1000; + schtStartFetch = 0; } int main(int argc, char **argv) { @@ -1302,7 +1494,17 @@ int main(int argc, char **argv) { } taosSeedRand(taosGetTimestampSec()); testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); + + int code = 0; + for (int32_t i = 0; i < 10; ++i) { + schtReset(); + code = RUN_ALL_TESTS(); + if (code) { + break; + } + } + + return code; } #pragma GCC diagnostic pop diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 6370e6ca50..dbd8cb159e 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -823,6 +823,8 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { int32_t code; + pool->exit = false; + (void)taosThreadMutexInit(&pool->poolLock, NULL); (void)taosThreadMutexInit(&pool->backupLock, NULL); (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); diff --git a/tests/script/tsim/join/join_explain.sim b/tests/script/tsim/join/join_explain.sim index f9d2f3eac1..2858999de5 100644 --- a/tests/script/tsim/join/join_explain.sim +++ b/tests/script/tsim/join/join_explain.sim @@ -39,6 +39,7 @@ sql explain analyze verbose true select a.ts from sta a join sta b on a.col1 = b sql explain analyze verbose true select a.ts from sta a join sta b where a.ts=b.ts; sql_error explain analyze verbose true select a.ts from sta a ,sta b on a.ts=b.ts; sql explain analyze verbose true select a.ts from sta a ,sta b where a.ts=b.ts; +sql explain analyze verbose true select a.ts from sta a ,sta b where a.t1 = b.t1 and a.ts=b.ts; sql explain analyze verbose true select a.ts from sta a ,sta b where a.ts=b.ts and a.col1 + 1 = b.col1; sql explain analyze verbose true select b.col1 from sta a ,sta b where a.ts=b.ts and a.col1 + 1 = b.col1 order by a.ts; sql explain analyze verbose true select b.col1 from sta a join sta b join sta c where a.ts=b.ts and b.ts = c.ts order by a.ts;