diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 6d2215264e..78e876f82c 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -36,9 +36,9 @@ #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" +#include "tmisce.h" #include "trpc.h" #include "tvariant.h" -#include "tmisce.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -54,7 +54,8 @@ namespace { -extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode); +extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, + int32_t rspCode); extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode); int64_t insertJobRefId = 0; @@ -74,8 +75,9 @@ int32_t schtStartFetch = 0; void schtInitLogFile() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; - + rpcInit(); tsAsyncLog = 0; + rpcInit(); qDebugFlag = 159; strcpy(tsLogDir, TD_LOG_DIR_PATH); @@ -84,12 +86,10 @@ void schtInitLogFile() { } } -void schtQueryCb(SExecResult *pResult, void *param, int32_t code) { - *(int32_t *)param = 1; -} +void schtQueryCb(SExecResult *pResult, void *param, int32_t code) { *(int32_t *)param = 1; } -int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) { - SQueryTableRsp rsp = {0}; +int32_t schtBuildQueryRspMsg(uint32_t *msize, void **rspMsg) { + SQueryTableRsp rsp = {0}; rsp.code = 0; rsp.affectedRows = 0; rsp.tbVerInfo = NULL; @@ -99,7 +99,7 @@ int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) { qError("tSerializeSQueryTableRsp failed"); return TSDB_CODE_OUT_OF_MEMORY; } - + void *pRsp = taosMemoryCalloc(msgSize, 1); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); @@ -117,9 +117,8 @@ int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) { return TSDB_CODE_SUCCESS; } - -int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) { - SRetrieveTableRsp* rsp = (SRetrieveTableRsp*)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1); +int32_t schtBuildFetchRspMsg(uint32_t *msize, void **rspMsg) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1); rsp->completed = 1; rsp->numOfRows = 10; rsp->compLen = 0; @@ -130,14 +129,14 @@ int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) { return TSDB_CODE_SUCCESS; } -int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) { +int32_t schtBuildSubmitRspMsg(uint32_t *msize, void **rspMsg) { SSubmitRsp2 submitRsp = {0}; - int32_t msgSize = 0, ret = 0; - SEncoder ec = {0}; - + int32_t msgSize = 0, ret = 0; + SEncoder ec = {0}; + tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret); - void* msg = taosMemoryCalloc(1, msgSize); - tEncoderInit(&ec, (uint8_t*)msg, msgSize); + void *msg = taosMemoryCalloc(1, msgSize); + tEncoderInit(&ec, (uint8_t *)msg, msgSize); tEncodeSSubmitRsp2(&ec, &submitRsp); tEncoderClear(&ec); @@ -147,7 +146,6 @@ int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) { return TSDB_CODE_SUCCESS; } - void schtBuildQueryDag(SQueryPlan *dag) { uint64_t qId = schtQueryId; @@ -157,8 +155,8 @@ void schtBuildQueryDag(SQueryPlan *dag) { SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); - SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); - SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; @@ -210,7 +208,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); - SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); merge->pNodeList = nodesMakeList(); scan->pNodeList = nodesMakeList(); @@ -218,7 +216,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { mergePlan->pChildren = nodesMakeList(); for (int32_t i = 0; i < scanPlanNum; ++i) { - SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; scanPlan->id.subplanId = 0x0000000000000003 + i; @@ -272,7 +270,7 @@ void schtBuildInsertDag(SQueryPlan *dag) { SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); inserta->pNodeList = nodesMakeList(); - SSubplan *insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan *insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); insertPlan->id.queryId = qId; insertPlan->id.groupId = 0x0000000000000003; @@ -287,14 +285,14 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan->pChildren = NULL; insertPlan->pParents = NULL; insertPlan->pNode = NULL; - insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); - ((SDataInserterNode*)insertPlan->pDataSink)->size = 1; - ((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + ((SDataInserterNode *)insertPlan->pDataSink)->size = 1; + ((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); insertPlan->msgType = TDMT_VND_SUBMIT; nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); - insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); insertPlan->id.queryId = qId; insertPlan->id.groupId = 0x0000000000000003; @@ -309,9 +307,9 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan->pChildren = NULL; insertPlan->pParents = NULL; insertPlan->pNode = NULL; - insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); - ((SDataInserterNode*)insertPlan->pDataSink)->size = 1; - ((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + ((SDataInserterNode *)insertPlan->pDataSink)->size = 1; + ((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); insertPlan->msgType = TDMT_VND_SUBMIT; nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); @@ -389,7 +387,8 @@ void schtSetRpcSendRequest() { } } -int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo, bool persistHandle, void* rpcCtx) { +int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo, + bool persistHandle, void *rpcCtx) { if (pInfo) { taosMemoryFreeClear(pInfo->param); taosMemoryFreeClear(pInfo->msgInfo.pData); @@ -439,11 +438,11 @@ void *schtSendRsp(void *param) { SSchTask *task = *(SSchTask **)pIter; SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildSubmitRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_VND_SUBMIT_RSP; msg.pData = rmsg; - + schHandleResponseMsg(pJob, task, task->execId, &msg, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -452,7 +451,7 @@ void *schtSendRsp(void *param) { schReleaseJob(job); schtJobDone = true; - + return NULL; } @@ -462,13 +461,13 @@ void *schtCreateFetchRspThread(void *param) { taosSsleep(1); - int32_t code = 0; + int32_t code = 0; SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildFetchRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_MERGE_FETCH_RSP; msg.pData = rmsg; - + code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0); schReleaseJob(job); @@ -529,7 +528,7 @@ void *schtRunJobThread(void *aa) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); + SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); schtInitLogFile(); @@ -601,7 +600,7 @@ void *schtRunJobThread(void *aa) { param->taskId = task->taskId; SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; @@ -622,7 +621,7 @@ void *schtRunJobThread(void *aa) { param->taskId = task->taskId - 1; SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; @@ -686,7 +685,6 @@ void *schtFreeJobThread(void *aa) { return NULL; } - } // namespace TEST(queryTest, normalCase) { @@ -696,7 +694,7 @@ TEST(queryTest, normalCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); + SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); @@ -737,13 +735,13 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); - + ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } @@ -753,13 +751,13 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; if (JOB_TASK_STATUS_EXEC == task->status) { SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); - + ASSERT_EQ(code, 0); } @@ -793,7 +791,7 @@ TEST(queryTest, normalCase) { taosMemoryFreeClear(data); schReleaseJob(job); - + schedulerDestroy(); schedulerFreeJob(&job, 0); @@ -808,7 +806,7 @@ TEST(queryTest, readyFirstCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); + SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); @@ -816,7 +814,7 @@ TEST(queryTest, readyFirstCase) { load.addr.epSet.numOfEps = 1; strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); load.addr.epSet.eps[0].port = 6031; - taosArrayPush(qnodeList, &load); + taosArrayPush(qnodeList, &load); int32_t code = schedulerInit(); ASSERT_EQ(code, 0); @@ -848,11 +846,11 @@ TEST(queryTest, readyFirstCase) { SSchTask *task = *(SSchTask **)pIter; SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); ASSERT_EQ(code, 0); @@ -865,13 +863,13 @@ TEST(queryTest, readyFirstCase) { if (JOB_TASK_STATUS_EXEC == task->status) { SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); - + ASSERT_EQ(code, 0); } @@ -919,7 +917,7 @@ TEST(queryTest, flowCtrlCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); + SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); schtInitLogFile(); @@ -933,7 +931,6 @@ TEST(queryTest, flowCtrlCase) { load.addr.epSet.eps[0].port = 6031; taosArrayPush(qnodeList, &load); - int32_t code = schedulerInit(); ASSERT_EQ(code, 0); @@ -968,13 +965,13 @@ TEST(queryTest, flowCtrlCase) { if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) { SDataBuf msg = {0}; - void* rmsg = NULL; + void *rmsg = NULL; schtBuildQueryRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; - + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); - + ASSERT_EQ(code, 0); } @@ -1005,7 +1002,7 @@ TEST(queryTest, flowCtrlCase) { schedulerFreeJob(&job, 0); - taosThreadJoin(thread1, NULL); + taosThreadJoin(thread1, NULL); } TEST(insertTest, normalCase) { @@ -1014,7 +1011,7 @@ TEST(insertTest, normalCase) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); + SQueryPlan *dag = (SQueryPlan *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); uint64_t numOfRows = 0; SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); @@ -1067,7 +1064,7 @@ TEST(insertTest, normalCase) { schedulerDestroy(); - taosThreadJoin(thread1, NULL); + taosThreadJoin(thread1, NULL); } TEST(multiThread, forceFree) { @@ -1076,7 +1073,7 @@ TEST(multiThread, forceFree) { TdThread thread1, thread2, thread3; taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL); -// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL); + // taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL); taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL); while (true) { @@ -1089,7 +1086,7 @@ TEST(multiThread, forceFree) { } schtTestStop = true; - //taosSsleep(3); + // taosSsleep(3); } TEST(otherTest, otherCase) { @@ -1097,12 +1094,13 @@ TEST(otherTest, otherCase) { schReleaseJob(0); schFreeRpcCtx(NULL); - ASSERT_EQ(schDumpEpSet(NULL), (char*)NULL); + ASSERT_EQ(schDumpEpSet(NULL), (char *)NULL); ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0); ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0); } int main(int argc, char **argv) { + schtInitLogFile(); taosSeedRand(taosGetTimestampSec()); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ffcb1fbdb5..9818394a2a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -832,6 +832,9 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { taosInitRWLatch(&exh->latch); exh->refId = transAddExHandle(transGetRefMgt(), exh); + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); + ASSERT(exh == self); + QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); @@ -2829,10 +2832,11 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); - QUEUE_INIT(&exh->q); - taosInitRWLatch(&exh->latch); exh->refId = transAddExHandle(transGetRefMgt(), exh); + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); + ASSERT(exh == self); + QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId);