refactor: do some internal refactor.
This commit is contained in:
parent
f6c0649108
commit
58694c67dd
|
@ -685,18 +685,18 @@ typedef struct STaskStatusEntry {
|
||||||
int32_t statusLastDuration; // to record the last duration of current status
|
int32_t statusLastDuration; // to record the last duration of current status
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
int64_t verStart; // start version in WAL, only valid for source task
|
int64_t verStart; // start version in WAL, only valid for source task
|
||||||
int64_t verEnd; // end version in WAL, only valid for source task
|
int64_t verEnd; // end version in WAL, only valid for source task
|
||||||
int64_t processedVer; // only valid for source task
|
int64_t processedVer; // only valid for source task
|
||||||
int64_t activeCheckpointId; // current active checkpoint id
|
int64_t activeCheckpointId; // current active checkpoint id
|
||||||
int32_t chkpointTransId; // checkpoint trans id
|
int32_t chkpointTransId; // checkpoint trans id
|
||||||
bool checkpointFailed; // denote if the checkpoint is failed or not
|
bool checkpointFailed; // denote if the checkpoint is failed or not
|
||||||
bool inputQChanging; // inputQ is changing or not
|
bool inputQChanging; // inputQ is changing or not
|
||||||
int64_t inputQUnchangeCounter;
|
int64_t inputQUnchangeCounter;
|
||||||
double inputQUsed; // in MiB
|
double inputQUsed; // in MiB
|
||||||
double inputRate;
|
double inputRate;
|
||||||
double sinkQuota; // existed quota size for sink task
|
double sinkQuota; // existed quota size for sink task
|
||||||
double sinkDataSize; // sink to dst data size
|
double sinkDataSize; // sink to dst data size
|
||||||
} STaskStatusEntry;
|
} STaskStatusEntry;
|
||||||
|
|
||||||
typedef struct SStreamHbMsg {
|
typedef struct SStreamHbMsg {
|
||||||
|
|
|
@ -23,7 +23,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct SQWorkerPool SQWorkerPool;
|
|
||||||
typedef struct SWWorkerPool SWWorkerPool;
|
typedef struct SWWorkerPool SWWorkerPool;
|
||||||
|
|
||||||
typedef struct SQueueWorker {
|
typedef struct SQueueWorker {
|
||||||
|
@ -60,14 +59,14 @@ typedef struct SWWorker {
|
||||||
SWWorkerPool *pool;
|
SWWorkerPool *pool;
|
||||||
} SWWorker;
|
} SWWorker;
|
||||||
|
|
||||||
typedef struct SWWorkerPool {
|
struct SWWorkerPool {
|
||||||
int32_t max; // max number of workers
|
int32_t max; // max number of workers
|
||||||
int32_t num;
|
int32_t num;
|
||||||
int32_t nextId; // from 0 to max-1, cyclic
|
int32_t nextId; // from 0 to max-1, cyclic
|
||||||
const char *name;
|
const char *name;
|
||||||
SWWorker *workers;
|
SWWorker *workers;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} SWWorkerPool;
|
};
|
||||||
|
|
||||||
int32_t tQWorkerInit(SQWorkerPool *pool);
|
int32_t tQWorkerInit(SQWorkerPool *pool);
|
||||||
void tQWorkerCleanup(SQWorkerPool *pool);
|
void tQWorkerCleanup(SQWorkerPool *pool);
|
||||||
|
|
|
@ -166,7 +166,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
|
||||||
{.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
|
||||||
{.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
|
||||||
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
|
|
|
@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
int32_t tsNumOfMnodeReadThreads = 1;
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||||
float tsRatioOfVnodeStreamThreads = 4.0;
|
float tsRatioOfVnodeStreamThreads = 1.0;
|
||||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||||
int32_t tsNumOfQnodeQueryThreads = 4;
|
int32_t tsNumOfQnodeQueryThreads = 4;
|
||||||
|
@ -621,7 +621,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
0)
|
0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER,
|
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 10, CFG_SCOPE_SERVER,
|
||||||
CFG_DYN_NONE) != 0)
|
CFG_DYN_NONE) != 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
|
|
@ -407,12 +407,8 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
if (tWWorkerInit(pFPool) != 0) return -1;
|
if (tWWorkerInit(pFPool) != 0) return -1;
|
||||||
|
|
||||||
SSingleWorkerCfg mgmtCfg = {
|
SSingleWorkerCfg mgmtCfg = {
|
||||||
.min = 1,
|
.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
|
||||||
.max = 1,
|
|
||||||
.name = "vnode-mgmt",
|
|
||||||
.fp = (FItem)vmProcessMgmtQueue,
|
|
||||||
.param = pMgmt,
|
|
||||||
};
|
|
||||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
|
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
|
||||||
|
|
||||||
dDebug("vnode workers are initialized");
|
dDebug("vnode workers are initialized");
|
||||||
|
|
|
@ -1728,7 +1728,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||||
|
|
||||||
// output queue
|
// output queue
|
||||||
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
|
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
|
||||||
// STR_TO_VARSTR(vbuf, buf);
|
// STR_TO_VARSTR(vbuf, buf);
|
||||||
|
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
|
|
@ -1102,7 +1102,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
||||||
};
|
};
|
||||||
|
|
||||||
entry.inputRate = entry.inputQUsed * 100.0 / STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
|
entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
|
||||||
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
|
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
|
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
|
||||||
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
|
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
|
||||||
|
|
Loading…
Reference in New Issue