Merge pull request #21551 from taosdata/fix/resume_mem

fix(query): check the version range when dump partial rows. #TD-24504
This commit is contained in:
Haojun Liao 2023-05-31 21:51:07 +08:00 committed by GitHub
commit 46f731e4d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 91 additions and 30 deletions

View File

@ -1272,13 +1272,15 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId); int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
tqDebug("recv dispatch rsp, code:%x", pMsg->code);
int32_t vgId = pTq->pStreamMeta->vgId;
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} else { } else {
return -1; tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
return TSDB_CODE_INVALID_MSG;
} }
} }

View File

@ -137,7 +137,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
int32_t blockSz = taosArrayGetSize(pBlocks); int32_t blockSz = taosArrayGetSize(pBlocks);
tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz); tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
void* pBuf = NULL; void* pBuf = NULL;
SArray* tagArray = NULL; SArray* tagArray = NULL;
@ -482,17 +482,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tEncoderClear(&encoder); tEncoderClear(&encoder);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
SRpcMsg msg = { SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
.msgType = TDMT_VND_SUBMIT,
.pCont = pBuf,
.contLen = len,
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put into write-queue since %s", terrstr()); tqDebug("failed to put into write-queue since %s", terrstr());
} }
} }
} }
tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr);
_end: _end:
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);

View File

@ -1121,6 +1121,27 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order); endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order);
} }
if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)||
(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) {
int32_t i = endPos;
if (asc) {
for(; i >= 0; --i) {
if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) {
break;
}
}
} else {
for(; i < pBlock->nRow; ++i) {
if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
break;
}
}
}
endPos = i;
}
return endPos; return endPos;
} }
@ -1260,10 +1281,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
return 0; return 0;
} }
// row index of dump info remain the initial position, let's find the appropriate start position.
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) { if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pBlock->minKey.ts) { if (asc && pReader->window.skey <= pBlock->minKey.ts && pReader->verRange.minVer <= pBlock->minVer) {
// pDumpInfo->rowIndex = 0; // pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) { } else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts && pReader->verRange.maxVer >= pBlock->maxVer) {
// pDumpInfo->rowIndex = pBlock->nRow - 1; // pDumpInfo->rowIndex = pBlock->nRow - 1;
} else { // find the appropriate the start position in current block, and set it to be the current rowIndex } else { // find the appropriate the start position in current block, and set it to be the current rowIndex
int32_t pos = asc ? pBlock->nRow - 1 : 0; int32_t pos = asc ? pBlock->nRow - 1 : 0;
@ -1279,6 +1301,29 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
pBlock->maxVer, pReader->idStr); pBlock->maxVer, pReader->idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
ASSERT(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.maxVer >= pBlock->minVer);
// find the appropriate start position that satisfies the version requirement.
if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)||
(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) {
int32_t i = pDumpInfo->rowIndex;
if (asc) {
for(; i < pBlock->nRow; ++i) {
if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
break;
}
}
} else {
for(; i >= 0; --i) {
if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) {
break;
}
}
}
pDumpInfo->rowIndex = i;
}
} }
} }
@ -1293,6 +1338,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check
dumpedRows = pReader->resBlockInfo.capacity; dumpedRows = pReader->resBlockInfo.capacity;
} else if (dumpedRows <= 0) { // no qualified rows in current data block, abort directly.
setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
return TSDB_CODE_SUCCESS;
} }
int32_t i = 0; int32_t i = 0;
@ -1848,7 +1896,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
SDataBlockToLoadInfo info = {0}; SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader); getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
info.overlapWithDelInfo || info.overlapWithLastBlock || info.partiallyRequired); info.overlapWithDelInfo || info.overlapWithLastBlock);
return isCleanFileBlock; return isCleanFileBlock;
} }
@ -2809,7 +2857,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// it is a clean block, load it directly // it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->resBlockInfo.capacity) { pBlock->nRow <= pReader->resBlockInfo.capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
code = copyBlockDataToSDataBlock(pReader); code = copyBlockDataToSDataBlock(pReader);
if (code) { if (code) {
goto _end; goto _end;
@ -2828,7 +2876,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
} }
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
while (1) { while (1) {
bool hasBlockData = false; bool hasBlockData = false;
@ -2842,7 +2890,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info
@ -2870,7 +2918,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed // currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break; break;
} }

View File

@ -132,7 +132,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks);
qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
@ -140,6 +141,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
taosArrayPush(pInfo->pBlockLists, pReq); taosArrayPush(pInfo->pBlockLists, pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
taosArrayPush(pInfo->pBlockLists, input); taosArrayPush(pInfo->pBlockLists, input);
@ -150,6 +152,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SPackedData tmp = { .pDataBlock = pDataBlock }; SPackedData tmp = { .pDataBlock = pDataBlock };
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
ASSERT(0); ASSERT(0);

View File

@ -1797,9 +1797,10 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; const char* id = GET_TASKID(pTaskInfo);
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan started, %s", GET_TASKID(pTaskInfo)); qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
@ -1922,7 +1923,9 @@ FETCH_NEXT_BLOCK:
return NULL; return NULL;
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pPacked->pDataBlock; SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.parTbName[0]) { if (pBlock->info.parTbName[0]) {
@ -2057,7 +2060,6 @@ FETCH_NEXT_BLOCK:
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
const char* id = GET_TASKID(pTaskInfo);
SSDataBlock* pBlock = pInfo->pRes; SSDataBlock* pBlock = pInfo->pRes;
SDataBlockInfo* pBlockInfo = &pBlock->info; SDataBlockInfo* pBlockInfo = &pBlock->info;
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);

View File

@ -126,7 +126,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
if (pBlock == NULL) { if (pBlock == NULL) {
streamTaskInputFail(pTask); streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr); pTask->id.idStr);
} else { } else {
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
@ -233,11 +233,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
qDebug("s-task:%s receive dispatch rsp, status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp); qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
if (leftRsp > 0) { if (leftRsp > 0) {
return 0; return 0;
} }
@ -251,6 +251,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
// TODO: init recover timer // TODO: init recover timer
qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId); qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
qError("s-task:%s ignore error, and reset task output status:%d", pTask->id.idStr, pTask->outputStatus);
return 0; return 0;
} }

View File

@ -510,14 +510,17 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr); qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
return 0; return 0;
} }
qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus);
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
if (pDispatchedBlock == NULL) { if (pDispatchedBlock == NULL) {
qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr,
pTask->outputStatus);
return 0; return 0;
} }
@ -527,6 +530,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamQueueProcessFail(pTask->outputQueue); streamQueueProcessFail(pTask->outputQueue);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus);
} }
// this block can be freed only when it has been pushed to down stream. // this block can be freed only when it has been pushed to down stream.

View File

@ -16,9 +16,9 @@
#include "streamInc.h" #include "streamInc.h"
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 16 #define MIN_STREAM_EXEC_BATCH_NUM 8
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo (SStreamTask* pTask); static int32_t updateCheckPointInfo (SStreamTask* pTask);
@ -385,7 +385,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pTask->taskLevel == TASK_LEVEL__SINK) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue; continue;
} }