enh(stream): optimize the scan wal perf

This commit is contained in:
Haojun Liao 2023-10-04 22:06:46 +08:00
parent 45e413f8d5
commit 41ef1a45c1
6 changed files with 112 additions and 77 deletions

View File

@ -458,7 +458,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);
int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem);
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask);
bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ); bool streamQueueIsFull(const SStreamQueue* pQueue);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
@ -706,6 +706,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize);
// common // common
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);

View File

@ -21,7 +21,7 @@
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) { int32_t tqScanWal(STQ* pTq) {
@ -297,7 +297,7 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
} }
// todo handle memory error // todo handle memory error
void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer; int64_t maxVer = pTask->dataRange.range.maxVer;
@ -310,12 +310,94 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */streamSchedExec(pTask); // /*int32_t code = */streamSchedExec(pTask);
return true;
} else { } else {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
id, ver, maxVer); id, ver, maxVer);
} }
} }
return false;
}
static bool taskReadyForDataFromWal(SStreamTask* pTask) {
// non-source or fill-history tasks don't need to response the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
return false;
}
// not in ready state, do not handle the data from wal
int32_t status = pTask->status.taskStatus;
if (status != TASK_STATUS__NORMAL) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
return false;
}
// fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
return false;
}
// check if input queue is full or not
if (streamQueueIsFull(pTask->inputInfo.queue)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
return false;
}
// the input queue of downstream task is full, so the output is blocked, stopped for a while
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
return false;
}
return true;
}
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0;
while(1) {
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
*numOfItems += numOfNewItems;
return numOfNewItems > 0;
}
SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL)/* && (numOfItems + numOfNewItems == 0)*/) { // failed, continue
// handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
// streamMetaReleaseTask(pMeta, pTask);
// taosThreadMutexUnlock(&pTask->lock);
break;
}
if (pItem != NULL) {
code = streamTaskPutDataIntoInputQ(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
numOfNewItems += 1;
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.nextProcessVer = ver;
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", id, ver);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
if (itemInFillhistory) {
break;
}
} else {
tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
}
}
}
*numOfItems += numOfNewItems;
return numOfNewItems > 0;
} }
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
@ -340,45 +422,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList); numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); STaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
int32_t status = pTask->status.taskStatus; if (!taskReadyForDataFromWal(pTask)) {
// non-source or fill-history tasks don't need to response the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
const char* pStatus = streamGetTaskStatusStr(status);
if (status != TASK_STATUS__NORMAL) {
tqTrace("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if (streamQueueIsFull(pTask->inputInfo.queue, true)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// downstream task has blocked the output, stopped for a while
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
@ -397,7 +447,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
@ -405,33 +455,11 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
SStreamQueueItem* pItem = NULL; bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems);
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
streamMetaReleaseTask(pStreamMeta, pTask);
taosThreadMutexUnlock(&pTask->lock);
continue;
}
if (pItem != NULL) {
noDataInWal = false;
code = streamTaskPutDataIntoInputQ(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.nextProcessVer = ver;
handleFillhistoryScanComplete(pTask, ver);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
} else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
}
}
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
if ((code == TSDB_CODE_SUCCESS) || (numOfItems > 0)) { if (/*(code == TSDB_CODE_SUCCESS) || */(numOfItems > 0) || hasNewData) {
noDataInWal = false;
code = streamSchedExec(pTask); code = streamSchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);

View File

@ -41,8 +41,7 @@ extern "C" {
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
// clang-format off // clang-format off
#define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)

View File

@ -603,7 +603,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER || ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
pBlock->type == STREAM_INPUT__TRANS_STATE); pBlock->type == STREAM_INPUT__TRANS_STATE);
int32_t retryCount = 0;
pTask->execInfo.dispatch += 1; pTask->execInfo.dispatch += 1;
pTask->msgInfo.startTs = taosGetTimestampMs(); pTask->msgInfo.startTs = taosGetTimestampMs();
@ -613,6 +612,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} else { // todo handle build dispatch msg failed } else { // todo handle build dispatch msg failed
} }
int32_t retryCount = 0;
while (1) { while (1) {
code = sendDispatchMsg(pTask, pTask->msgInfo.pData); code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {

View File

@ -890,8 +890,8 @@ void metaHbToMnode(void* param, void* tmrId) {
.outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)),
}; };
entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE; entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE;
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {
entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader); entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader);

View File

@ -102,14 +102,13 @@ void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
} }
bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ) { bool streamQueueIsFull(const SStreamQueue* pQueue) {
int32_t numOfItems = streamQueueGetNumOfItems(pQueue); int32_t numOfItems = streamQueueGetNumOfItems(pQueue);
if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) { if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) {
return true; return true;
} }
int32_t threshold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= threshold);
} }
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
@ -119,6 +118,14 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2; return numOfItems1 + numOfItems2;
} }
int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize) {
int32_t num = streamQueueGetNumOfItems(pQueue);
*availNum = STREAM_TASK_QUEUE_CAPACITY - num;
*availSize = STREAM_TASK_QUEUE_CAPACITY_IN_SIZE - taosQueueMemorySize(pQueue->pQueue);
return 0;
}
// todo: fix it: data in Qall is not included here // todo: fix it: data in Qall is not included here
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) {
return taosQueueMemorySize(pQueue->pQueue); return taosQueueMemorySize(pQueue->pQueue);
@ -264,11 +271,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue, true)) { if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace( stTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem); taosFreeQitem(pItem);
return -1; return -1;
@ -291,11 +298,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pTask->inputInfo.queue, true)) { if (streamQueueIsFull(pTask->inputInfo.queue)) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem); destroyStreamDataBlock((SStreamDataBlock*)pItem);
return -1; return -1;
} }
@ -345,7 +352,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; STaosQueue* pQueue = pTask->outputInfo.queue->pQueue;
while (streamQueueIsFull(pTask->inputInfo.queue, false)) { while (streamQueueIsFull(pTask->inputInfo.queue)) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
return TSDB_CODE_STREAM_EXEC_CANCELLED; return TSDB_CODE_STREAM_EXEC_CANCELLED;