Merge pull request #26576 from taosdata/fix/TD-30966.2
fix: qworker UT case
This commit is contained in:
commit
7bbe84a1b0
|
@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
|
||||||
|
|
||||||
if (pEpSet) {
|
if (pEpSet) {
|
||||||
contLen = tSerializeSEpSet(NULL, 0, pEpSet);
|
contLen = tSerializeSEpSet(NULL, 0, pEpSet);
|
||||||
|
if (contLen < 0) {
|
||||||
|
qError("tSerializeSEpSet failed, code:%x", terrno);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
rsp = rpcMallocCont(contLen);
|
rsp = rpcMallocCont(contLen);
|
||||||
tSerializeSEpSet(rsp, contLen, pEpSet);
|
if (NULL == rsp) {
|
||||||
|
qError("rpcMallocCont %d failed, code:%x", contLen, terrno);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
contLen = tSerializeSEpSet(rsp, contLen, pEpSet);
|
||||||
|
if (contLen < 0) {
|
||||||
|
qError("tSerializeSEpSet second failed, code:%x", terrno);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
|
@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
|
||||||
epSet.eps[2].port = 7300;
|
epSet.eps[2].port = 7300;
|
||||||
|
|
||||||
ctx->phase = QW_PHASE_POST_QUERY;
|
ctx->phase = QW_PHASE_POST_QUERY;
|
||||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
|
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error
|
||||||
*rsped = true;
|
*rsped = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
|
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
|
||||||
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
|
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
|
||||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
|
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
|
||||||
*rsped = true;
|
*rsped = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
|
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
|
||||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
|
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
|
||||||
*rsped = true;
|
*rsped = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
||||||
int32_t code) {
|
int32_t code) {
|
||||||
if (NULL == pRsp) {
|
if (NULL == pRsp) {
|
||||||
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
|
if (NULL == pRsp) {
|
||||||
|
QW_RET(terrno);
|
||||||
|
}
|
||||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
dataLength = 0;
|
dataLength = 0;
|
||||||
}
|
}
|
||||||
|
@ -187,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
||||||
#if 0
|
#if 0
|
||||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||||
|
if (NULL == pRsp) {
|
||||||
|
QW_RET(terrno);
|
||||||
|
}
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
|
@ -203,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
|
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||||
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
||||||
|
if (NULL == pRsp) {
|
||||||
|
QW_RET(terrno);
|
||||||
|
}
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
|
@ -428,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
SSubQueryMsg msg = {0};
|
SSubQueryMsg msg = {0};
|
||||||
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
||||||
|
@ -442,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
int32_t eId = msg.execId;
|
int32_t eId = msg.execId;
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
|
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||||
qwAbortPrerocessQuery(QW_FPARAMS());
|
code = qwAbortPrerocessQuery(QW_FPARAMS());
|
||||||
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);
|
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);
|
||||||
|
|
||||||
tFreeSSubQueryMsg(&msg);
|
tFreeSSubQueryMsg(&msg);
|
||||||
|
|
||||||
|
@ -458,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
|
||||||
|
|
||||||
SSubQueryMsg msg = {0};
|
SSubQueryMsg msg = {0};
|
||||||
|
@ -500,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
||||||
SQWTaskCtx *handles = NULL;
|
SQWTaskCtx *handles = NULL;
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
|
||||||
|
|
||||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||||
|
@ -533,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
SResFetchReq req = {0};
|
SResFetchReq req = {0};
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
|
||||||
|
|
||||||
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||||
|
@ -551,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
|
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||||
|
|
||||||
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
|
int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg);
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
|
QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -561,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
if (mgmt) {
|
if (mgmt) {
|
||||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -580,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STaskCancelReq *msg = pMsg->pCont;
|
STaskCancelReq *msg = pMsg->pCont;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
|
||||||
|
|
||||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||||
|
@ -621,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
|
||||||
|
|
||||||
STaskDropReq msg = {0};
|
STaskDropReq msg = {0};
|
||||||
|
@ -644,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
|
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||||
|
|
||||||
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
|
code = qwProcessDrop(QW_FPARAMS(), &qwMsg);
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
|
QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -659,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1);
|
||||||
|
|
||||||
STaskNotifyReq msg = {0};
|
STaskNotifyReq msg = {0};
|
||||||
|
@ -678,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle);
|
QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||||
|
|
||||||
QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg));
|
code = qwProcessNotify(QW_FPARAMS(), &qwMsg);
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processNotify end, node:%p", node);
|
QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -695,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
||||||
SSchedulerHbReq req = {0};
|
SSchedulerHbReq req = {0};
|
||||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||||
|
|
||||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
|
||||||
|
|
||||||
if (NULL == pMsg->pCont) {
|
if (NULL == pMsg->pCont) {
|
||||||
|
@ -717,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
||||||
|
|
||||||
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
|
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||||
|
|
||||||
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
|
code = qwProcessHb(mgmt, &qwMsg, &req);
|
||||||
|
|
||||||
QW_SCH_DLOG("processHb end, node:%p", node);
|
QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -735,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
|
||||||
|
|
||||||
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
|
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
|
||||||
|
|
||||||
tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
|
QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req));
|
||||||
|
|
||||||
uint64_t sId = req.sId;
|
uint64_t sId = req.sId;
|
||||||
uint64_t qId = req.queryId;
|
uint64_t qId = req.queryId;
|
||||||
|
|
|
@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) {
|
||||||
|
|
||||||
|
|
||||||
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
qTaskInfo_t taskHandle = ctx->taskHandle;
|
qTaskInfo_t taskHandle = ctx->taskHandle;
|
||||||
|
|
||||||
ctx->explainRsped = true;
|
ctx->explainRsped = true;
|
||||||
|
|
||||||
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
|
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
|
||||||
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
|
if (NULL == execInfoList) {
|
||||||
|
QW_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList));
|
||||||
|
|
||||||
if (ctx->localExec) {
|
if (ctx->localExec) {
|
||||||
SExplainLocalRsp localRsp = {0};
|
SExplainLocalRsp localRsp = {0};
|
||||||
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
|
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
|
||||||
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
|
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
|
||||||
|
if (NULL == pExec) {
|
||||||
|
QW_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
|
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
|
||||||
localRsp.rsp.subplanInfo = pExec;
|
localRsp.rsp.subplanInfo = pExec;
|
||||||
localRsp.qId = qId;
|
localRsp.qId = qId;
|
||||||
localRsp.tId = tId;
|
localRsp.tId = tId;
|
||||||
localRsp.rId = rId;
|
localRsp.rId = rId;
|
||||||
localRsp.eId = eId;
|
localRsp.eId = eId;
|
||||||
taosArrayPush(ctx->explainRes, &localRsp);
|
if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) {
|
||||||
|
QW_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(execInfoList);
|
taosArrayDestroy(execInfoList);
|
||||||
|
execInfoList = NULL;
|
||||||
} else {
|
} else {
|
||||||
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
|
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
|
||||||
connInfo.ahandle = NULL;
|
connInfo.ahandle = NULL;
|
||||||
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
|
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
|
||||||
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
|
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
|
||||||
|
execInfoList = NULL;
|
||||||
|
|
||||||
QW_ERR_RET(code);
|
QW_ERR_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_return:
|
||||||
|
|
||||||
|
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -544,7 +562,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||||
void qwCloseRef(void) {
|
void qwCloseRef(void) {
|
||||||
taosWLockLatch(&gQwMgmt.lock);
|
taosWLockLatch(&gQwMgmt.lock);
|
||||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||||
taosCloseRef(gQwMgmt.qwRef);
|
(void)taosCloseRef(gQwMgmt.qwRef); // ignore error
|
||||||
gQwMgmt.qwRef = -1;
|
gQwMgmt.qwRef = -1;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&gQwMgmt.lock);
|
taosWUnLockLatch(&gQwMgmt.lock);
|
||||||
|
@ -561,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
int32_t schStatusCount = 0;
|
int32_t schStatusCount = 0;
|
||||||
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||||
|
|
||||||
taosTmrStop(mgmt->hbTimer);
|
(void)taosTmrStop(mgmt->hbTimer); //ignore error
|
||||||
mgmt->hbTimer = NULL;
|
mgmt->hbTimer = NULL;
|
||||||
taosTmrCleanUp(mgmt->timer);
|
taosTmrCleanUp(mgmt->timer);
|
||||||
|
|
||||||
|
@ -652,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
||||||
return pStat->num ? (pStat->total / pStat->num) : 0;
|
return pStat->num ? (pStat->total / pStat->num) : 0;
|
||||||
default:
|
default:
|
||||||
qError("unsupported queue type %d", type);
|
qError("unsupported queue type %d", type);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
|
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t num = taosArrayGetSize(pExpiredSch);
|
int32_t num = taosArrayGetSize(pExpiredSch);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
uint64_t *sId = taosArrayGet(pExpiredSch, i);
|
uint64_t *sId = taosArrayGet(pExpiredSch, i);
|
||||||
SQWSchStatus *pSch = NULL;
|
SQWSchStatus *pSch = NULL;
|
||||||
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
|
if (NULL == sId) {
|
||||||
|
qError("get the %dth sch failed, code:%x", i, terrno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashGetSize(pSch->tasksHash) <= 0) {
|
if (taosHashGetSize(pSch->tasksHash) <= 0) {
|
||||||
qwDestroySchStatus(pSch);
|
qwDestroySchStatus(pSch);
|
||||||
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
|
code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
|
||||||
qDebug("sch %" PRIx64 " destroyed", *sId);
|
qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||||
|
|
|
@ -1372,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (mgmt->refId >= 0) {
|
if (mgmt->refId >= 0) {
|
||||||
qwRelease(mgmt->refId); // ignore error
|
(void)qwRelease(mgmt->refId); // ignore error
|
||||||
} else {
|
} else {
|
||||||
taosHashCleanup(mgmt->schHash);
|
taosHashCleanup(mgmt->schHash);
|
||||||
taosHashCleanup(mgmt->ctxHash);
|
taosHashCleanup(mgmt->ctxHash);
|
||||||
|
|
|
@ -130,30 +130,32 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
|
||||||
fetchRpc->contLen = sizeof(SResFetchReq);
|
fetchRpc->contLen = sizeof(SResFetchReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
|
int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
|
||||||
dropMsg->sId = 1;
|
dropMsg->sId = 1;
|
||||||
dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
|
dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
|
||||||
dropMsg->taskId = 1;
|
dropMsg->taskId = 1;
|
||||||
|
|
||||||
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
|
int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, dropMsg);
|
||||||
if (msgSize < 0) {
|
if (msgSize < 0) {
|
||||||
return;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *msg = (char*)taosMemoryCalloc(1, msgSize);
|
char *msg = (char*)taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
return;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
|
if (tSerializeSTaskDropReq(msg, msgSize, dropMsg) < 0) {
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
return;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
dropRpc->msgType = TDMT_SCH_DROP_TASK;
|
dropRpc->msgType = TDMT_SCH_DROP_TASK;
|
||||||
dropRpc->pCont = msg;
|
dropRpc->pCont = msg;
|
||||||
dropRpc->contLen = msgSize;
|
dropRpc->contLen = msgSize;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
|
int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
|
||||||
|
@ -164,6 +166,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
|
||||||
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
|
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
taosWLockLatch(&qwtTestFetchQueueLock);
|
taosWLockLatch(&qwtTestFetchQueueLock);
|
||||||
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
|
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
|
||||||
|
if (NULL == newMsg) {
|
||||||
|
printf("malloc failed");
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
||||||
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
|
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
|
||||||
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
|
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
|
||||||
|
@ -178,7 +184,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&qwtTestFetchQueueLock);
|
taosWUnLockLatch(&qwtTestFetchQueueLock);
|
||||||
|
|
||||||
tsem_post(&qwtTestFetchSem);
|
if (tsem_post(&qwtTestFetchSem) < 0) {
|
||||||
|
printf("tsem_post failed, errno:%d", errno);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -186,6 +195,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
|
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
|
||||||
taosWLockLatch(&qwtTestQueryQueueLock);
|
taosWLockLatch(&qwtTestQueryQueueLock);
|
||||||
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
|
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
|
||||||
|
if (NULL == newMsg) {
|
||||||
|
printf("malloc failed");
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
||||||
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
|
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
|
||||||
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
|
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
|
||||||
|
@ -200,22 +213,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&qwtTestQueryQueueLock);
|
taosWUnLockLatch(&qwtTestQueryQueueLock);
|
||||||
|
|
||||||
tsem_post(&qwtTestQuerySem);
|
if (tsem_post(&qwtTestQuerySem) < 0) {
|
||||||
|
printf("tsem_post failed, errno:%d", errno);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
|
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
|
||||||
|
|
||||||
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
int qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
|
int32_t code = 0;
|
||||||
switch (pRsp->msgType) {
|
switch (pRsp->msgType) {
|
||||||
case TDMT_SCH_QUERY_RSP:
|
case TDMT_SCH_QUERY_RSP:
|
||||||
case TDMT_SCH_MERGE_QUERY_RSP: {
|
case TDMT_SCH_MERGE_QUERY_RSP: {
|
||||||
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
|
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
|
||||||
|
|
||||||
if (pRsp->code) {
|
if (pRsp->code) {
|
||||||
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||||
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
|
if (code) {
|
||||||
|
assert(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
|
||||||
|
if (code) {
|
||||||
|
assert(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(rsp);
|
rpcFreeCont(rsp);
|
||||||
|
@ -227,13 +252,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
|
|
||||||
if (0 == pRsp->code && 0 == rsp->completed) {
|
if (0 == pRsp->code && 0 == rsp->completed) {
|
||||||
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
|
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
|
||||||
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
|
code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
|
||||||
|
if (code) {
|
||||||
|
assert(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
rpcFreeCont(rsp);
|
rpcFreeCont(rsp);
|
||||||
return;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
|
||||||
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
|
if (code) {
|
||||||
|
assert(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
|
||||||
|
if (code) {
|
||||||
|
assert(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
rpcFreeCont(rsp);
|
rpcFreeCont(rsp);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -245,9 +282,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
qwtTestCaseFinished = true;
|
qwtTestCaseFinished = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
|
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
|
||||||
|
@ -292,6 +331,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
|
||||||
|
|
||||||
if (endExec) {
|
if (endExec) {
|
||||||
*pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
|
*pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
if (NULL == *pRes) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
(*pRes)->info.rows = taosRand() % 1000 + 1;
|
(*pRes)->info.rows = taosRand() % 1000 + 1;
|
||||||
} else {
|
} else {
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
|
@ -631,7 +673,7 @@ void *queryThread(void *param) {
|
||||||
|
|
||||||
while (!qwtTestStop) {
|
while (!qwtTestStop) {
|
||||||
qwtBuildQueryReqMsg(&queryRpc);
|
qwtBuildQueryReqMsg(&queryRpc);
|
||||||
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
|
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error
|
||||||
if (qwtTestEnableSleep) {
|
if (qwtTestEnableSleep) {
|
||||||
taosUsleep(taosRand() % 5);
|
taosUsleep(taosRand() % 5);
|
||||||
}
|
}
|
||||||
|
@ -653,7 +695,7 @@ void *fetchThread(void *param) {
|
||||||
|
|
||||||
while (!qwtTestStop) {
|
while (!qwtTestStop) {
|
||||||
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
|
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
|
||||||
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
|
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error
|
||||||
if (qwtTestEnableSleep) {
|
if (qwtTestEnableSleep) {
|
||||||
taosUsleep(taosRand() % 5);
|
taosUsleep(taosRand() % 5);
|
||||||
}
|
}
|
||||||
|
@ -674,8 +716,11 @@ void *dropThread(void *param) {
|
||||||
STaskDropReq dropMsg = {0};
|
STaskDropReq dropMsg = {0};
|
||||||
|
|
||||||
while (!qwtTestStop) {
|
while (!qwtTestStop) {
|
||||||
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
|
if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) {
|
||||||
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
|
break;
|
||||||
|
}
|
||||||
|
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error
|
||||||
|
|
||||||
if (qwtTestEnableSleep) {
|
if (qwtTestEnableSleep) {
|
||||||
taosUsleep(taosRand() % 5);
|
taosUsleep(taosRand() % 5);
|
||||||
}
|
}
|
||||||
|
@ -700,7 +745,7 @@ void *qwtclientThread(void *param) {
|
||||||
qwtTestCaseFinished = false;
|
qwtTestCaseFinished = false;
|
||||||
|
|
||||||
qwtBuildQueryReqMsg(&queryRpc);
|
qwtBuildQueryReqMsg(&queryRpc);
|
||||||
qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc);
|
(void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error
|
||||||
|
|
||||||
while (!qwtTestCaseFinished) {
|
while (!qwtTestCaseFinished) {
|
||||||
taosUsleep(1);
|
taosUsleep(1);
|
||||||
|
@ -752,9 +797,9 @@ void *queryQueueThread(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TDMT_SCH_QUERY == queryRpc->msgType) {
|
if (TDMT_SCH_QUERY == queryRpc->msgType) {
|
||||||
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
|
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
|
||||||
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
|
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
|
||||||
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
|
(void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
|
||||||
} else {
|
} else {
|
||||||
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
|
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -810,16 +855,16 @@ void *fetchQueueThread(void *param) {
|
||||||
switch (fetchRpc->msgType) {
|
switch (fetchRpc->msgType) {
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
case TDMT_SCH_MERGE_FETCH:
|
case TDMT_SCH_MERGE_FETCH:
|
||||||
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
|
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_CANCEL_TASK:
|
case TDMT_SCH_CANCEL_TASK:
|
||||||
//qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
|
//qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_DROP_TASK:
|
case TDMT_SCH_DROP_TASK:
|
||||||
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
|
(void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_TASK_NOTIFY:
|
case TDMT_SCH_TASK_NOTIFY:
|
||||||
qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0);
|
(void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
|
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
|
||||||
|
@ -853,7 +898,7 @@ TEST(seqTest, normalCase) {
|
||||||
|
|
||||||
qwtBuildQueryReqMsg(&queryRpc);
|
qwtBuildQueryReqMsg(&queryRpc);
|
||||||
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
|
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
|
||||||
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
|
(void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
|
||||||
|
|
||||||
stubSetStringToPlan();
|
stubSetStringToPlan();
|
||||||
stubSetRpcSendResponse();
|
stubSetRpcSendResponse();
|
||||||
|
@ -898,7 +943,7 @@ TEST(seqTest, cancelFirst) {
|
||||||
qwtInitLogFile();
|
qwtInitLogFile();
|
||||||
|
|
||||||
qwtBuildQueryReqMsg(&queryRpc);
|
qwtBuildQueryReqMsg(&queryRpc);
|
||||||
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
|
(void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
|
||||||
|
|
||||||
stubSetStringToPlan();
|
stubSetStringToPlan();
|
||||||
stubSetRpcSendResponse();
|
stubSetRpcSendResponse();
|
||||||
|
@ -954,7 +999,7 @@ TEST(seqTest, randCase) {
|
||||||
if (r >= 0 && r < maxr / 5) {
|
if (r >= 0 && r < maxr / 5) {
|
||||||
printf("Query,%d\n", t++);
|
printf("Query,%d\n", t++);
|
||||||
qwtBuildQueryReqMsg(&queryRpc);
|
qwtBuildQueryReqMsg(&queryRpc);
|
||||||
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
|
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error
|
||||||
} else if (r >= maxr / 5 && r < maxr * 2 / 5) {
|
} else if (r >= maxr / 5 && r < maxr * 2 / 5) {
|
||||||
// printf("Ready,%d\n", t++);
|
// printf("Ready,%d\n", t++);
|
||||||
// qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
|
// qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
|
||||||
|
@ -965,14 +1010,14 @@ TEST(seqTest, randCase) {
|
||||||
} else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
|
} else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
|
||||||
printf("Fetch,%d\n", t++);
|
printf("Fetch,%d\n", t++);
|
||||||
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
|
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
|
||||||
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
|
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error
|
||||||
if (qwtTestEnableSleep) {
|
if (qwtTestEnableSleep) {
|
||||||
taosUsleep(1);
|
taosUsleep(1);
|
||||||
}
|
}
|
||||||
} else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
|
} else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
|
||||||
printf("Drop,%d\n", t++);
|
printf("Drop,%d\n", t++);
|
||||||
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
|
(void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error
|
||||||
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
|
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error
|
||||||
if (qwtTestEnableSleep) {
|
if (qwtTestEnableSleep) {
|
||||||
taosUsleep(1);
|
taosUsleep(1);
|
||||||
}
|
}
|
||||||
|
@ -1018,14 +1063,14 @@ TEST(seqTest, multithreadRand) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
(void)taosThreadAttrInit(&thattr); //ignore error
|
||||||
|
|
||||||
TdThread t1, t2, t3, t4, t5, t6;
|
TdThread t1, t2, t3, t4, t5, t6;
|
||||||
taosThreadCreate(&(t1), &thattr, queryThread, mgmt);
|
(void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error
|
||||||
// taosThreadCreate(&(t2), &thattr, readyThread, NULL);
|
// (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error
|
||||||
taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
|
(void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error
|
||||||
taosThreadCreate(&(t4), &thattr, dropThread, NULL);
|
(void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error
|
||||||
taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt);
|
(void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
@ -1083,16 +1128,16 @@ TEST(rcTest, shortExecshortDelay) {
|
||||||
qwtTestMaxExecTaskUsec = 0;
|
qwtTestMaxExecTaskUsec = 0;
|
||||||
qwtTestReqMaxDelayUsec = 0;
|
qwtTestReqMaxDelayUsec = 0;
|
||||||
|
|
||||||
tsem_init(&qwtTestQuerySem, 0, 0);
|
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
|
||||||
tsem_init(&qwtTestFetchSem, 0, 0);
|
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
|
||||||
|
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
(void)taosThreadAttrInit(&thattr); //ignore error
|
||||||
|
|
||||||
TdThread t1, t2, t3, t4, t5;
|
TdThread t1, t2, t3, t4, t5;
|
||||||
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
|
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
|
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
|
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
@ -1114,8 +1159,8 @@ TEST(rcTest, shortExecshortDelay) {
|
||||||
|
|
||||||
if (qwtTestCaseFinished) {
|
if (qwtTestCaseFinished) {
|
||||||
if (qwtTestQuitThreadNum < 3) {
|
if (qwtTestQuitThreadNum < 3) {
|
||||||
tsem_post(&qwtTestQuerySem);
|
(void)tsem_post(&qwtTestQuerySem); //ignore error
|
||||||
tsem_post(&qwtTestFetchSem);
|
(void)tsem_post(&qwtTestFetchSem); //ignore error
|
||||||
|
|
||||||
taosUsleep(10);
|
taosUsleep(10);
|
||||||
}
|
}
|
||||||
|
@ -1166,16 +1211,16 @@ TEST(rcTest, longExecshortDelay) {
|
||||||
qwtTestMaxExecTaskUsec = 1000000;
|
qwtTestMaxExecTaskUsec = 1000000;
|
||||||
qwtTestReqMaxDelayUsec = 0;
|
qwtTestReqMaxDelayUsec = 0;
|
||||||
|
|
||||||
tsem_init(&qwtTestQuerySem, 0, 0);
|
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
|
||||||
tsem_init(&qwtTestFetchSem, 0, 0);
|
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
|
||||||
|
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
(void)taosThreadAttrInit(&thattr); //ignore error
|
||||||
|
|
||||||
TdThread t1, t2, t3, t4, t5;
|
TdThread t1, t2, t3, t4, t5;
|
||||||
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
|
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
|
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
|
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
@ -1197,8 +1242,8 @@ TEST(rcTest, longExecshortDelay) {
|
||||||
|
|
||||||
if (qwtTestCaseFinished) {
|
if (qwtTestCaseFinished) {
|
||||||
if (qwtTestQuitThreadNum < 3) {
|
if (qwtTestQuitThreadNum < 3) {
|
||||||
tsem_post(&qwtTestQuerySem);
|
(void)tsem_post(&qwtTestQuerySem); //ignore error
|
||||||
tsem_post(&qwtTestFetchSem);
|
(void)tsem_post(&qwtTestFetchSem); //ignore error
|
||||||
|
|
||||||
taosUsleep(10);
|
taosUsleep(10);
|
||||||
}
|
}
|
||||||
|
@ -1249,16 +1294,16 @@ TEST(rcTest, shortExeclongDelay) {
|
||||||
qwtTestMaxExecTaskUsec = 0;
|
qwtTestMaxExecTaskUsec = 0;
|
||||||
qwtTestReqMaxDelayUsec = 1000000;
|
qwtTestReqMaxDelayUsec = 1000000;
|
||||||
|
|
||||||
tsem_init(&qwtTestQuerySem, 0, 0);
|
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
|
||||||
tsem_init(&qwtTestFetchSem, 0, 0);
|
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
|
||||||
|
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
(void)taosThreadAttrInit(&thattr); //ignore error
|
||||||
|
|
||||||
TdThread t1, t2, t3, t4, t5;
|
TdThread t1, t2, t3, t4, t5;
|
||||||
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
|
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
|
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
|
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
@ -1280,8 +1325,8 @@ TEST(rcTest, shortExeclongDelay) {
|
||||||
|
|
||||||
if (qwtTestCaseFinished) {
|
if (qwtTestCaseFinished) {
|
||||||
if (qwtTestQuitThreadNum < 3) {
|
if (qwtTestQuitThreadNum < 3) {
|
||||||
tsem_post(&qwtTestQuerySem);
|
(void)tsem_post(&qwtTestQuerySem); //ignore error
|
||||||
tsem_post(&qwtTestFetchSem);
|
(void)tsem_post(&qwtTestFetchSem); //ignore error
|
||||||
|
|
||||||
taosUsleep(10);
|
taosUsleep(10);
|
||||||
}
|
}
|
||||||
|
@ -1327,16 +1372,16 @@ TEST(rcTest, dropTest) {
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
tsem_init(&qwtTestQuerySem, 0, 0);
|
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
|
||||||
tsem_init(&qwtTestFetchSem, 0, 0);
|
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
|
||||||
|
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
(void)taosThreadAttrInit(&thattr); //ignore error
|
||||||
|
|
||||||
TdThread t1, t2, t3, t4, t5;
|
TdThread t1, t2, t3, t4, t5;
|
||||||
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
|
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
|
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
|
||||||
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
|
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
|
Loading…
Reference in New Issue