feature/scheduler
This commit is contained in:
parent
0de5f9b98a
commit
1502ee27ef
|
@ -459,9 +459,10 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
|
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
|
||||||
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
|
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
|
||||||
|
|
||||||
//scheduler
|
//scheduler&qworker
|
||||||
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
|
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
|
||||||
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
|
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
|
||||||
|
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
|
||||||
|
|
||||||
//parser
|
//parser
|
||||||
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
|
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
|
||||||
|
|
|
@ -106,8 +106,10 @@ typedef struct SQWTaskCtx {
|
||||||
void *cancelConnection;
|
void *cancelConnection;
|
||||||
|
|
||||||
bool emptyRes;
|
bool emptyRes;
|
||||||
int8_t queryContinue;
|
bool queryFetched;
|
||||||
int8_t queryInQueue;
|
bool queryEnd;
|
||||||
|
bool queryContinue;
|
||||||
|
bool queryInQueue;
|
||||||
int32_t rspCode;
|
int32_t rspCode;
|
||||||
|
|
||||||
int8_t events[QW_EVENT_MAX];
|
int8_t events[QW_EVENT_MAX];
|
||||||
|
|
|
@ -56,19 +56,27 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
|
||||||
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
|
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
|
||||||
if (newStatus != JOB_TASK_STATUS_EXECUTING
|
if (newStatus != JOB_TASK_STATUS_EXECUTING
|
||||||
&& newStatus != JOB_TASK_STATUS_SUCCEED
|
&& newStatus != JOB_TASK_STATUS_SUCCEED
|
||||||
&& newStatus != JOB_TASK_STATUS_CANCELLED) {
|
&& newStatus != JOB_TASK_STATUS_CANCELLED
|
||||||
|
&& newStatus != JOB_TASK_STATUS_FAILED
|
||||||
|
&& newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case JOB_TASK_STATUS_SUCCEED:
|
case JOB_TASK_STATUS_SUCCEED:
|
||||||
if (newStatus != JOB_TASK_STATUS_CANCELLED
|
if (newStatus != JOB_TASK_STATUS_CANCELLED
|
||||||
&& newStatus != JOB_TASK_STATUS_DROPPING) {
|
&& newStatus != JOB_TASK_STATUS_DROPPING
|
||||||
|
&& newStatus != JOB_TASK_STATUS_FAILED) {
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case JOB_TASK_STATUS_FAILED:
|
case JOB_TASK_STATUS_FAILED:
|
||||||
|
if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) {
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
case JOB_TASK_STATUS_CANCELLING:
|
case JOB_TASK_STATUS_CANCELLING:
|
||||||
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
@ -77,7 +85,9 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
|
||||||
break;
|
break;
|
||||||
case JOB_TASK_STATUS_CANCELLED:
|
case JOB_TASK_STATUS_CANCELLED:
|
||||||
case JOB_TASK_STATUS_DROPPING:
|
case JOB_TASK_STATUS_DROPPING:
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -459,7 +469,9 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
qwReleaseTaskStatus(QW_WRITE, sch);
|
if (task) {
|
||||||
|
qwReleaseTaskStatus(QW_WRITE, sch);
|
||||||
|
}
|
||||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||||
|
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
|
@ -477,7 +489,9 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
qwReleaseTaskStatus(QW_READ, sch);
|
if (task) {
|
||||||
|
qwReleaseTaskStatus(QW_READ, sch);
|
||||||
|
}
|
||||||
qwReleaseScheduler(QW_READ, mgmt);
|
qwReleaseScheduler(QW_READ, mgmt);
|
||||||
|
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
|
@ -549,6 +563,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (atomic_load_32(&ctx->rspCode)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
|
@ -608,7 +626,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
||||||
if (ctx->emptyRes) {
|
if (ctx->emptyRes) {
|
||||||
QW_TASK_DLOG_E("query end with empty result");
|
QW_TASK_DLOG_E("query end with empty result");
|
||||||
|
|
||||||
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
|
||||||
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
||||||
|
|
||||||
*rspMsg = rsp;
|
*rspMsg = rsp;
|
||||||
|
@ -635,7 +653,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
||||||
|
|
||||||
QW_TASK_DLOG_E("no data in sink and query end");
|
QW_TASK_DLOG_E("no data in sink and query end");
|
||||||
|
|
||||||
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
|
||||||
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
||||||
*rspMsg = rsp;
|
*rspMsg = rsp;
|
||||||
*dataLen = 0;
|
*dataLen = 0;
|
||||||
|
@ -665,7 +683,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
||||||
|
|
||||||
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
|
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
|
||||||
QW_TASK_DLOG_E("task all data fetched, done");
|
QW_TASK_DLOG_E("task all data fetched, done");
|
||||||
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -687,10 +705,17 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
||||||
if (QW_PHASE_PRE_FETCH != phase) {
|
if (QW_PHASE_PRE_FETCH == phase) {
|
||||||
|
atomic_store_8(&ctx->queryFetched, true);
|
||||||
|
} else {
|
||||||
atomic_store_8(&ctx->phase, phase);
|
atomic_store_8(&ctx->phase, phase);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (atomic_load_8(&ctx->queryEnd)) {
|
||||||
|
QW_TASK_ELOG_E("query already end");
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
switch (phase) {
|
switch (phase) {
|
||||||
case QW_PHASE_PRE_QUERY: {
|
case QW_PHASE_PRE_QUERY: {
|
||||||
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
|
@ -717,12 +742,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase));
|
QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
|
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
|
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
|
||||||
QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase));
|
QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -827,6 +852,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
|
||||||
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
|
||||||
|
}
|
||||||
|
|
||||||
if (ctx) {
|
if (ctx) {
|
||||||
QW_UPDATE_RSP_CODE(ctx, code);
|
QW_UPDATE_RSP_CODE(ctx, code);
|
||||||
|
|
||||||
|
@ -912,15 +941,13 @@ _return:
|
||||||
|
|
||||||
input.code = code;
|
input.code = code;
|
||||||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!queryRsped) {
|
if (!queryRsped) {
|
||||||
qwBuildAndSendQueryRsp(qwMsg->connection, code);
|
qwBuildAndSendQueryRsp(qwMsg->connection, code);
|
||||||
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
|
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
@ -948,6 +975,11 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
||||||
|
|
||||||
|
if (atomic_load_8(&ctx->queryEnd) || atomic_load_8(&ctx->queryFetched)) {
|
||||||
|
QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8(&ctx->queryEnd), atomic_load_8(&ctx->queryFetched));
|
||||||
|
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
if (ctx->phase == QW_PHASE_POST_QUERY) {
|
if (ctx->phase == QW_PHASE_POST_QUERY) {
|
||||||
code = ctx->rspCode;
|
code = ctx->rspCode;
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -1006,13 +1038,13 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
||||||
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
|
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
|
||||||
|
|
||||||
// RC WARNING
|
|
||||||
atomic_store_8(&ctx->queryContinue, 1);
|
atomic_store_8(&ctx->queryContinue, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rsp) {
|
if (rsp) {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||||
|
atomic_store_8(&ctx->queryEnd, qComplete);
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
|
||||||
|
@ -1072,6 +1104,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
} else {
|
} else {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||||
|
atomic_store_8(&ctx->queryEnd, qComplete);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
||||||
|
@ -1084,7 +1117,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
if (QW_IS_QUERY_RUNNING(ctx)) {
|
if (QW_IS_QUERY_RUNNING(ctx)) {
|
||||||
atomic_store_8(&ctx->queryContinue, 1);
|
atomic_store_8(&ctx->queryContinue, 1);
|
||||||
} else if (0 == atomic_load_8(&ctx->queryInQueue)) {
|
} else if (0 == atomic_load_8(&ctx->queryInQueue)) {
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
|
||||||
|
|
||||||
atomic_store_8(&ctx->queryInQueue, 1);
|
atomic_store_8(&ctx->queryInQueue, 1);
|
||||||
|
|
||||||
|
@ -1137,7 +1170,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
if (QW_IS_QUERY_RUNNING(ctx)) {
|
if (QW_IS_QUERY_RUNNING(ctx)) {
|
||||||
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
|
||||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
|
||||||
} else if (ctx->phase > 0) {
|
} else if (ctx->phase > 0) {
|
||||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
@ -1154,7 +1187,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_UPDATE_RSP_CODE(ctx, code);
|
if (ctx) {
|
||||||
|
QW_UPDATE_RSP_CODE(ctx, code);
|
||||||
|
}
|
||||||
|
|
||||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
|
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,8 @@ namespace {
|
||||||
#define qwtTestQueryQueueSize 1000000
|
#define qwtTestQueryQueueSize 1000000
|
||||||
#define qwtTestFetchQueueSize 1000000
|
#define qwtTestFetchQueueSize 1000000
|
||||||
|
|
||||||
|
bool qwtEnableLog = true;
|
||||||
|
|
||||||
int32_t qwtTestMaxExecTaskUsec = 2;
|
int32_t qwtTestMaxExecTaskUsec = 2;
|
||||||
int32_t qwtTestReqMaxDelayUsec = 2;
|
int32_t qwtTestReqMaxDelayUsec = 2;
|
||||||
|
|
||||||
|
@ -54,10 +56,10 @@ uint64_t qwtTestQueryId = 0;
|
||||||
bool qwtTestEnableSleep = true;
|
bool qwtTestEnableSleep = true;
|
||||||
bool qwtTestStop = false;
|
bool qwtTestStop = false;
|
||||||
bool qwtTestDeadLoop = false;
|
bool qwtTestDeadLoop = false;
|
||||||
int32_t qwtTestMTRunSec = 60;
|
int32_t qwtTestMTRunSec = 6000;
|
||||||
int32_t qwtTestPrintNum = 100000;
|
int32_t qwtTestPrintNum = 10000;
|
||||||
int32_t qwtTestCaseIdx = 0;
|
uint64_t qwtTestCaseIdx = 0;
|
||||||
int32_t qwtTestCaseNum = 4;
|
uint64_t qwtTestCaseNum = 4;
|
||||||
bool qwtTestCaseFinished = false;
|
bool qwtTestCaseFinished = false;
|
||||||
tsem_t qwtTestQuerySem;
|
tsem_t qwtTestQuerySem;
|
||||||
tsem_t qwtTestFetchSem;
|
tsem_t qwtTestFetchSem;
|
||||||
|
@ -95,6 +97,9 @@ SSchTasksStatusReq qwtstatusMsg = {0};
|
||||||
|
|
||||||
|
|
||||||
void qwtInitLogFile() {
|
void qwtInitLogFile() {
|
||||||
|
if (!qwtEnableLog) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
const char *defaultLogFileNamePrefix = "taosdlog";
|
const char *defaultLogFileNamePrefix = "taosdlog";
|
||||||
const int32_t maxLogFileNum = 10;
|
const int32_t maxLogFileNum = 10;
|
||||||
|
|
||||||
|
@ -203,6 +208,9 @@ int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void qwtSendReqToDnode(void* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
|
@ -263,26 +271,15 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||||
int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum);
|
|
||||||
|
|
||||||
qwtTestSinkBlockNum = 0;
|
qwtTestSinkBlockNum = 0;
|
||||||
qwtTestSinkMaxBlockNum = taosRand() % 100 + 1;
|
qwtTestSinkMaxBlockNum = taosRand() % 100 + 1;
|
||||||
qwtTestSinkQueryEnd = false;
|
qwtTestSinkQueryEnd = false;
|
||||||
|
|
||||||
if (0 == idx) {
|
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx+1;
|
||||||
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
|
*handle = (DataSinkHandle)qwtTestCaseIdx+2;
|
||||||
*handle = (DataSinkHandle)qwtTestCaseIdx+1;
|
|
||||||
} else if (1 == idx) {
|
++qwtTestCaseIdx;
|
||||||
*pTaskInfo = NULL;
|
|
||||||
*handle = NULL;
|
|
||||||
} else if (2 == idx) {
|
|
||||||
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
|
|
||||||
*handle = NULL;
|
|
||||||
} else if (3 == idx) {
|
|
||||||
*pTaskInfo = NULL;
|
|
||||||
*handle = (DataSinkHandle)qwtTestCaseIdx;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -315,7 +312,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
|
|
||||||
if (endExec) {
|
if (endExec) {
|
||||||
*pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock));
|
*pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock));
|
||||||
(*pRes)->info.rows = taosRand() % 1000;
|
(*pRes)->info.rows = taosRand() % 1000 + 1;
|
||||||
} else {
|
} else {
|
||||||
*pRes = NULL;
|
*pRes = NULL;
|
||||||
*useconds = taosRand() % 10;
|
*useconds = taosRand() % 10;
|
||||||
|
@ -850,7 +847,6 @@ void *fetchQueueThread(void *param) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
TEST(seqTest, normalCase) {
|
TEST(seqTest, normalCase) {
|
||||||
void *mgmt = NULL;
|
void *mgmt = NULL;
|
||||||
|
@ -881,7 +877,7 @@ TEST(seqTest, normalCase) {
|
||||||
stubSetPutDataBlock();
|
stubSetPutDataBlock();
|
||||||
stubSetGetDataBlock();
|
stubSetGetDataBlock();
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
|
||||||
|
@ -920,7 +916,7 @@ TEST(seqTest, cancelFirst) {
|
||||||
stubSetStringToPlan();
|
stubSetStringToPlan();
|
||||||
stubSetRpcSendResponse();
|
stubSetRpcSendResponse();
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc);
|
||||||
|
@ -966,7 +962,7 @@ TEST(seqTest, randCase) {
|
||||||
|
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
int32_t t = 0;
|
int32_t t = 0;
|
||||||
|
@ -1025,21 +1021,31 @@ TEST(seqTest, multithreadRand) {
|
||||||
|
|
||||||
stubSetStringToPlan();
|
stubSetStringToPlan();
|
||||||
stubSetRpcSendResponse();
|
stubSetRpcSendResponse();
|
||||||
|
stubSetExecTask();
|
||||||
|
stubSetCreateExecTask();
|
||||||
|
stubSetAsyncKillTask();
|
||||||
|
stubSetDestroyTask();
|
||||||
|
stubSetDestroyDataSinker();
|
||||||
|
stubSetGetDataLength();
|
||||||
|
stubSetEndPut();
|
||||||
|
stubSetPutDataBlock();
|
||||||
|
stubSetGetDataBlock();
|
||||||
|
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
|
|
||||||
pthread_t t1,t2,t3,t4,t5;
|
pthread_t t1,t2,t3,t4,t5,t6;
|
||||||
pthread_create(&(t1), &thattr, queryThread, mgmt);
|
pthread_create(&(t1), &thattr, queryThread, mgmt);
|
||||||
pthread_create(&(t2), &thattr, readyThread, NULL);
|
pthread_create(&(t2), &thattr, readyThread, NULL);
|
||||||
pthread_create(&(t3), &thattr, fetchThread, NULL);
|
pthread_create(&(t3), &thattr, fetchThread, NULL);
|
||||||
pthread_create(&(t4), &thattr, dropThread, NULL);
|
pthread_create(&(t4), &thattr, dropThread, NULL);
|
||||||
pthread_create(&(t5), &thattr, statusThread, NULL);
|
pthread_create(&(t5), &thattr, statusThread, NULL);
|
||||||
|
pthread_create(&(t6), &thattr, fetchQueueThread, mgmt);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (qwtTestDeadLoop) {
|
if (qwtTestDeadLoop) {
|
||||||
|
@ -1052,12 +1058,19 @@ TEST(seqTest, multithreadRand) {
|
||||||
|
|
||||||
qwtTestStop = true;
|
qwtTestStop = true;
|
||||||
taosSsleep(3);
|
taosSsleep(3);
|
||||||
|
|
||||||
|
qwtTestQueryQueueNum = 0;
|
||||||
|
qwtTestQueryQueueRIdx = 0;
|
||||||
|
qwtTestQueryQueueWIdx = 0;
|
||||||
|
qwtTestQueryQueueLock = 0;
|
||||||
|
qwtTestFetchQueueNum = 0;
|
||||||
|
qwtTestFetchQueueRIdx = 0;
|
||||||
|
qwtTestFetchQueueWIdx = 0;
|
||||||
|
qwtTestFetchQueueLock = 0;
|
||||||
|
|
||||||
qWorkerDestroy(&mgmt);
|
qWorkerDestroy(&mgmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TEST(rcTest, shortExecshortDelay) {
|
TEST(rcTest, shortExecshortDelay) {
|
||||||
void *mgmt = NULL;
|
void *mgmt = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1081,7 +1094,7 @@ TEST(rcTest, shortExecshortDelay) {
|
||||||
qwtTestStop = false;
|
qwtTestStop = false;
|
||||||
qwtTestQuitThreadNum = 0;
|
qwtTestQuitThreadNum = 0;
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
qwtTestMaxExecTaskUsec = 0;
|
qwtTestMaxExecTaskUsec = 0;
|
||||||
|
@ -1162,7 +1175,7 @@ TEST(rcTest, longExecshortDelay) {
|
||||||
qwtTestStop = false;
|
qwtTestStop = false;
|
||||||
qwtTestQuitThreadNum = 0;
|
qwtTestQuitThreadNum = 0;
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
qwtTestMaxExecTaskUsec = 1000000;
|
qwtTestMaxExecTaskUsec = 1000000;
|
||||||
|
@ -1245,7 +1258,7 @@ TEST(rcTest, shortExeclongDelay) {
|
||||||
qwtTestStop = false;
|
qwtTestStop = false;
|
||||||
qwtTestQuitThreadNum = 0;
|
qwtTestQuitThreadNum = 0;
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
qwtTestMaxExecTaskUsec = 0;
|
qwtTestMaxExecTaskUsec = 0;
|
||||||
|
@ -1305,7 +1318,6 @@ TEST(rcTest, shortExeclongDelay) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
TEST(rcTest, dropTest) {
|
TEST(rcTest, dropTest) {
|
||||||
void *mgmt = NULL;
|
void *mgmt = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1327,7 +1339,7 @@ TEST(rcTest, dropTest) {
|
||||||
|
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
|
|
||||||
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
|
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
tsem_init(&qwtTestQuerySem, 0, 0);
|
tsem_init(&qwtTestQuerySem, 0, 0);
|
||||||
|
@ -1337,7 +1349,7 @@ TEST(rcTest, dropTest) {
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
|
|
||||||
pthread_t t1,t2,t3,t4,t5;
|
pthread_t t1,t2,t3,t4,t5;
|
||||||
pthread_create(&(t1), &thattr, clientThread, mgmt);
|
pthread_create(&(t1), &thattr, qwtclientThread, mgmt);
|
||||||
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
|
pthread_create(&(t2), &thattr, queryQueueThread, mgmt);
|
||||||
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
|
pthread_create(&(t3), &thattr, fetchQueueThread, mgmt);
|
||||||
|
|
||||||
|
@ -1355,7 +1367,6 @@ TEST(rcTest, dropTest) {
|
||||||
|
|
||||||
qWorkerDestroy(&mgmt);
|
qWorkerDestroy(&mgmt);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
|
|
@ -440,6 +440,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup
|
||||||
//scheduler
|
//scheduler
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order")
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue