diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d11a4ad23b..9ea655c15c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -685,18 +685,18 @@ typedef struct STaskStatusEntry { int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task int64_t activeCheckpointId; // current active checkpoint id - int32_t chkpointTransId; // checkpoint trans id - bool checkpointFailed; // denote if the checkpoint is failed or not - bool inputQChanging; // inputQ is changing or not + int32_t chkpointTransId; // checkpoint trans id + bool checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not int64_t inputQUnchangeCounter; - double inputQUsed; // in MiB + double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/include/util/tworker.h b/include/util/tworker.h index 8508adf052..f39540d24b 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -23,7 +23,6 @@ extern "C" { #endif -typedef struct SQWorkerPool SQWorkerPool; typedef struct SWWorkerPool SWWorkerPool; typedef struct SQueueWorker { @@ -60,14 +59,14 @@ typedef struct SWWorker { SWWorkerPool *pool; } SWWorker; -typedef struct SWWorkerPool { +struct SWWorkerPool { int32_t max; // max number of workers int32_t num; int32_t nextId; // from 0 to max-1, cyclic const char *name; SWWorker *workers; TdThreadMutex mutex; -} SWWorkerPool; +}; int32_t tQWorkerInit(SQWorkerPool *pool); void tQWorkerCleanup(SQWorkerPool *pool); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2e52c77080..75a54a0cd5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -166,7 +166,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 79d21955d4..b72c4fe077 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -float tsRatioOfVnodeStreamThreads = 4.0; +float tsRatioOfVnodeStreamThreads = 1.0; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; @@ -621,7 +621,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { 0) return -1; - if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER, + if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 10, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 9a792a2774..8b80527447 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -407,12 +407,8 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { if (tWWorkerInit(pFPool) != 0) return -1; SSingleWorkerCfg mgmtCfg = { - .min = 1, - .max = 1, - .name = "vnode-mgmt", - .fp = (FItem)vmProcessMgmtQueue, - .param = pMgmt, - }; + .min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; dDebug("vnode workers are initialized"); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 146f9f6fc4..95150b9d6e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1728,7 +1728,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); // output queue - // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); +// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); // STR_TO_VARSTR(vbuf, buf); // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index de435c63a3..04d34b0945 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1102,7 +1102,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; - entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; + entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);