change default sch value

This commit is contained in:
yihaoDeng 2022-12-09 17:08:51 +08:00
parent 52045c43ab
commit e150913c7b
3 changed files with 31 additions and 21 deletions

View File

@ -31,7 +31,7 @@ extern "C" {
#define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 3000
#define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000 #define QW_SCH_TIMEOUT_MSEC 180000
@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt {
#define QW_ERR_RET(c) \ #define QW_ERR_RET(c) \
do { \ do { \
int32_t _code = (c); \ int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \ if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \ terrno = _code; \
return _code; \ return _code; \
@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt {
} while (0) } while (0)
#define QW_RET(c) \ #define QW_RET(c) \
do { \ do { \
int32_t _code = (c); \ int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \ if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \ terrno = _code; \
} \ } \
@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt {
} while (0) } while (0)
#define QW_ERR_JRET(c) \ #define QW_ERR_JRET(c) \
do { \ do { \
code = (c); \ code = (c); \
if (code != TSDB_CODE_SUCCESS) { \ if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \ terrno = code; \
goto _return; \ goto _return; \

View File

@ -281,7 +281,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) {
int8_t nodeType = mgmt->nodeType; int8_t nodeType = mgmt->nodeType;
int32_t nodeId = mgmt->nodeId; int32_t nodeId = mgmt->nodeId;
int32_t taskCount = 0;
int32_t schStatusCount = 0;
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
taosTmrStop(mgmt->hbTimer); taosTmrStop(mgmt->hbTimer);
@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) {
uint64_t qId, tId; uint64_t qId, tId;
int32_t eId; int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) { while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL); void *key = taosHashGetKey(pIter, NULL);
@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) {
qwFreeTaskCtx(ctx); qwFreeTaskCtx(ctx);
QW_TASK_DLOG_E("task ctx freed"); QW_TASK_DLOG_E("task ctx freed");
pIter = taosHashIterate(mgmt->ctxHash, pIter); pIter = taosHashIterate(mgmt->ctxHash, pIter);
taskCount++;
} }
taosHashCleanup(mgmt->ctxHash); taosHashCleanup(mgmt->ctxHash);
@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) {
while (pIter) { while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch = (SQWSchStatus *)pIter;
qwDestroySchStatus(sch); qwDestroySchStatus(sch);
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
schStatusCount++;
} }
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) {
qwCloseRef(); qwCloseRef();
qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt,
taskCount, schStatusCount);
} }
int32_t qwOpenRef(void) { int32_t qwOpenRef(void) {

View File

@ -8,9 +8,9 @@
#include "qwMsg.h" #include "qwMsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "tglobal.h"
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
@ -117,7 +117,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (queryStop) { if (queryStop) {
*queryStop = true; *queryStop = true;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks, QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
if (0 == ctx->level) { if (0 == ctx->level) {
QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows,
ctx->level);
break; break;
} }
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
pOutput->numOfRows);
break; break;
} }
} }
@ -538,7 +540,7 @@ _return:
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) { if (!rsped) {
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
} }
} }
@ -650,8 +652,8 @@ _return:
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
SOutputData sOutput = {0}; SOutputData sOutput = {0};
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -671,8 +673,8 @@ _return:
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
tstrerror(code), dataLen); dataLen);
} }
} }
@ -689,7 +691,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
do { do {
ctx = NULL; ctx = NULL;
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code ||
0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore // Note: query is not running anymore
QW_SET_PHASE(ctx, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
@ -1176,7 +1179,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
uint64_t qId, tId; uint64_t qId, tId;
int32_t eId; int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) { while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
@ -1186,7 +1189,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
QW_TASK_DLOG_E("start to force stop task"); QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping"); QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
@ -1194,7 +1197,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
pIter = taosHashIterate(mgmt->ctxHash, pIter); pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue; continue;
} }
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {