fix(stream): quit from loop when input queue is full.
This commit is contained in:
parent
9d5a3b8d78
commit
59e284c332
|
@ -311,7 +311,6 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
|
|||
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
|
||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
|
||||
// /*int32_t code = */streamSchedExec(pTask);
|
||||
return true;
|
||||
} else {
|
||||
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
|
||||
|
@ -390,6 +389,7 @@ static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32
|
|||
}
|
||||
} else {
|
||||
tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,9 +67,9 @@ struct STokenBucket {
|
|||
int32_t numCapacity; // total capacity, available token per second
|
||||
int32_t numOfToken; // total available tokens
|
||||
int32_t numRate; // number of token per second
|
||||
double bytesCapacity; // available capacity for maximum input size, KiloBytes per Second
|
||||
double bytesRemain; // not consumed bytes per second
|
||||
double bytesRate; // number of token per second
|
||||
double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
|
||||
double quotaRemain; // not consumed bytes per second
|
||||
double quotaRate; // number of token per second
|
||||
int64_t fillTimestamp; // fill timestamp
|
||||
};
|
||||
|
||||
|
@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
|||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate);
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate);
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||
|
|
|
@ -892,7 +892,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
|
||||
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
|
||||
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
entry.sinkQuota = (*pTask)->pTokenBucket->bytesRate;
|
||||
entry.sinkQuota = (*pTask)->pTokenBucket->quotaRate;
|
||||
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
|
||||
}
|
||||
|
||||
|
|
|
@ -380,7 +380,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) {
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) {
|
||||
if (numCap < 10 || numRate < 10 || pBucket == NULL) {
|
||||
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -390,15 +390,15 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
|
|||
pBucket->numOfToken = numCap;
|
||||
pBucket->numRate = numRate;
|
||||
|
||||
pBucket->bytesRate = bytesRate;
|
||||
pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO;
|
||||
pBucket->bytesRemain = pBucket->bytesCapacity;
|
||||
pBucket->quotaRate = quotaRate;
|
||||
pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
|
||||
pBucket->quotaRemain = pBucket->quotaCapacity;
|
||||
|
||||
pBucket->fillTimestamp = taosGetTimestampMs();
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void fillTokenBucket(STokenBucket* pBucket) {
|
||||
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
|
||||
int64_t now = taosGetTimestampMs();
|
||||
int64_t delta = now - pBucket->fillTimestamp;
|
||||
ASSERT(pBucket->numOfToken >= 0);
|
||||
|
@ -410,15 +410,15 @@ static void fillTokenBucket(STokenBucket* pBucket) {
|
|||
}
|
||||
|
||||
// increase the new available quota as time goes on
|
||||
double incSize = (delta / 1000.0) * pBucket->bytesRate;
|
||||
double incSize = (delta / 1000.0) * pBucket->quotaRate;
|
||||
if (incSize > 0) {
|
||||
pBucket->bytesRemain = TMIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity);
|
||||
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
|
||||
}
|
||||
|
||||
if (incNum > 0) {
|
||||
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
|
||||
" wait for %.2f Sec",
|
||||
pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0);
|
||||
" idle for %.2f Sec, %s",
|
||||
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -426,7 +426,7 @@ bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
|
|||
fillTokenBucket(pBucket);
|
||||
|
||||
if (pBucket->numOfToken > 0) {
|
||||
if (pBucket->bytesRemain > 0) {
|
||||
if (pBucket->quotaRemain > 0) {
|
||||
pBucket->numOfToken -= 1;
|
||||
return true;
|
||||
} else { // no available size quota now
|
||||
|
@ -443,5 +443,5 @@ void streamTaskPutbackToken(STokenBucket* pBucket) {
|
|||
|
||||
// size in KB
|
||||
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
|
||||
pBucket->bytesRemain -= SIZE_IN_MiB(bytes);
|
||||
pBucket->quotaRemain -= SIZE_IN_MiB(bytes);
|
||||
}
|
Loading…
Reference in New Issue