fix(stream): set correct token in bucket.
This commit is contained in:
parent
328377c25f
commit
35b97cbd19
|
@ -1001,10 +1001,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
double el = 0;
|
||||||
|
if (pTask->msgInfo.blockingTs == 0) {
|
||||||
|
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||||
|
} else {
|
||||||
|
el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0;
|
||||||
|
}
|
||||||
|
|
||||||
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
|
||||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d",
|
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64
|
||||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref);
|
" wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d",
|
||||||
|
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref);
|
||||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
} else { // pipeline send data in output queue
|
} else { // pipeline send data in output queue
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
|
|
|
@ -84,7 +84,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
|
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
|
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry exec task", pTask->id.idStr);
|
||||||
taosMsleep(1000);
|
taosMsleep(1000);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -347,6 +347,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t ra
|
||||||
static void fillBucket(STokenBucket* pBucket) {
|
static void fillBucket(STokenBucket* pBucket) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int64_t delta = now - pBucket->fillTimestamp;
|
int64_t delta = now - pBucket->fillTimestamp;
|
||||||
|
ASSERT(pBucket->numOfToken >= 0);
|
||||||
|
|
||||||
int32_t inc = (delta / 1000.0) * pBucket->rate;
|
int32_t inc = (delta / 1000.0) * pBucket->rate;
|
||||||
if (inc > 0) {
|
if (inc > 0) {
|
||||||
|
@ -363,10 +364,9 @@ static void fillBucket(STokenBucket* pBucket) {
|
||||||
|
|
||||||
bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
|
bool streamTaskHasAvailableToken(STokenBucket* pBucket) {
|
||||||
fillBucket(pBucket);
|
fillBucket(pBucket);
|
||||||
bool hasToken = (--pBucket->numOfToken) > 0;
|
if (pBucket->numOfToken > 0) {
|
||||||
if (hasToken) {
|
qDebug("remain token:%d", pBucket->numOfToken-1);
|
||||||
qDebug("remain token:%d", pBucket->numOfToken);
|
return --pBucket->numOfToken;
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue