From e53b5d439218518816a71feca2573ca8d5e7c92f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 11:01:53 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/tq/tq.c | 10 +- source/dnode/vnode/src/tq/tqPush.c | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 16 +- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/stream.c | 142 +----------- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamData.c | 3 +- source/libs/stream/src/streamExec.c | 4 +- source/libs/stream/src/streamQueue.c | 207 ++++++++++-------- source/libs/stream/src/streamRecover.c | 9 +- .../6-cluster/clusterCommonCheck.py | 2 +- ...node3mnodeInsertLessDataAlterRep3to1to3.py | 2 +- 16 files changed, 159 insertions(+), 255 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 213a0e3c34..48bf2451a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -189,7 +189,6 @@ int32_t streamInit(); void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); -void streamQueueCleanup(SStreamQueue* pQueue); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { @@ -424,8 +423,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsg int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); -int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); -bool tInputQueueIsFull(const SStreamTask* pTask); +int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); +bool streamQueueIsFull(const STaosQueue* pQueue); typedef struct { SMsgHead head; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cce1d55e63..2bc41e6b94 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -164,7 +164,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWalForStreamTasks(STQ* pTq); -int32_t tqSetStreamTasksReady(STQ* pTq); +int32_t tqCheckAndRunStreamTask(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 004c840bfe..3355e771e2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -223,11 +223,11 @@ void tqClose(STQ*); int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasksAsync(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed. +int tqScanWalAsync(STQ* pTq, bool ckPause); int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqSetStreamTasksReadyAsync(STQ* pTq); +int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c6b87d66a2..65e6ee4433 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1199,7 +1199,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); } - tqStartStreamTasksAsync(pTq, false); + tqScanWalAsync(pTq, false); } streamMetaReleaseTask(pMeta, pTask); @@ -1341,7 +1341,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) { - tqSetStreamTasksReady(pTq); + tqCheckAndRunStreamTask(pTq); return 0; } @@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasksAsync(pTq, false); + tqScanWalAsync(pTq, false); return 0; } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this @@ -1505,7 +1505,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { - tqStartStreamTasksAsync(pTq, false); + tqScanWalAsync(pTq, false); } else { streamSchedExec(pTask); } @@ -1824,7 +1824,7 @@ _end: taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart all stream tasks", vgId); - tqSetStreamTasksReadyAsync(pTq); + tqCheckAndRunStreamTaskAsync(pTq); } } } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 5a4302a8fb..62952078bc 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { // 2. the vnode should be the leader. // 3. the stream is not suspended yet. if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) { - tqStartStreamTasksAsync(pTq, true); + tqScanWalAsync(pTq, true); } return 0; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 5b718be124..37763a690d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -56,7 +56,7 @@ int32_t tqScanWalForStreamTasks(STQ* pTq) { return 0; } -int32_t tqSetStreamTasksReady(STQ* pTq) { +int32_t tqCheckAndRunStreamTask(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -73,18 +73,18 @@ int32_t tqSetStreamTasksReady(STQ* pTq) { // broadcast the check downstream tasks msg for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } + // fill-history task can only be launched by related stream tasks. if (pTask->info.fillHistory == 1) { streamMetaReleaseTask(pMeta, pTask); continue; } - // todo: how about the fill-history task? if (pTask->status.downstreamReady == 1) { tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", pTask->id.idStr); @@ -103,7 +103,7 @@ int32_t tqSetStreamTasksReady(STQ* pTq) { return 0; } -int32_t tqSetStreamTasksReadyAsync(STQ* pTq) { +int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -136,7 +136,7 @@ int32_t tqSetStreamTasksReadyAsync(STQ* pTq) { return 0; } -int32_t tqStartStreamTasksAsync(STQ* pTq, bool ckPause) { +int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -340,7 +340,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (tInputQueueIsFull(pTask)) { + if (streamQueueIsFull(pTask->inputQueue->queue)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; @@ -386,7 +386,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (pItem != NULL) { noDataInWal = false; - code = tAppendDataToInputQueue(pTask, pItem); + code = streamTaskPutDataIntoInputQ(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 44375152d4..d580b41093 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -560,7 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); - tqSetStreamTasksReadyAsync(pVnode->pTq); + tqCheckAndRunStreamTaskAsync(pVnode->pTq); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 43a7232213..a485e94824 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -68,7 +68,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); -int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 77a7456745..3b045a8ad7 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,9 +16,6 @@ #include "streamInt.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) SStreamGlobalEnv streamEnv; int32_t streamInit() { @@ -85,7 +82,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); pTrigger->pBlock->info.type = STREAM_GET_ALL; - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) { + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger) < 0) { taosFreeQitem(pTrigger); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); return; @@ -172,7 +169,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp pTask->status.appendTranstateBlock = true; } - int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); + int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; } @@ -192,7 +189,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->srcVgId = 0; streamRetrieveReqToData(pReq, pData); - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -239,47 +236,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return 0; } -// static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { -// int8_t status = 0; -// -// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); -// if (pBlock == NULL) { -// streamTaskInputFail(pTask); -// status = TASK_INPUT_STATUS__FAILED; -// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, -// pTask->id.idStr); -// } else { -// if (pBlock->type == STREAM_INPUT__TRANS_STATE) { -// pTask->status.appendTranstateBlock = true; -// } -// -// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); -// // input queue is full, upstream is blocked now -// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; -// } -// -// return status; -// } - -// static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** -// pBuf) { -// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); -// if (*pBuf == NULL) { -// return TSDB_CODE_OUT_OF_MEMORY; -// } -// -// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); -// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); -// -// pDispatchRsp->inputStatus = status; -// pDispatchRsp->streamId = htobe64(pReq->streamId); -// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); -// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); -// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); -// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); -// -// return TSDB_CODE_SUCCESS; -// } int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); @@ -343,98 +299,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } -bool tInputQueueIsFull(const SStreamTask* pTask) { - bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUE_CAPACITY; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE); -} - -int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { - int8_t type = pItem->type; - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - - if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { - qError( - "s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push " - "data", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); - streamDataSubmitDestroy(px); - taosFreeQitem(pItem); - return -1; - } - - int32_t msgLen = px->submit.msgLen; - int64_t ver = px->submit.ver; - - int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); - if (code != TSDB_CODE_SUCCESS) { - streamDataSubmitDestroy(px); - taosFreeQitem(pItem); - return code; - } - - // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. - qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - msgLen, ver, total, size + SIZE_IN_MB(msgLen)); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || - type == STREAM_INPUT__REF_DATA_BLOCK) { - if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */ (tInputQueueIsFull(pTask))) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); - destroyStreamDataBlock((SStreamDataBlock*)pItem); - return -1; - } - - qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); - int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); - if (code != TSDB_CODE_SUCCESS) { - destroyStreamDataBlock((SStreamDataBlock*)pItem); - return code; - } - } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || - type == STREAM_INPUT__TRANS_STATE) { - taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); - } else if (type == STREAM_INPUT__GET_RES) { - // use the default memory limit, refactor later. - taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); - } else { - ASSERT(0); - } - - if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { - atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); - qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus); - } - - return 0; -} - -static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } - -void* streamQueueNextItem(SStreamQueue* pQueue) { - int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); - - if (flag == STREAM_QUEUE__FAILED) { - ASSERT(pQueue->qItem != NULL); - return streamQueueCurItem(pQueue); - } else { - pQueue->qItem = NULL; - taosGetQitem(pQueue->qall, &pQueue->qItem); - if (pQueue->qItem == NULL) { - taosReadAllQitems(pQueue->queue, pQueue->qall); - taosGetQitem(pQueue->qall, &pQueue->qItem); - } - - return streamQueueCurItem(pQueue); - } -} - void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ebf3ce8a30..54f8b9b697 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -124,7 +124,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint taosArrayPush(pChkpoint->blocks, pBlock); taosMemoryFree(pBlock); - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) { taosFreeQitem(pChkpoint); return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 3c731df071..b248c753af 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -166,6 +166,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm return 0; } +// todo handle memory error SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { terrno = 0; @@ -195,7 +196,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; } else { - qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type); + qDebug("block type:%d not merged with existed blocks list, type:%d", streamGetBlockTypeStr(pElem->type), dst->type); return NULL; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e61f26fb89..a3cbc1fb4e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -368,7 +368,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pDelBlock->info.version = 0; pItem->type = STREAM_INPUT__REF_DATA_BLOCK; pItem->pBlock = pDelBlock; - int32_t code = tAppendDataToInputQueue(pStreamTask, (SStreamQueueItem*)pItem); + int32_t code = streamTaskPutDataIntoInputQ(pStreamTask, (SStreamQueueItem*)pItem); qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code); } @@ -517,7 +517,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - /*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &numOfBlocks); + /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks); if (pInput == NULL) { ASSERT(numOfBlocks == 0); return 0; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index e0b6116457..519551486b 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -15,8 +15,30 @@ #include "streamInt.h" -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 4 +#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) + +// todo refactor: +// read data from input queue +typedef struct SQueueReader { + SStreamQueue* pQueue; + int32_t taskLevel; + int32_t maxBlocks; // maximum block in one batch + int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms +} SQueueReader; + +static void streamQueueCleanup(SStreamQueue* pQueue) { + void* qItem = NULL; + while ((qItem = streamQueueNextItem(pQueue)) != NULL) { + streamFreeQitem(qItem); + } + pQueue->status = STREAM_QUEUE__SUCESS; +} + +static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } SStreamQueue* streamQueueOpen(int64_t cap) { SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); @@ -40,21 +62,6 @@ SStreamQueue* streamQueueOpen(int64_t cap) { return pQueue; } -void streamQueueCleanup(SStreamQueue* pQueue) { - void* qItem = NULL; - while ((qItem = streamQueueNextItem(pQueue)) != NULL) { - streamFreeQitem(qItem); - } - pQueue->status = STREAM_QUEUE__SUCESS; -} -// void streamQueueClose(SStreamQueue* pQueue) { -// streamQueueCleanup(pQueue); - -// taosFreeQall(pQueue->qall); -// taosCloseQueue(pQueue->queue); -// taosMemoryFree(pQueue); -// } - void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue)); streamQueueCleanup(pQueue); @@ -64,6 +71,24 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { taosMemoryFree(pQueue); } +void* streamQueueNextItem(SStreamQueue* pQueue) { + int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); + + if (flag == STREAM_QUEUE__FAILED) { + ASSERT(pQueue->qItem != NULL); + return streamQueueCurItem(pQueue); + } else { + pQueue->qItem = NULL; + taosGetQitem(pQueue->qall, &pQueue->qItem); + if (pQueue->qItem == NULL) { + taosReadAllQitems(pQueue->queue, pQueue->qall); + taosGetQitem(pQueue->qall, &pQueue->qItem); + } + + return streamQueueCurItem(pQueue); + } +} + #if 0 bool streamQueueResEmpty(const SStreamQueueRes* pRes) { // @@ -122,64 +147,13 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { } #endif -// todo refactor: -// read data from input queue -typedef struct SQueueReader { - SStreamQueue* pQueue; - int32_t taskLevel; - int32_t maxBlocks; // maximum block in one batch - int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms -} SQueueReader; - -#if 0 -SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) { - int32_t numOfBlocks = 0; - int32_t tryCount = 0; - SStreamQueueItem* pRet = NULL; - - while (1) { - SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue); - if (qItem == NULL) { - if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) { - tryCount++; - taosMsleep(1); - qDebug("try again batchSize:%d", numOfBlocks); - continue; - } - - qDebug("break batchSize:%d", numOfBlocks); - break; - } - - if (pRet == NULL) { - pRet = qItem; - streamQueueProcessSuccess(pReader->pQueue); - if (pReader->taskLevel == TASK_LEVEL__SINK) { - break; - } - } else { - // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = NULL; - if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) { - streamQueueProcessFail(pReader->pQueue); - break; - } else { - numOfBlocks++; - pRet = newRet; - streamQueueProcessSuccess(pReader->pQueue); - if (numOfBlocks > pReader->maxBlocks) { - qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr); - break; - } - } - } - } - - return pRet; +bool streamQueueIsFull(const STaosQueue* pQueue) { + bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY; + double size = QUEUE_MEM_SIZE_IN_MB((STaosQueue*) pQueue); + return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE); } -#endif -int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr; @@ -205,7 +179,6 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i } } - // non sink task while (1) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); @@ -227,24 +200,17 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i // do not merge blocks for sink node and check point data block if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || qItem->type == STREAM_INPUT__TRANS_STATE) { - if (*pInput == NULL) { - char* p = NULL; - if (qItem->type == STREAM_INPUT__CHECKPOINT) { - p = "checkpoint"; - } else if (qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - p = "checkpoint-trigger"; - } else { - p = "transtate"; - } + const char* p = streamGetBlockTypeStr(qItem->type); + if (*pInput == NULL) { qDebug("s-task:%s %s msg extracted, start to process immediately", id, p); + *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS; } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block - qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous blocks, numOfBlocks:%d", id, - *numOfBlocks); + qDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } @@ -256,7 +222,11 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i // todo we need to sort the data block, instead of just appending into the array list. void* newRet = streamMergeQueueItem(*pInput, qItem); if (newRet == NULL) { - qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + if (terrno != 0) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, + tstrerror(terrno)); + } + streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } @@ -274,3 +244,68 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i } } } + +int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { + int8_t type = pItem->type; + int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + + if (type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputQueue->queue)) { + qError( + "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + streamDataSubmitDestroy(px); + taosFreeQitem(pItem); + return -1; + } + + int32_t msgLen = px->submit.msgLen; + int64_t ver = px->submit.ver; + + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + streamDataSubmitDestroy(px); + taosFreeQitem(pItem); + return code; + } + + // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. + qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + msgLen, ver, total, size + SIZE_IN_MB(msgLen)); + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || + type == STREAM_INPUT__REF_DATA_BLOCK) { + if (streamQueueIsFull(pTask->inputQueue->queue)) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + destroyStreamDataBlock((SStreamDataBlock*)pItem); + return -1; + } + + qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock((SStreamDataBlock*)pItem); + return code; + } + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); + } else if (type == STREAM_INPUT__GET_RES) { + // use the default memory limit, refactor later. + taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + } else { + ASSERT(0); + } + + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { + atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + qDebug("s-task:%s new data arrived, active the trigger, triggerStatus:%d", pTask->id.idStr, pTask->triggerStatus); + } + + return 0; +} diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 25ec20d06b..fda6333516 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -388,7 +388,7 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { taosArrayPush(pTranstate->blocks, pBlock); taosMemoryFree(pBlock); - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) { + if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { taosFreeQitem(pTranstate); return TSDB_CODE_OUT_OF_MEMORY; } @@ -624,7 +624,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } - checkFillhistoryTaskStatus(pTask, *pHTask); + if ((*pHTask)->status.downstreamReady == 1) { + qDebug("s-task:%s fill-history task is ready, no need to check downstream", (*pHTask)->id.idStr); + } else { + checkFillhistoryTaskStatus(pTask, *pHTask); + } + return TSDB_CODE_SUCCESS; } diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 439f0b6b8c..5e5568c5c5 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -261,7 +261,7 @@ class ClusterComCheck: count+=1 else: tdLog.debug(tdSql.queryResult) - tdLog.notice(f"elections of {db_name} all vgroups are failed in{count} s ") + tdLog.notice(f"elections of {db_name} all vgroups are failed in {count} s ") caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno) tdLog.exit("%s(%d) failed " % args) diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py index fb9872a8f6..aa0c7a0177 100644 --- a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py +++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py @@ -27,7 +27,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): tdLog.debug(f"start to excute {__file__}") self.TDDnodes = None - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.host = socket.gethostname()