diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 904ed3bfb0..5eb8190136 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -459,9 +459,10 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) #define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407) -//scheduler +//scheduler&qworker #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) #define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) +#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503) //parser #define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 1bfa2395ce..6e2d482c05 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -106,8 +106,10 @@ typedef struct SQWTaskCtx { void *cancelConnection; bool emptyRes; - int8_t queryContinue; - int8_t queryInQueue; + bool queryFetched; + bool queryEnd; + bool queryContinue; + bool queryInQueue; int32_t rspCode; int8_t events[QW_EVENT_MAX]; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index eb586cb871..4d4b2387ef 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -56,19 +56,27 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, case JOB_TASK_STATUS_PARTIAL_SUCCEED: if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED - && newStatus != JOB_TASK_STATUS_CANCELLED) { + && newStatus != JOB_TASK_STATUS_CANCELLED + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_DROPPING) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_SUCCEED: if (newStatus != JOB_TASK_STATUS_CANCELLED - && newStatus != JOB_TASK_STATUS_DROPPING) { + && newStatus != JOB_TASK_STATUS_DROPPING + && newStatus != JOB_TASK_STATUS_FAILED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_FAILED: + if (newStatus != JOB_TASK_STATUS_CANCELLED && newStatus != JOB_TASK_STATUS_DROPPING) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + break; + case JOB_TASK_STATUS_CANCELLING: if (newStatus != JOB_TASK_STATUS_CANCELLED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); @@ -77,7 +85,9 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, break; case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_DROPPING: - QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } break; default: @@ -459,7 +469,9 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { _return: - qwReleaseTaskStatus(QW_WRITE, sch); + if (task) { + qwReleaseTaskStatus(QW_WRITE, sch); + } qwReleaseScheduler(QW_WRITE, mgmt); QW_RET(code); @@ -477,7 +489,9 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { _return: - qwReleaseTaskStatus(QW_READ, sch); + if (task) { + qwReleaseTaskStatus(QW_READ, sch); + } qwReleaseScheduler(QW_READ, mgmt); QW_RET(code); @@ -549,6 +563,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { break; } + + if (atomic_load_32(&ctx->rspCode)) { + break; + } } QW_RET(code); @@ -608,7 +626,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void if (ctx->emptyRes) { QW_TASK_DLOG_E("query end with empty result"); - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED); QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; @@ -635,7 +653,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void QW_TASK_DLOG_E("no data in sink and query end"); - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED); QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; *dataLen = 0; @@ -665,7 +683,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_TASK_DLOG_E("task all data fetched, done"); - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED); } return TSDB_CODE_SUCCESS; @@ -687,10 +705,17 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_LOCK(QW_WRITE, &ctx->lock); - if (QW_PHASE_PRE_FETCH != phase) { + if (QW_PHASE_PRE_FETCH == phase) { + atomic_store_8(&ctx->queryFetched, true); + } else { atomic_store_8(&ctx->phase, phase); } + if (atomic_load_8(&ctx->queryEnd)) { + QW_TASK_ELOG_E("query already end"); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + switch (phase) { case QW_PHASE_PRE_QUERY: { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { @@ -717,12 +742,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase)); + QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { - QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase)); + QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); } break; @@ -827,6 +852,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp _return: + if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); + } + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -912,15 +941,13 @@ _return: input.code = code; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - - if (TSDB_CODE_SUCCESS == code) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED); - } if (!queryRsped) { qwBuildAndSendQueryRsp(qwMsg->connection, code); QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); - } + } + + QW_RET(code); } int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { @@ -948,6 +975,11 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); + if (atomic_load_8(&ctx->queryEnd) || atomic_load_8(&ctx->queryFetched)) { + QW_TASK_ELOG("got ready msg at wrong status, queryEnd:%d, queryFetched:%d", atomic_load_8(&ctx->queryEnd), atomic_load_8(&ctx->queryFetched)); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (ctx->phase == QW_PHASE_POST_QUERY) { code = ctx->rspCode; goto _return; @@ -1006,13 +1038,13 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); - // RC WARNING atomic_store_8(&ctx->queryContinue, 1); } if (rsp) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + atomic_store_8(&ctx->queryEnd, qComplete); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); @@ -1072,6 +1104,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + atomic_store_8(&ctx->queryEnd, qComplete); } if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { @@ -1084,7 +1117,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_QUERY_RUNNING(ctx)) { atomic_store_8(&ctx->queryContinue, 1); } else if (0 == atomic_load_8(&ctx->queryInQueue)) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING); atomic_store_8(&ctx->queryInQueue, 1); @@ -1137,7 +1170,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); } else if (ctx->phase > 0) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); needRsp = true; @@ -1154,7 +1187,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { _return: if (code) { - QW_UPDATE_RSP_CODE(ctx, code); + if (ctx) { + QW_UPDATE_RSP_CODE(ctx, code); + } qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 2ba06f781e..04c36b97da 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -47,6 +47,8 @@ namespace { #define qwtTestQueryQueueSize 1000000 #define qwtTestFetchQueueSize 1000000 +bool qwtEnableLog = true; + int32_t qwtTestMaxExecTaskUsec = 2; int32_t qwtTestReqMaxDelayUsec = 2; @@ -54,10 +56,10 @@ uint64_t qwtTestQueryId = 0; bool qwtTestEnableSleep = true; bool qwtTestStop = false; bool qwtTestDeadLoop = false; -int32_t qwtTestMTRunSec = 60; -int32_t qwtTestPrintNum = 100000; -int32_t qwtTestCaseIdx = 0; -int32_t qwtTestCaseNum = 4; +int32_t qwtTestMTRunSec = 6000; +int32_t qwtTestPrintNum = 10000; +uint64_t qwtTestCaseIdx = 0; +uint64_t qwtTestCaseNum = 4; bool qwtTestCaseFinished = false; tsem_t qwtTestQuerySem; tsem_t qwtTestFetchSem; @@ -95,6 +97,9 @@ SSchTasksStatusReq qwtstatusMsg = {0}; void qwtInitLogFile() { + if (!qwtEnableLog) { + return; + } const char *defaultLogFileNamePrefix = "taosdlog"; const int32_t maxLogFileNum = 10; @@ -203,6 +208,9 @@ int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) { return 0; } +void qwtSendReqToDnode(void* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { + +} void qwtRpcSendResponse(const SRpcMsg *pRsp) { @@ -263,26 +271,15 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { return; } -int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { - int32_t idx = abs((++qwtTestCaseIdx) % qwtTestCaseNum); - +int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { qwtTestSinkBlockNum = 0; qwtTestSinkMaxBlockNum = taosRand() % 100 + 1; qwtTestSinkQueryEnd = false; - if (0 == idx) { - *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx; - *handle = (DataSinkHandle)qwtTestCaseIdx+1; - } else if (1 == idx) { - *pTaskInfo = NULL; - *handle = NULL; - } else if (2 == idx) { - *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx; - *handle = NULL; - } else if (3 == idx) { - *pTaskInfo = NULL; - *handle = (DataSinkHandle)qwtTestCaseIdx; - } + *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx+1; + *handle = (DataSinkHandle)qwtTestCaseIdx+2; + + ++qwtTestCaseIdx; return 0; } @@ -315,7 +312,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (endExec) { *pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock)); - (*pRes)->info.rows = taosRand() % 1000; + (*pRes)->info.rows = taosRand() % 1000 + 1; } else { *pRes = NULL; *useconds = taosRand() % 10; @@ -850,7 +847,6 @@ void *fetchQueueThread(void *param) { } -#if 0 TEST(seqTest, normalCase) { void *mgmt = NULL; @@ -881,7 +877,7 @@ TEST(seqTest, normalCase) { stubSetPutDataBlock(); stubSetGetDataBlock(); - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); @@ -920,7 +916,7 @@ TEST(seqTest, cancelFirst) { stubSetStringToPlan(); stubSetRpcSendResponse(); - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); @@ -966,7 +962,7 @@ TEST(seqTest, randCase) { taosSeedRand(taosGetTimestampSec()); - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); int32_t t = 0; @@ -1025,21 +1021,31 @@ TEST(seqTest, multithreadRand) { stubSetStringToPlan(); stubSetRpcSendResponse(); + stubSetExecTask(); + stubSetCreateExecTask(); + stubSetAsyncKillTask(); + stubSetDestroyTask(); + stubSetDestroyDataSinker(); + stubSetGetDataLength(); + stubSetEndPut(); + stubSetPutDataBlock(); + stubSetGetDataBlock(); taosSeedRand(taosGetTimestampSec()); - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); pthread_attr_t thattr; pthread_attr_init(&thattr); - pthread_t t1,t2,t3,t4,t5; + pthread_t t1,t2,t3,t4,t5,t6; pthread_create(&(t1), &thattr, queryThread, mgmt); pthread_create(&(t2), &thattr, readyThread, NULL); pthread_create(&(t3), &thattr, fetchThread, NULL); pthread_create(&(t4), &thattr, dropThread, NULL); pthread_create(&(t5), &thattr, statusThread, NULL); + pthread_create(&(t6), &thattr, fetchQueueThread, mgmt); while (true) { if (qwtTestDeadLoop) { @@ -1052,12 +1058,19 @@ TEST(seqTest, multithreadRand) { qwtTestStop = true; taosSsleep(3); + + qwtTestQueryQueueNum = 0; + qwtTestQueryQueueRIdx = 0; + qwtTestQueryQueueWIdx = 0; + qwtTestQueryQueueLock = 0; + qwtTestFetchQueueNum = 0; + qwtTestFetchQueueRIdx = 0; + qwtTestFetchQueueWIdx = 0; + qwtTestFetchQueueLock = 0; qWorkerDestroy(&mgmt); } -#endif - TEST(rcTest, shortExecshortDelay) { void *mgmt = NULL; int32_t code = 0; @@ -1081,7 +1094,7 @@ TEST(rcTest, shortExecshortDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1162,7 +1175,7 @@ TEST(rcTest, longExecshortDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 1000000; @@ -1245,7 +1258,7 @@ TEST(rcTest, shortExeclongDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1305,7 +1318,6 @@ TEST(rcTest, shortExeclongDelay) { } -#if 0 TEST(rcTest, dropTest) { void *mgmt = NULL; int32_t code = 0; @@ -1327,7 +1339,7 @@ TEST(rcTest, dropTest) { taosSeedRand(taosGetTimestampSec()); - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, (sendReqToDnodeFp)qwtSendReqToDnode); ASSERT_EQ(code, 0); tsem_init(&qwtTestQuerySem, 0, 0); @@ -1337,7 +1349,7 @@ TEST(rcTest, dropTest) { pthread_attr_init(&thattr); pthread_t t1,t2,t3,t4,t5; - pthread_create(&(t1), &thattr, clientThread, mgmt); + pthread_create(&(t1), &thattr, qwtclientThread, mgmt); pthread_create(&(t2), &thattr, queryQueueThread, mgmt); pthread_create(&(t3), &thattr, fetchQueueThread, mgmt); @@ -1355,7 +1367,6 @@ TEST(rcTest, dropTest) { qWorkerDestroy(&mgmt); } -#endif int main(int argc, char** argv) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 7adcb03e42..08180a1c49 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -440,6 +440,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order") #ifdef TAOS_ERROR_C };