Merge pull request #18857 from taosdata/enh/changeDefaultValue

fix: change default sch value
This commit is contained in:
Shengliang Guan 2022-12-09 21:46:30 +08:00 committed by GitHub
commit b4ca6c84da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 21 deletions

View File

@ -31,7 +31,7 @@ extern "C" {
#define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 3000
#define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000 #define QW_SCH_TIMEOUT_MSEC 180000
@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt {
#define QW_ERR_RET(c) \ #define QW_ERR_RET(c) \
do { \ do { \
int32_t _code = (c); \ int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \ if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \ terrno = _code; \
return _code; \ return _code; \
@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt {
} while (0) } while (0)
#define QW_RET(c) \ #define QW_RET(c) \
do { \ do { \
int32_t _code = (c); \ int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \ if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \ terrno = _code; \
} \ } \
@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt {
} while (0) } while (0)
#define QW_ERR_JRET(c) \ #define QW_ERR_JRET(c) \
do { \ do { \
code = (c); \ code = (c); \
if (code != TSDB_CODE_SUCCESS) { \ if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \ terrno = code; \
goto _return; \ goto _return; \

View File

@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) {
int8_t nodeType = mgmt->nodeType; int8_t nodeType = mgmt->nodeType;
int32_t nodeId = mgmt->nodeId; int32_t nodeId = mgmt->nodeId;
int32_t taskCount = 0;
int32_t schStatusCount = 0;
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
taosTmrStop(mgmt->hbTimer); taosTmrStop(mgmt->hbTimer);
@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) {
uint64_t qId, tId; uint64_t qId, tId;
int32_t eId; int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL); void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) { while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL); void *key = taosHashGetKey(pIter, NULL);
@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) {
qwFreeTaskCtx(ctx); qwFreeTaskCtx(ctx);
QW_TASK_DLOG_E("task ctx freed"); QW_TASK_DLOG_E("task ctx freed");
pIter = taosHashIterate(mgmt->ctxHash, pIter); pIter = taosHashIterate(mgmt->ctxHash, pIter);
taskCount++;
} }
taosHashCleanup(mgmt->ctxHash); taosHashCleanup(mgmt->ctxHash);
@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) {
while (pIter) { while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch = (SQWSchStatus *)pIter;
qwDestroySchStatus(sch); qwDestroySchStatus(sch);
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
schStatusCount++;
} }
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) {
qwCloseRef(); qwCloseRef();
qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt,
taskCount, schStatusCount);
} }
int32_t qwOpenRef(void) { int32_t qwOpenRef(void) {

View File

@ -8,9 +8,9 @@
#include "qwMsg.h" #include "qwMsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "tglobal.h"
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks, QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows); pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
if (0 == ctx->level) { if (0 == ctx->level) {
QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows,
ctx->level);
break; break;
} }
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
pOutput->numOfRows);
break; break;
} }
} }
@ -650,8 +652,8 @@ _return:
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
SOutputData sOutput = {0}; SOutputData sOutput = {0};
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -671,8 +673,8 @@ _return:
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
tstrerror(code), dataLen); dataLen);
} }
} }
@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code ||
0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore // Note: query is not running anymore
QW_SET_PHASE(ctx, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);