fix(stream): quit from loop when input queue is full.

This commit is contained in:
Haojun Liao 2023-10-07 14:54:11 +08:00
parent 115a284ac0
commit 7ce3d2c200
4 changed files with 17 additions and 17 deletions

View File

@ -311,7 +311,6 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
// /*int32_t code = */streamSchedExec(pTask);
return true; return true;
} else { } else {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
@ -390,6 +389,7 @@ static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32
} }
} else { } else {
tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
break;
} }
} }
} }

View File

@ -67,9 +67,9 @@ struct STokenBucket {
int32_t numCapacity; // total capacity, available token per second int32_t numCapacity; // total capacity, available token per second
int32_t numOfToken; // total available tokens int32_t numOfToken; // total available tokens
int32_t numRate; // number of token per second int32_t numRate; // number of token per second
double bytesCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
double bytesRemain; // not consumed bytes per second double quotaRemain; // not consumed bytes per second
double bytesRate; // number of token per second double quotaRate; // number of token per second
int64_t fillTimestamp; // fill timestamp int64_t fillTimestamp; // fill timestamp
}; };
@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate);
STaskId streamTaskExtractKey(const SStreamTask* pTask); STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);

View File

@ -891,7 +891,7 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->pTokenBucket->bytesRate; entry.sinkQuota = (*pTask)->pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
} }

View File

@ -380,7 +380,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) { int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) {
if (numCap < 10 || numRate < 10 || pBucket == NULL) { if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
@ -390,15 +390,15 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->numOfToken = numCap; pBucket->numOfToken = numCap;
pBucket->numRate = numRate; pBucket->numRate = numRate;
pBucket->bytesRate = bytesRate; pBucket->quotaRate = quotaRate;
pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
pBucket->bytesRemain = pBucket->bytesCapacity; pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs(); pBucket->fillTimestamp = taosGetTimestampMs();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void fillTokenBucket(STokenBucket* pBucket) { static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp; int64_t delta = now - pBucket->fillTimestamp;
ASSERT(pBucket->numOfToken >= 0); ASSERT(pBucket->numOfToken >= 0);
@ -410,15 +410,15 @@ static void fillTokenBucket(STokenBucket* pBucket) {
} }
// increase the new available quota as time goes on // increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->bytesRate; double incSize = (delta / 1000.0) * pBucket->quotaRate;
if (incSize > 0) { if (incSize > 0) {
pBucket->bytesRemain = TMIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
} }
if (incNum > 0) { if (incNum > 0) {
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
" wait for %.2f Sec", " idle for %.2f Sec, %s",
pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0); pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);
} }
} }
@ -426,7 +426,7 @@ bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
fillTokenBucket(pBucket); fillTokenBucket(pBucket);
if (pBucket->numOfToken > 0) { if (pBucket->numOfToken > 0) {
if (pBucket->bytesRemain > 0) { if (pBucket->quotaRemain > 0) {
pBucket->numOfToken -= 1; pBucket->numOfToken -= 1;
return true; return true;
} else { // no available size quota now } else { // no available size quota now
@ -443,5 +443,5 @@ void streamTaskPutbackToken(STokenBucket* pBucket) {
// size in KB // size in KB
void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) {
pBucket->bytesRemain -= SIZE_IN_MiB(bytes); pBucket->quotaRemain -= SIZE_IN_MiB(bytes);
} }