From 890d658aeaf85aa3778294183f74511edc536ac0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Sep 2023 14:11:45 +0800 Subject: [PATCH 01/14] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 10 ++++++---- source/dnode/vnode/src/tq/tqSink.c | 5 +++-- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamQueue.c | 6 ++++++ 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a98ad5a4c2..29a75083ef 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -326,18 +326,21 @@ typedef struct SSinkRecorder { int64_t numOfSubmit; int64_t numOfBlocks; int64_t numOfRows; - int64_t bytes; + int64_t dataSize; } SSinkRecorder; typedef struct STaskExecStatisInfo { int64_t created; int64_t init; + int64_t start; int64_t step1Start; int64_t step2Start; - int64_t start; int32_t updateCount; - int32_t dispatch; int64_t latestUpdateTs; + int32_t processDataBlocks; + int64_t processDataSize; + int32_t dispatch; + int64_t dispatchDataSize; int32_t checkpoint; SSinkRecorder sink; } STaskExecStatisInfo; @@ -415,7 +418,6 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; int64_t stage; -// bool leader; int32_t role; STaskStartInfo startInfo; SRWLatch lock; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a5958197bd..8009eccb1b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -272,11 +272,12 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* SSinkRecorder* pRec = &pTask->execInfo.sink; pRec->numOfSubmit += 1; - if ((pRec->numOfSubmit % 5000) == 0) { + if ((pRec->numOfSubmit % 1000) == 0) { double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", - pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), + el); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a87bb00972..e46b094f60 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -556,7 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { int32_t blockSize = streamQueueItemGetSize(pInput); - pTask->execInfo.sink.bytes += blockSize; + pTask->execInfo.sink.dataSize += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index abf10487de..61453cb54e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -154,6 +154,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int32_t taskLevel = pTask->info.taskLevel; *numOfBlocks = 0; + // no available token in bucket for sink task, let's wait + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskHasAvailableToken(pTask->pTokenBucket))) { + stDebug("s-task:%s no available token in bucket for sink data, wait", id); + return TSDB_CODE_SUCCESS; + } + while (1) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); From aec15ae39f9e631b6a8b66b7cbba4badfb976cea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Sep 2023 23:29:54 +0800 Subject: [PATCH 02/14] enh(stream): add quota limitation for sink task. --- source/libs/stream/inc/streamInt.h | 15 ++-- source/libs/stream/src/streamExec.c | 5 +- source/libs/stream/src/streamQueue.c | 115 ++++++++++++++++++++------- source/libs/stream/src/streamTask.c | 4 +- 4 files changed, 102 insertions(+), 37 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index dbe868b54f..dc5cf497db 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -49,10 +49,13 @@ typedef struct SStreamContinueExecInfo { } SStreamContinueExecInfo; struct STokenBucket { - int32_t capacity; // total capacity - int64_t fillTimestamp;// fill timestamp - int32_t numOfToken; // total available tokens - int32_t rate; // number of token per second + int32_t numCapacity; // total capacity, available token per second + int32_t numOfToken; // total available tokens + int32_t numRate; // number of token per second + double bytesCapacity; // available capacity for maximum input size, KiloBytes per Second + double bytesRemain; // not consumed bytes per second + double bytesRate; // number of token per second + int64_t fillTimestamp; // fill timestamp }; struct STaskTimer { @@ -89,7 +92,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); @@ -103,7 +106,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e46b094f60..a6f7ac27d4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -524,6 +524,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { stDebug("s-task:%s start to extract data block from inputQ", id); while (1) { + int32_t blockSize = 0; int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; if (streamTaskShouldStop(&pTask->status)) { @@ -531,7 +532,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks); + /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (pInput == NULL) { ASSERT(numOfBlocks == 0); return 0; @@ -555,9 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { - int32_t blockSize = streamQueueItemGetSize(pInput); pTask->execInfo.sink.dataSize += blockSize; - stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); continue; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 61453cb54e..708b0572a4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -20,6 +20,7 @@ #define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) +#define MAX_SMOOTH_BURST_RATIO 10 // 20 sec // todo refactor: // read data from input queue @@ -30,7 +31,9 @@ typedef struct SQueueReader { int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; -static bool streamTaskHasAvailableToken(STokenBucket* pBucket); +static bool streamTaskExtractAvailableToken(STokenBucket* pBucket); +static void streamTaskPutbackToken(STokenBucket* pBucket); +static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); static void streamQueueCleanup(SStreamQueue* pQueue) { void* qItem = NULL; @@ -147,15 +150,19 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } } -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, + int32_t* blockSize) { int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr; int32_t taskLevel = pTask->info.taskLevel; - *numOfBlocks = 0; - // no available token in bucket for sink task, let's wait - if (taskLevel == TASK_LEVEL__SINK && (!streamTaskHasAvailableToken(pTask->pTokenBucket))) { + *pInput = NULL; + *numOfBlocks = 0; + *blockSize = 0; + + // no available token in bucket for sink task, let's wait for a little bit + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) { stDebug("s-task:%s no available token in bucket for sink data, wait", id); return TSDB_CODE_SUCCESS; } @@ -172,6 +179,17 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu taosMsleep(10); continue; } + + // restore the token to bucket + if (*numOfBlocks > 0) { + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + } else { + streamTaskPutbackToken(pTask->pTokenBucket); + } + return TSDB_CODE_SUCCESS; } @@ -179,17 +197,24 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int8_t type = qItem->type; if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { - const char* p = streamQueueItemGetTypeStr(qItem->type); + const char* p = streamQueueItemGetTypeStr(type); if (*pInput == NULL) { stDebug("s-task:%s %s msg extracted, start to process immediately", id, p); + // restore the token to bucket in case of checkpoint/trans-state msg + streamTaskPutbackToken(pTask->pTokenBucket); + *blockSize = 0; *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS; - } else { - // previous existed blocks needs to be handle, before handle the checkpoint msg block + } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } @@ -198,7 +223,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu ASSERT((*numOfBlocks) == 0); *pInput = qItem; } else { - // todo we need to sort the data block, instead of just appending into the array list. + // merge current block failed, let's handle the already merged blocks. void* newRet = streamMergeQueueItem(*pInput, qItem); if (newRet == NULL) { if (terrno != 0) { @@ -206,6 +231,11 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu tstrerror(terrno)); } + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + streamQueueProcessFail(pTask->inputInfo.queue); return TSDB_CODE_SUCCESS; } @@ -218,6 +248,12 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + + *blockSize = streamQueueItemGetSize(*pInput); + if (taskLevel == TASK_LEVEL__SINK) { + streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + } + return TSDB_CODE_SUCCESS; } } @@ -340,43 +376,68 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc return TSDB_CODE_SUCCESS; } -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { - if (cap < 50 || rate < 50 || pBucket == NULL) { - stError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate) { + if (numCap < 10 || numRate < 10 || pBucket == NULL) { + stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); return TSDB_CODE_INVALID_PARA; } - pBucket->capacity = cap; - pBucket->rate = rate; - pBucket->numOfToken = cap; + pBucket->numCapacity = numCap; + pBucket->numOfToken = numCap; + pBucket->numRate = numRate; + + pBucket->bytesRate = bytesRate; + pBucket->bytesCapacity = bytesRate * MAX_SMOOTH_BURST_RATIO; + pBucket->bytesRemain = pBucket->bytesCapacity; + pBucket->fillTimestamp = taosGetTimestampMs(); return TSDB_CODE_SUCCESS; } -static void fillBucket(STokenBucket* pBucket) { +static void fillTokenBucket(STokenBucket* pBucket) { int64_t now = taosGetTimestampMs(); int64_t delta = now - pBucket->fillTimestamp; ASSERT(pBucket->numOfToken >= 0); - int32_t inc = (delta / 1000.0) * pBucket->rate; - if (inc > 0) { - if ((pBucket->numOfToken + inc) < pBucket->capacity) { - pBucket->numOfToken += inc; + int32_t incNum = (delta / 1000.0) * pBucket->numRate; + if (incNum > 0) { + if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) { + pBucket->numOfToken += incNum; } else { - pBucket->numOfToken = pBucket->capacity; + pBucket->numOfToken = pBucket->numCapacity; } - pBucket->fillTimestamp = now; - stDebug("new token available, current:%d, inc:%d ts:%"PRId64, pBucket->numOfToken, inc, now); + stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now); + } + + // increase the new available quota as time goes on + double incSize = (delta / 1000.0) * pBucket->bytesRate; + if (incSize > 0) { + pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); + stDebug("new bytes token available, current:%.2fKiB, inc:%.2fKiB ts:%" PRId64, pBucket->bytesRemain, incSize, now); } } -bool streamTaskHasAvailableToken(STokenBucket* pBucket) { - fillBucket(pBucket); +bool streamTaskExtractAvailableToken(STokenBucket* pBucket) { + fillTokenBucket(pBucket); + if (pBucket->numOfToken > 0) { - --pBucket->numOfToken; - return true; + if (pBucket->bytesRemain > 0) { + pBucket->numOfToken -= 1; + return true; + } else { // no available size quota now + return false; + } } else { return false; } +} + +void streamTaskPutbackToken(STokenBucket* pBucket) { + pBucket->numOfToken = MIN(pBucket->numOfToken + 1, pBucket->numCapacity); +} + +// size in KB +void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { + pBucket->bytesRemain -= SIZE_IN_MiB(bytes); } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e5088e9c69..ea7e89cf1b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -431,7 +431,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } - streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50); + // 2MiB per second for sink task + // 50 times sink operator per second + streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); From 27dcbcb96f9f447591f15fa9d96bf12f30ac2058 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Sep 2023 23:42:02 +0800 Subject: [PATCH 03/14] fix(stream): update the retry interval. --- source/libs/stream/src/streamMeta.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e98a292d37..2c96a26ddc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -195,11 +195,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->chkpId = streamGetLatestCheckpointId(pMeta); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { - taosMsleep(500); + taosMsleep(100); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { - stError("vgId:%d failed to init stream backend", pMeta->vgId); - stInfo("vgId:%d retry to init stream backend", pMeta->vgId); + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); @@ -263,11 +262,10 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { - taosMsleep(500); + taosMsleep(100); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { - stError("vgId:%d failed to init stream backend", pMeta->vgId); - stInfo("vgId:%d retry to init stream backend", pMeta->vgId); + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); } } From 9ab9de0ad35c0acc0f8067536531501c4adafd63 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 00:30:38 +0800 Subject: [PATCH 04/14] fix(stream): limit sink quota. --- source/libs/stream/src/streamQueue.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 708b0572a4..dc0eb949da 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -20,7 +20,7 @@ #define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) -#define MAX_SMOOTH_BURST_RATIO 10 // 20 sec +#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec // todo refactor: // read data from input queue @@ -401,20 +401,20 @@ static void fillTokenBucket(STokenBucket* pBucket) { int32_t incNum = (delta / 1000.0) * pBucket->numRate; if (incNum > 0) { - if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) { - pBucket->numOfToken += incNum; - } else { - pBucket->numOfToken = pBucket->numCapacity; - } + pBucket->numOfToken = MIN(pBucket->numOfToken + incNum, pBucket->numCapacity); pBucket->fillTimestamp = now; - stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now); } // increase the new available quota as time goes on double incSize = (delta / 1000.0) * pBucket->bytesRate; if (incSize > 0) { pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); - stDebug("new bytes token available, current:%.2fKiB, inc:%.2fKiB ts:%" PRId64, pBucket->bytesRemain, incSize, now); + } + + if (incNum > 0) { + stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 + " wait for %.2f Sec", + pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0); } } From 37bf387453f1b03e05c48c7e4ace0a3a0f8b3e76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 09:36:20 +0800 Subject: [PATCH 05/14] fix(stream): reduce the counter when remove stream tasks. --- source/libs/stream/src/streamDispatch.c | 7 ++++--- source/libs/stream/src/streamExec.c | 4 ++-- source/libs/stream/src/streamMeta.c | 2 ++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bd5753cac3..ea8c7c71ac 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1043,12 +1043,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId pTask->msgInfo.pData = NULL; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; - stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", - pTask->id.idStr, downstreamId, el); // put data into inputQ of current task is also allowed if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, downstreamId, el); + } else { + stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el); } // now ready for next data output @@ -1110,7 +1112,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t leftRsp = 0; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt); leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); ASSERT(leftRsp >= 0); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a6f7ac27d4..20c1495d49 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -248,8 +248,8 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { - stDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, - outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); + stDebug("s-task:%s scan exec numOfBlocks:%d, size:%d output num-limit:%d, size-limit:%d reached", + pTask->id.idStr, numOfBlocks, size, outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); break; } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2c96a26ddc..8ae349a31a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -549,6 +549,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t (*ppStreamTask)->historyTaskId.taskId = 0; (*ppStreamTask)->historyTaskId.streamId = 0; } + } else { + atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); From a7e6933ce7c2ac8454a1fe2619541d1cd3537003 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 09:46:15 +0800 Subject: [PATCH 06/14] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 20c1495d49..7759de9590 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -192,11 +192,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - int32_t size = 0; int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; - int32_t outputBatchSize = 100; qSetStreamOpOpen(exec); @@ -213,6 +211,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { return -1; } + int32_t size = 0; int32_t numOfBlocks = 0; while (1) { if (streamTaskShouldStop(&pTask->status)) { @@ -247,9 +246,10 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); - if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { - stDebug("s-task:%s scan exec numOfBlocks:%d, size:%d output num-limit:%d, size-limit:%d reached", - pTask->id.idStr, numOfBlocks, size, outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); + if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", + pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD, + SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); break; } } @@ -260,8 +260,6 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { return code; } - - size = 0; } else { taosArrayDestroy(pRes); } From e13efd5b04ef55682a0287003f0875566eb62f21 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 14:00:48 +0800 Subject: [PATCH 07/14] refactor: optimize the fill-history task launch policy, and do some other internal refactor. --- include/libs/stream/tstream.h | 27 +-- source/dnode/mnode/impl/src/mndScheduler.c | 4 +- source/dnode/vnode/src/tq/tq.c | 16 +- source/libs/stream/inc/streamInt.h | 24 ++- source/libs/stream/src/streamDispatch.c | 21 +-- source/libs/stream/src/streamExec.c | 6 +- source/libs/stream/src/streamMeta.c | 14 +- source/libs/stream/src/streamRecover.c | 198 ++++++++++++++------- source/libs/stream/src/streamTask.c | 62 ++++--- 9 files changed, 235 insertions(+), 137 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 29a75083ef..61d74b5809 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -270,13 +270,13 @@ typedef struct SCheckpointInfo { } SCheckpointInfo; typedef struct SStreamStatus { - int8_t taskStatus; - int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set - int8_t schedStatus; - int8_t keepTaskStatus; - bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it - int8_t timerActive; // timer is active - int8_t pauseAllowed; // allowed task status to be set to be paused + int8_t taskStatus; + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set + int8_t schedStatus; + int8_t keepTaskStatus; + bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it + int8_t pauseAllowed; // allowed task status to be set to be paused + int32_t timerActive; // timer is active } SStreamStatus; typedef struct SDataRange { @@ -304,6 +304,7 @@ typedef struct SDispatchMsgInfo { int32_t retryCount; // retry send data count int64_t startTs; // dispatch start time, record total elapsed time for dispatch SArray* pRetryList; // current dispatch successfully completed node of downstream + void* pTimer; // used to dispatch data after a given time duration } SDispatchMsgInfo; typedef struct STaskOutputInfo { @@ -345,7 +346,14 @@ typedef struct STaskExecStatisInfo { SSinkRecorder sink; } STaskExecStatisInfo; -typedef struct STaskTimer STaskTimer; +typedef struct SHistoryTaskInfo { + STaskId id; + void* pTimer; + int32_t tickCount; + int32_t retryTimes; + int32_t waitInterval; +} SHistoryTaskInfo; + typedef struct STokenBucket STokenBucket; typedef struct SMetaHbInfo SMetaHbInfo; @@ -361,7 +369,7 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; - STaskId historyTaskId; + SHistoryTaskInfo hTaskInfo; STaskId streamTaskId; STaskExecStatisInfo execInfo; SArray* pReadyMsgList; // SArray @@ -378,7 +386,6 @@ struct SStreamTask { }; STokenBucket* pTokenBucket; - STaskTimer* pTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend SArray* pRspMsgList; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 1d7d391acf..62d5ff47e3 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -296,8 +296,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { SStreamTask** pStreamTask = taosArrayGet(pTaskList, i); SStreamTask** pHTask = taosArrayGet(pHTaskList, i); - (*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId; - (*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId; + (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId; + (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId; (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dada1695ef..dabb01afe4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -876,7 +876,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, (int32_t)pTask->historyTaskId.taskId, pTask->info.triggerParam); + pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); } return 0; @@ -1227,7 +1227,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } else { STimeWindow* pWindow = &pTask->dataRange.window; - if (pTask->historyTaskId.taskId == 0) { + if (pTask->hTaskInfo.id.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " @@ -1441,12 +1441,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg streamTaskPause(pTask, pMeta); SStreamTask* pHistoryTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + if (pTask->hTaskInfo.id.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); if (pHistoryTask == NULL) { tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64 ", it may have been dropped already", - pMeta->vgId, pTask->historyTaskId.taskId); + pMeta->vgId, pTask->hTaskInfo.id.taskId); streamMetaReleaseTask(pMeta, pTask); // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active @@ -1515,7 +1515,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } SStreamTask* pHistoryTask = - streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + streamMetaAcquireTask(pTq->pStreamMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); if (pHistoryTask) { code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); } @@ -1810,8 +1810,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); SStreamTask** ppHTask = NULL; - if (pTask->historyTaskId.taskId != 0) { - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); + if (pTask->hTaskInfo.id.taskId != 0) { + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (ppHTask == NULL || *ppHTask == NULL) { tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index dc5cf497db..d6f6de3de8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,7 +26,19 @@ extern "C" { #endif -#define CHECK_DOWNSTREAM_INTERVAL 100 +#define CHECK_DOWNSTREAM_INTERVAL 100 +#define LAUNCH_HTASK_INTERVAL 100 +#define WAIT_FOR_MINIMAL_INTERVAL 100.00 +#define MAX_RETRY_LAUNCH_HISTORY_TASK 20 +#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 + +#define MAX_BLOCK_NAME_NUM 1024 +#define DISPATCH_RETRY_INTERVAL_MS 300 +#define MAX_CONTINUE_RETRY_COUNT 5 + +#define META_HB_CHECK_INTERVAL 200 +#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec +#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -58,11 +70,6 @@ struct STokenBucket { int64_t fillTimestamp; // fill timestamp }; -struct STaskTimer { - void* hTaskLaunchTimer; - void* dispatchTimer; -}; - extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; @@ -107,6 +114,9 @@ int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t bytesRate); +STaskId streamTaskExtractKey(const SStreamTask* pTask); +void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); +void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); @@ -115,7 +125,7 @@ void streamQueueProcessFail(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); -STaskId extractStreamTaskKey(const SStreamTask* pTask); + #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ea8c7c71ac..2d701d6bb0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -18,10 +18,6 @@ #include "ttimer.h" #include "tmisce.h" -#define MAX_BLOCK_NAME_NUM 1024 -#define DISPATCH_RETRY_INTERVAL_MS 300 -#define MAX_CONTINUE_RETRY_COUNT 5 - typedef struct SBlockName { uint32_t hashValue; char parTbName[TSDB_TABLE_NAME_LEN]; @@ -425,7 +421,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { int32_t msgId = pTask->execInfo.dispatch; if (streamTaskShouldStop(&pTask->status)) { - int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } @@ -487,26 +483,25 @@ static void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { - int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { - STaskTimer* pTmr = pTask->pTimer; pTask->msgInfo.retryCount++; stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); - if (pTmr->dispatchTimer != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTmr->dispatchTimer); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); } else { - pTmr->dispatchTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); } } @@ -636,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); @@ -1143,7 +1138,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt); } - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7759de9590..b5ea82d347 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -310,7 +310,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); + ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -361,8 +361,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info - pStreamTask->historyTaskId.taskId = 0; - pStreamTask->historyTaskId.streamId = 0; + pStreamTask->hTaskInfo.id.taskId = 0; + pStreamTask->hTaskInfo.id.streamId = 0; // 6. save to disk taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8ae349a31a..45c2743ecb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -21,10 +21,6 @@ #include "tstream.h" #include "ttimer.h" -#define META_HB_CHECK_INTERVAL 200 -#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec -#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) - static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; @@ -546,8 +542,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId}; SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId)); if (ppStreamTask != NULL) { - (*ppStreamTask)->historyTaskId.taskId = 0; - (*ppStreamTask)->historyTaskId.streamId = 0; + (*ppStreamTask)->hTaskInfo.id.taskId = 0; + (*ppStreamTask)->hTaskInfo.id.streamId = 0; } } else { atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); @@ -697,7 +693,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - STaskId id = extractStreamTaskKey(pTask); + STaskId id = streamTaskExtractKey(pTask); taosArrayPush(pRecycleList, &id); int32_t total = taosArrayGetSize(pRecycleList); @@ -806,7 +802,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { return 0; } -static bool enoughTimeDuration(SMetaHbInfo* pInfo) { +static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter pInfo->tickCounter = 0; return true; @@ -843,7 +839,7 @@ void metaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = taosGetTimestampMs(); } - if (!enoughTimeDuration(pMeta->pHbInfo)) { + if (!waitForEnoughDuration(pMeta->pHbInfo)) { taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3ca81ea90b..4018d86dd0 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -29,9 +29,12 @@ typedef struct STaskRecheckInfo { void* checkTimer; } STaskRecheckInfo; -static int32_t streamSetParamForScanHistory(SStreamTask* pTask); -static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); -static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); +static int32_t streamSetParamForScanHistory(SStreamTask* pTask); +static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); +static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); +static int32_t getNextRetryInterval(int32_t waitInterval); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +static void tryLaunchHistoryTask(void* param, void* tmrId); static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { SStreamMeta* pMeta = pTask->pMeta; @@ -54,7 +57,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { taosWLockLatch(&pMeta->lock); - STaskId id = extractStreamTaskKey(pTask); + STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); @@ -90,20 +93,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -const char* streamGetTaskStatusStr(int32_t status) { - switch(status) { - case TASK_STATUS__NORMAL: return "normal"; - case TASK_STATUS__SCAN_HISTORY: return "scan-history"; - case TASK_STATUS__HALT: return "halt"; - case TASK_STATUS__PAUSE: return "paused"; - case TASK_STATUS__CK: return "check-point"; - case TASK_STATUS__DROPPING: return "dropping"; - case TASK_STATUS__STOP: return "stop"; - case TASK_STATUS__UNINIT: return "uninitialized"; - default:return ""; - } -} - static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { @@ -249,7 +238,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { } destroyRecheckInfo(pInfo); - int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); } @@ -302,7 +291,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); pTask->status.taskStatus = TASK_STATUS__DROPPING; - ASSERT(pTask->historyTaskId.taskId == 0); + ASSERT(pTask->hTaskInfo.id.taskId == 0); } else { stDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); streamTaskEnablePause(pTask); @@ -375,7 +364,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } else { STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); @@ -464,6 +453,10 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8 return 0; } +int32_t getNextRetryInterval(int32_t waitInterval) { + return waitInterval * RETRY_LAUNCH_INTERVAL_INC_RATE; +} + int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) { @@ -609,12 +602,47 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) doCheckDownstreamStatus(pHTask); } +static bool doLaunchHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo) { + SStreamMeta* pMeta = pTask->pMeta; + streamTaskSetRetryInfoForLaunch(&pTask->hTaskInfo); + + stDebug("s-task:%s try launch related fill-history task in timer, retry:%d", pTask->id.idStr, + pTask->hTaskInfo.retryTimes); + + ASSERT(pTask->status.timerActive >= 1); + + // abort the timer if intend to stop task + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); + if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { + const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); + stWarn( + "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have " + "been destroyed, or should stop", + pTask->id.idStr, pMeta->vgId, streamGetTaskStatusStr(pTask->status.taskStatus), (int32_t)pTask->hTaskInfo.id.taskId); + + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer); + streamMetaReleaseTask(pMeta, pTask); + return true; + } + + if (pHTask != NULL) { + checkFillhistoryTaskStatus(pTask, pHTask); + streamMetaReleaseTask(pMeta, pHTask); + } + + // not in timer anymore + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, + pTask->hTaskInfo.retryTimes, ref); + streamMetaReleaseTask(pMeta, pTask); + + return false; +} + static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - stDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId); - taosWLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); if (ppTask) { @@ -622,10 +650,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { if (streamTaskShouldStop(&(*ppTask)->status)) { const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); - stDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); + + int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); + stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", + (*ppTask)->id.idStr, pStatus, (*ppTask)->hTaskInfo.retryTimes, ref); taosMemoryFree(pInfo); - atomic_sub_fetch_8(&(*ppTask)->status.timerActive, 1); taosWUnLockLatch(&pMeta->lock); return; } @@ -634,78 +664,120 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId); if (pTask != NULL) { - ASSERT(pTask->status.timerActive >= 1); - // abort the timer if intend to stop task - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - stWarn( - "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have been " - "destroyed, or should stop", - pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId); + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); + pHTaskInfo->tickCount -= 1; + if (pHTaskInfo->tickCount > 0) { + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); return; } - if (pHTask != NULL) { - checkFillhistoryTaskStatus(pTask, pHTask); - streamMetaReleaseTask(pMeta, pHTask); - } + if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + taosMemoryFree(pInfo); + streamMetaReleaseTask(pMeta, pTask); - // not in timer anymore - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - streamMetaReleaseTask(pMeta, pTask); + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task timer, ref:%d", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, ref); + + pHTaskInfo->id.taskId = 0; + pHTaskInfo->id.streamId = 0; + } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. + streamTaskSetRetryInfoForLaunch(pHTaskInfo); + ASSERT(pTask->status.timerActive >= 1); + + // abort the timer if intend to stop task + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); + if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { + const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); + int32_t hTaskId = pHTaskInfo->id.taskId; + stDebug( + "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch related fill-history task in " + "timer, retryCount:%d", + pTask->id.idStr, p, pHTaskInfo->retryTimes, hTaskId); + + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); + streamMetaReleaseTask(pMeta, pTask); + return; + } + + if (pHTask != NULL) { + checkFillhistoryTaskStatus(pTask, pHTask); + streamMetaReleaseTask(pMeta, pHTask); + } + + // not in timer anymore + int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, + pHTaskInfo->retryTimes, ref); + streamMetaReleaseTask(pMeta, pTask); + } } else { - stError("s-task:0x%x failed to load task, it may have been destroyed", (int32_t) pInfo->id.taskId); + stError("s-task:0x%x failed to load task, it may have been destroyed, not launch related fill-history task", + (int32_t)pInfo->id.taskId); } taosMemoryFree(pInfo); } -// todo fix the bug: 2. race condition +SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); + if (pInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pInfo->id.taskId = taskId; + pInfo->id.streamId = streamId; + pInfo->pMeta = pMeta; + return pInfo; +} + // an fill history task needs to be started. int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; - int32_t hTaskId = pTask->historyTaskId.taskId; + int32_t hTaskId = pTask->hTaskInfo.id.taskId; if (hTaskId == 0) { return TSDB_CODE_SUCCESS; } ASSERT(pTask->status.downstreamReady == 1); stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->historyTaskId.streamId, hTaskId); + pTask->hTaskInfo.id.streamId, hTaskId); // Set the execute conditions, including the query time window and the version range - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->historyTaskId, sizeof(pTask->historyTaskId)); + SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (pHTask == NULL) { - stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, - pMeta->vgId, hTaskId); + stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", pTask->id.idStr, pMeta->vgId, + hTaskId); - SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); - pInfo->id.taskId = pTask->id.taskId; - pInfo->id.streamId = pTask->id.streamId; - pInfo->pMeta = pTask->pMeta; + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); + if (pInfo == NULL) { + stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", pTask->id.idStr); + return terrno; + } - if (pTask->pTimer->hTaskLaunchTimer == NULL) { - pTask->pTimer->hTaskLaunchTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); - if (pTask->pTimer->hTaskLaunchTimer == NULL) { - // todo failed to create timer + streamTaskInitForLaunchHTask(&pTask->hTaskInfo); + if (pTask->hTaskInfo.pTimer == NULL) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer); + if (pTask->hTaskInfo.pTimer == NULL) { // todo failed to create timer + atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, + pTask->status.timerActive); taosMemoryFree(pInfo); } else { - int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active - ASSERT(ref == 1); + ASSERT(ref >= 1); stDebug("s-task:%s set timer active flag, ref:%d", pTask->id.idStr, ref); } } else { // timer exists - ASSERT(pTask->status.timerActive == 1); + ASSERT(pTask->status.timerActive >= 1); stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer); } - // try again in 100ms return TSDB_CODE_SUCCESS; } @@ -849,7 +921,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory } void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { - if (pTask->historyTaskId.taskId == 0) { + if (pTask->hTaskInfo.id.taskId == 0) { SDataRange* pRange = &pTask->dataRange; if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ea7e89cf1b..e77ab16040 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -96,8 +96,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; - int32_t taskId = pTask->historyTaskId.taskId; + if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; + int32_t taskId = pTask->hTaskInfo.id.taskId; if (tEncodeI32(pEncoder, taskId)) return -1; if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; @@ -169,9 +169,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; + if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; if (tDecodeI32(pDecoder, &taskId)) return -1; - pTask->historyTaskId.taskId = taskId; + pTask->hTaskInfo.id.taskId = taskId; if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; if (tDecodeI32(pDecoder, &taskId)) return -1; @@ -312,18 +312,14 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->schedInfo.pTimer = NULL; } - if (pTask->pTimer != NULL) { - if (pTask->pTimer->hTaskLaunchTimer != NULL) { - taosTmrStop(pTask->pTimer->hTaskLaunchTimer); - pTask->pTimer->hTaskLaunchTimer = NULL; - } + if (pTask->hTaskInfo.pTimer != NULL) { + taosTmrStop(pTask->hTaskInfo.pTimer); + pTask->hTaskInfo.pTimer = NULL; + } - if (pTask->pTimer->dispatchTimer != NULL) { - taosTmrStop(pTask->pTimer->dispatchTimer); - pTask->pTimer->dispatchTimer = NULL; - } - - taosMemoryFreeClear(pTask->pTimer); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrStop(pTask->msgInfo.pTimer); + pTask->msgInfo.pTimer = NULL; } int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); @@ -425,12 +421,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i return TSDB_CODE_OUT_OF_MEMORY; } - pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer)); - if (pTask->pTimer == NULL) { - stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; - } - // 2MiB per second for sink task // 50 times sink operator per second streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50, 2); @@ -689,7 +679,35 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } -STaskId extractStreamTaskKey(const SStreamTask* pTask) { +STaskId streamTaskExtractKey(const SStreamTask* pTask) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; return id; +} + +void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) { + pInfo->waitInterval = LAUNCH_HTASK_INTERVAL; + pInfo->tickCount = ceil(LAUNCH_HTASK_INTERVAL / WAIT_FOR_MINIMAL_INTERVAL); + pInfo->retryTimes = 0; +} + +void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) { + ASSERT(pInfo->tickCount == 0); + + pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE; + pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL); + pInfo->retryTimes += 1; +} + +const char* streamGetTaskStatusStr(int32_t status) { + switch(status) { + case TASK_STATUS__NORMAL: return "normal"; + case TASK_STATUS__SCAN_HISTORY: return "scan-history"; + case TASK_STATUS__HALT: return "halt"; + case TASK_STATUS__PAUSE: return "paused"; + case TASK_STATUS__CK: return "check-point"; + case TASK_STATUS__DROPPING: return "dropping"; + case TASK_STATUS__STOP: return "stop"; + case TASK_STATUS__UNINIT: return "uninitialized"; + default:return ""; + } } \ No newline at end of file From ae020fcd2de6c5925afd1185e0301f85be57b623 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 14:03:57 +0800 Subject: [PATCH 08/14] log(stream): update the log. --- source/libs/stream/src/streamRecover.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4018d86dd0..386580199f 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -695,8 +695,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t hTaskId = pHTaskInfo->id.taskId; stDebug( "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch related fill-history task in " - "timer, retryCount:%d", - pTask->id.idStr, p, pHTaskInfo->retryTimes, hTaskId); + "%dms, retryCount:%d", + pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); From 34d547bfff6b14bf7309c29d793433126edb770b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 14:05:14 +0800 Subject: [PATCH 09/14] fix(stream): avoid repeatly free obj. --- source/libs/stream/src/streamRecover.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 386580199f..3946506238 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -676,7 +676,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - taosMemoryFree(pInfo); streamMetaReleaseTask(pMeta, pTask); stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task timer, ref:%d", From 11641e1115af75d7aa4ca6a6fb2d824373a4c9a4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 14:18:01 +0800 Subject: [PATCH 10/14] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamRecover.c | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d6f6de3de8..1d8fde3a48 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -29,7 +29,7 @@ extern "C" { #define CHECK_DOWNSTREAM_INTERVAL 100 #define LAUNCH_HTASK_INTERVAL 100 #define WAIT_FOR_MINIMAL_INTERVAL 100.00 -#define MAX_RETRY_LAUNCH_HISTORY_TASK 20 +#define MAX_RETRY_LAUNCH_HISTORY_TASK 40 #define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2 #define MAX_BLOCK_NAME_NUM 1024 diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3946506238..8b538db7fb 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -693,8 +693,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); int32_t hTaskId = pHTaskInfo->id.taskId; stDebug( - "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch related fill-history task in " - "%dms, retryCount:%d", + "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch %dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); From 8f2d217834220d4cf326f55d3c5f5e7e93782c03 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 14:48:53 +0800 Subject: [PATCH 11/14] log: update logs. --- source/libs/stream/src/streamRecover.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 8b538db7fb..d074e115f7 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -693,7 +693,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); int32_t hTaskId = pHTaskInfo->id.taskId; stDebug( - "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch %dms, retryCount:%d", + "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); From 2dade996bbe44c4f6ed8c56fe7dfec479857a0cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 15:37:12 +0800 Subject: [PATCH 12/14] log(stream): update logs. --- source/libs/stream/src/streamRecover.c | 46 ++------------------------ 1 file changed, 2 insertions(+), 44 deletions(-) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d074e115f7..03a5233023 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -32,7 +32,6 @@ typedef struct STaskRecheckInfo { static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static int32_t getNextRetryInterval(int32_t waitInterval); static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); static void tryLaunchHistoryTask(void* param, void* tmrId); @@ -453,10 +452,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8 return 0; } -int32_t getNextRetryInterval(int32_t waitInterval) { - return waitInterval * RETRY_LAUNCH_INTERVAL_INC_RATE; -} - int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) { @@ -602,43 +597,6 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) doCheckDownstreamStatus(pHTask); } -static bool doLaunchHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo) { - SStreamMeta* pMeta = pTask->pMeta; - streamTaskSetRetryInfoForLaunch(&pTask->hTaskInfo); - - stDebug("s-task:%s try launch related fill-history task in timer, retry:%d", pTask->id.idStr, - pTask->hTaskInfo.retryTimes); - - ASSERT(pTask->status.timerActive >= 1); - - // abort the timer if intend to stop task - SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); - if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { - const char* p = streamGetTaskStatusStr(pTask->status.taskStatus); - stWarn( - "s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or may have " - "been destroyed, or should stop", - pTask->id.idStr, pMeta->vgId, streamGetTaskStatusStr(pTask->status.taskStatus), (int32_t)pTask->hTaskInfo.id.taskId); - - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer); - streamMetaReleaseTask(pMeta, pTask); - return true; - } - - if (pHTask != NULL) { - checkFillhistoryTaskStatus(pTask, pHTask); - streamMetaReleaseTask(pMeta, pHTask); - } - - // not in timer anymore - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, - pTask->hTaskInfo.retryTimes, ref); - streamMetaReleaseTask(pMeta, pTask); - - return false; -} - static void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; @@ -678,8 +636,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); streamMetaReleaseTask(pMeta, pTask); - stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task timer, ref:%d", - pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, ref); + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); pHTaskInfo->id.taskId = 0; pHTaskInfo->id.streamId = 0; From 8c998673adfd444db4ef33366369e3ef8089cee7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 17:15:49 +0800 Subject: [PATCH 13/14] fix(stream): fix dead lock. --- source/libs/stream/src/streamBackendRocksdb.c | 6 +++++- source/libs/stream/src/streamCheckpoint.c | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 85494693c1..a419a59996 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -794,7 +794,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* int64_t id = *(int64_t*)pIter; SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id); - if (wrapper == NULL) continue; + if (wrapper == NULL) { + pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + continue; + } taosThreadRwlockRdlock(&wrapper->rwLock); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -967,6 +970,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL || pHandle->db == NULL) { + stError("failed to acquire state-backend handle"); goto _ERROR; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 997fecbba9..a87901eb47 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -321,7 +321,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { pTask->chkInfo.startTs = 0; // clear the recorded start time if (remain == 0) { // all tasks are ready - stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); + stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); stInfo( From f525778aef5699749a9f98851ba88e93e2b923b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Sep 2023 00:53:36 +0800 Subject: [PATCH 14/14] fix(stream): fix syntax error. --- source/libs/stream/src/streamQueue.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index dc0eb949da..65400386b1 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -401,14 +401,14 @@ static void fillTokenBucket(STokenBucket* pBucket) { int32_t incNum = (delta / 1000.0) * pBucket->numRate; if (incNum > 0) { - pBucket->numOfToken = MIN(pBucket->numOfToken + incNum, pBucket->numCapacity); + pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); pBucket->fillTimestamp = now; } // increase the new available quota as time goes on double incSize = (delta / 1000.0) * pBucket->bytesRate; if (incSize > 0) { - pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); + pBucket->bytesRemain = TMIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); } if (incNum > 0) { @@ -434,7 +434,7 @@ bool streamTaskExtractAvailableToken(STokenBucket* pBucket) { } void streamTaskPutbackToken(STokenBucket* pBucket) { - pBucket->numOfToken = MIN(pBucket->numOfToken + 1, pBucket->numCapacity); + pBucket->numOfToken = TMIN(pBucket->numOfToken + 1, pBucket->numCapacity); } // size in KB