fix(stream): reduce the sink throughput. and fix the bug in status check.
This commit is contained in:
parent
523573844a
commit
9d735e7f8b
|
@ -2775,11 +2775,14 @@ static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStream
|
||||||
if (pTaskEntry->id.streamId == pId->streamId) {
|
if (pTaskEntry->id.streamId == pId->streamId) {
|
||||||
numOfTotal++;
|
numOfTotal++;
|
||||||
|
|
||||||
if (pTaskEntry->id.taskId != pId->taskId && pTaskEntry->status == TASK_STATUS__READY) {
|
if (pTaskEntry->id.taskId != pId->taskId) {
|
||||||
|
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
|
||||||
|
if (pEntry->status == TASK_STATUS__READY) {
|
||||||
numOfReady++;
|
numOfReady++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (numOfReady > 0) {
|
if (numOfReady > 0) {
|
||||||
mDebug("stream:0x%" PRIx64
|
mDebug("stream:0x%" PRIx64
|
||||||
|
|
|
@ -448,7 +448,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
|
|
||||||
// 2MiB per second for sink task
|
// 2MiB per second for sink task
|
||||||
// 50 times sink operator per second
|
// 50 times sink operator per second
|
||||||
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
|
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
|
||||||
|
|
||||||
TdThreadMutexAttr attr = {0};
|
TdThreadMutexAttr attr = {0};
|
||||||
int code = taosThreadMutexAttrInit(&attr);
|
int code = taosThreadMutexAttrInit(&attr);
|
||||||
|
|
Loading…
Reference in New Issue