From c647ded0daba38b703e77e168f040ee86b89abb4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Apr 2023 19:47:49 +0800 Subject: [PATCH 01/31] fix(stream): add some logs. --- source/dnode/vnode/src/tq/tq.c | 11 +++-- source/libs/stream/src/streamDispatch.c | 53 +++++++++++++++---------- source/libs/stream/src/streamRecover.c | 11 ++--- 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 82e57daf56..c690610341 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -826,6 +826,9 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + double el = (taosGetTimestampMs() - st) / 1000.0; + tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -842,7 +845,6 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { return 0; } @@ -852,13 +854,14 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { + tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr); return -1; } memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage", pTask->id.idStr); + tqDebug("s-task:%s start recover block stage(step 2)", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -870,7 +873,8 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t int32_t code = 0; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { return -1; } @@ -912,7 +916,6 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t streamMetaSaveTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 549374ed94..95e864dc99 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -270,8 +270,12 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) { - goto FAIL; + if (buf) { + rpcFreeCont(buf); + } + return code; } + tEncoderClear(&encoder); msg.contLen = tlen + sizeof(SMsgHead); @@ -280,13 +284,10 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - - qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId); + qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr, + pReq->taskId, vgId); return 0; -FAIL: - if (buf) rpcFreeCont(buf); - return code; } int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { @@ -407,13 +408,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; - qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, + qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, pTask->selfChildId, blockNum, downstreamTaskId, vgId); if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { goto FAIL_FIXED_DISPATCH; } + code = 0; + FAIL_FIXED_DISPATCH: taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); @@ -427,6 +430,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t vgSz = taosArrayGetSize(vgInfo); SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -442,6 +446,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { goto FAIL_SHUFFLE_DISPATCH; } + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); pReqs[i].taskId = pVgInfo->taskId; } @@ -468,33 +473,41 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } } + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, + blockNum, vgSz); + for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { - // send SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, + pReqs[i].blockNum, pVgInfo->vgId); + if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } } + code = 0; + FAIL_SHUFFLE_DISPATCH: - if (pReqs) { - for (int32_t i = 0; i < vgSz; i++) { - taosArrayDestroyP(pReqs[i].data, taosMemoryFree); - taosArrayDestroy(pReqs[i].dataLen); - } - taosMemoryFree(pReqs); + for (int32_t i = 0; i < vgSz; i++) { + taosArrayDestroyP(pReqs[i].data, taosMemoryFree); + taosArrayDestroy(pReqs[i].dataLen); } - return code; + taosMemoryFree(pReqs); } - return 0; + return code; } int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr, - taosQueueItemSize(pTask->outputQueue->queue)); + + int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); + if (numOfElems > 0) { + qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, + numOfElems); + } int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); @@ -504,7 +517,7 @@ int32_t streamDispatch(SStreamTask* pTask) { SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); if (pBlock == NULL) { - qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr); + qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } @@ -516,10 +529,8 @@ int32_t streamDispatch(SStreamTask* pTask) { code = -1; streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - goto FREE; } -FREE: taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); return code; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0d1440fbde..40c33577fb 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -212,7 +212,7 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; - qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); + qDebug("s-task:%s recover step2 (blocking stage) started", pTask->id.idStr); if (qStreamSourceRecoverStep2(exec, ver) < 0) { } @@ -220,12 +220,13 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { } int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { - SStreamRecoverFinishReq req = { - .streamId = pTask->id.streamId, - .childId = pTask->selfChildId, - }; + SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId }; + // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId); + req.taskId = pTask->fixedEpDispatcher.taskId; streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { From b69137df44354d523b08370ffc4cc2f4b928ad4f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 00:00:25 +0800 Subject: [PATCH 02/31] refactor(log): add some logs. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamMeta.c | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c690610341..aba72835cf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -172,7 +172,7 @@ void tqNotifyClose(STQ* pTq) { int64_t st = taosGetTimestampMs(); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); int64_t el = taosGetTimestampMs() - st; - tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); + tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); } taosWUnLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d7c5602799..5efe0c2e96 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -52,7 +52,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); + pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasks == NULL) { goto _err; } @@ -215,6 +215,8 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { + taosWLockLatch(&pMeta->lock); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; @@ -222,11 +224,10 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); - - taosWLockLatch(&pMeta->lock); streamMetaReleaseTask(pMeta, pTask); - taosWUnLockLatch(&pMeta->lock); } + + taosWUnLockLatch(&pMeta->lock); } int32_t streamMetaBegin(SStreamMeta* pMeta) { From bbb8aaa0dcea4d13da419524b48e02e2cebc0c05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 02:37:10 +0800 Subject: [PATCH 03/31] refactor: add some logs. --- source/dnode/vnode/src/tq/tq.c | 6 ++++++ source/libs/stream/src/stream.c | 1 + source/libs/stream/src/streamDispatch.c | 2 ++ source/libs/stream/src/streamRecover.c | 4 ++-- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aba72835cf..a42a2119d0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -880,6 +880,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } // do recovery step 2 + int64_t st = taosGetTimestampMs(); + tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -905,6 +908,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } + double el = (taosGetTimestampMs() - st)/ 1000.0; + tqDebug("s-task:%s step2 recover finished, el:%.2f s", pTask->id.idStr, el); + // dispatch recover finish req to all related downstream task code = streamDispatchRecoverFinishReq(pTask); if (code < 0) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 120fe68b5d..a0bc95ac01 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -256,6 +256,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ASSERT(0); return 0; } + // continue dispatch streamDispatch(pTask); return 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 95e864dc99..ce556aaa8d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -261,6 +261,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -512,6 +513,7 @@ int32_t streamDispatch(SStreamTask* pTask) { int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { + qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr); return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 40c33577fb..471b363cbd 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -224,8 +224,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d", pTask->id.idStr, - pTask->fixedEpDispatcher.taskId); + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus); req.taskId = pTask->fixedEpDispatcher.taskId; streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); From 39cac9d3084d9fba771ee379cb81e3e2d531a3ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 14:06:26 +0800 Subject: [PATCH 04/31] refactor: remove log ; --- source/libs/executor/src/timewindowoperator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 700ca92881..1533527834 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2252,8 +2252,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; - qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, - pEnryInfo->isNullRes, pEnryInfo->numOfRes); + if (pCtx[j].fpSet.finalize) { int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code1)) { @@ -2275,6 +2274,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pBlock->info.rows += pRow->numOfRows; } + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; From 777ed1761ffc730211e8ff80ee757b337c8aba0e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 18:24:46 +0800 Subject: [PATCH 05/31] refactor: update logs. --- source/dnode/vnode/src/tq/tq.c | 14 +++++++------- source/dnode/vnode/src/tq/tqRestore.c | 4 ++-- source/libs/stream/src/stream.c | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a42a2119d0..fd01e3ffae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -821,12 +821,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr); + tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr); int64_t st = taosGetTimestampMs(); streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { - double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -834,7 +833,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); // build msg to launch next step SStreamRecoverStep2Req req; @@ -861,7 +860,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage(step 2)", pTask->id.idStr); + tqDebug("s-task:%s step 1 finished, send msg to start blocking recover stage(step 2)", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -902,6 +901,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } // set status normal + tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr); code = streamSetStatusNormal(pTask); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -909,7 +909,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } double el = (taosGetTimestampMs() - st)/ 1000.0; - tqDebug("s-task:%s step2 recover finished, el:%.2f s", pTask->id.idStr, el); + tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task code = streamDispatchRecoverFinishReq(pTask); @@ -1367,12 +1367,12 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr()); + tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } - tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = WAL_READ_TASKS_ID; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b5fae3184a..c8b598763d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -100,7 +100,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr); + tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -113,7 +113,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (tInputQueueIsFull(pTask)) { - tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); + tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index a0bc95ac01..22c8a61a7b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -216,8 +216,8 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, - pReq->upstreamTaskId); + qDebug("s-task:%s receive dispatch msg from taskId:%d (vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + pReq->upstreamNodeId); streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); From ace193ca9b3deae70decc9dfa95ba66d84d4c304 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 23:34:49 +0800 Subject: [PATCH 06/31] fix(stream): set the seek version if the reader's version is earlier than the first version in wal. --- include/libs/wal/wal.h | 3 +++ source/dnode/vnode/src/tq/tqRestore.c | 24 +++++++++++++++++------- source/libs/wal/src/walRead.c | 1 + 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b51289de5e..ad7e4e104a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -138,6 +138,8 @@ typedef struct { int8_t enableRef; } SWalFilterCond; +typedef struct SWalReader SWalReader; + // todo hide this struct typedef struct SWalReader { SWal *pWal; @@ -198,6 +200,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); +int64_t walReaderGetValidFirstVer(const SWalReader* pReader); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c8b598763d..0eb9a92f87 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -121,17 +121,27 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit - streamMetaReleaseTask(pStreamMeta, pTask); - continue; + int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); + if (pTask->chkInfo.currentVer < firstVer) { + pTask->chkInfo.currentVer = firstVer; + tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); } - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer != pTask->chkInfo.currentVer) { + int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } SPackedData packData = {0}; - code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index dc3ff3e6de..b185e7f00a 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -101,6 +101,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } +int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); } static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; From a88a4f5db00f6d03778362582b899a74f58e9aac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Apr 2023 01:03:13 +0800 Subject: [PATCH 07/31] fix(query): add more check for stopping tsdb reader --- source/dnode/vnode/src/tsdb/tsdbRead.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index eb15400d05..54defbae18 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2997,6 +2997,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr // only check here, since the iterate data in memory is very fast. if (pReader->flag == READER_STATUS_SHOULD_STOP) { tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); + taosArrayDestroy(pIndexList); return TSDB_CODE_SUCCESS; } @@ -3163,6 +3164,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + if (pReader->flag == READER_STATUS_SHOULD_STOP) { + return TSDB_CODE_SUCCESS; + } + pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); if (pScanInfo == NULL) { return terrno; @@ -3483,7 +3488,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (pBlockIter->numOfBlocks == 0) { _begin: code = doLoadLastBlockSequentially(pReader); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } @@ -3496,7 +3501,8 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = initForFirstBlockInFile(pReader, pBlockIter); // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) || + pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } @@ -5066,6 +5072,11 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; int32_t code = TSDB_CODE_SUCCESS; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); + + if (pReader->flag == READER_STATUS_SHOULD_STOP) { + return NULL; + } + STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { return NULL; From f645454f6c2ab9e62f8ea12de54a28c7c693d241 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Apr 2023 01:58:08 +0800 Subject: [PATCH 08/31] fix(query): add check. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 54defbae18..46af66008b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3567,7 +3567,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = doBuildDataBlock(pReader); } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } From 2fe494c2c7916f1efa465cc05bcaa33736185f46 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Apr 2023 02:42:26 +0800 Subject: [PATCH 09/31] fix(query):add more check. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 46af66008b..30bdb4ae86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3497,6 +3497,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } // all data blocks are checked in this last block file, now let's try the next file + // ASSERT(pReader->status.pTableIter == NULL); if (pReader->status.pTableIter == NULL) { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3551,7 +3552,8 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = initForFirstBlockInFile(pReader, pBlockIter); // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) || + pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } From d8a1f15d10c63eba7779e19f336970d982d250f7 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 4 May 2023 09:28:06 +0800 Subject: [PATCH 10/31] fix resum bug --- include/libs/stream/tstream.h | 2 ++ source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamExec.c | 7 ++++++- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7d16924ef..c6ab61529a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -279,6 +279,7 @@ typedef struct SCheckpointInfo { typedef struct SStreamStatus { int8_t taskStatus; int8_t schedStatus; + int8_t keepTaskStatus; } SStreamStatus; struct SStreamTask { @@ -539,6 +540,7 @@ int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); +bool streamTaskShouldPause(const SStreamStatus* pStatus); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fd01e3ffae..b6602588da 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1221,6 +1221,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); streamMetaReleaseTask(pTq->pStreamMeta, pTask); } @@ -1231,7 +1232,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { - streamSetStatusNormal(pTask); + atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version if (pReq->igUntreated) { // discard all the data when the stream task is suspended. diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 0eb9a92f87..92c37a8b5e 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -106,7 +106,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || - status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) { + status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 22c8a61a7b..cbb4b33cf3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -52,7 +52,7 @@ void streamCleanUp() { void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); return; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7ea093973a..1122cd8ff0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -22,6 +22,11 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus) { return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } +bool streamTaskShouldPause(const SStreamStatus* pStatus) { + int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + return (status == TASK_STATUS__PAUSE); +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -141,7 +146,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { taosArrayDestroy(pRes); return 0; } From d17de8b32bff43376ae9bf237afe31e12169e181 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 4 May 2023 14:02:39 +0800 Subject: [PATCH 11/31] add test case --- tests/script/tsim/stream/pauseAndResume.sim | 40 +++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index e1e2a67a22..fa7be19310 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -190,6 +190,46 @@ if $data11 != 1 then goto loop2 endi +sql pause stream streams2; + +sql insert into t1 values(1648791223003,2,2,3,1.1); +sql insert into t1 values(1648791233003,2,2,3,1.1); + +sql resume stream IGNORE UNTREATED streams2; + +$loop_count = 0 + +loop3: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 4 select * from streamt2; +sql select * from streamt2; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop3 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop3 +endi + +if $data11 != 1 then + print =====data01=$data01 + goto loop3 +endi + + print ===== step 2 over From 74ab6897190a7b4819e638056032461aec89a3b4 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 5 May 2023 08:59:07 +0800 Subject: [PATCH 12/31] opt batch num --- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamState.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index cbb4b33cf3..560f92ec4c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,7 +16,7 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 102400 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 int32_t streamInit() { int8_t old; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1122cd8ff0..663cdb4d7c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,7 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 20480 +#define STREAM_EXEC_MAX_BATCH_NUM 10240 bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index c7d85ac885..abd14801a8 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -24,7 +24,7 @@ #include "tcompare.h" #include "ttimer.h" -#define MAX_TABLE_NAME_NUM 100000 +#define MAX_TABLE_NAME_NUM 2000000 int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { From 76bf0aea56ac3d2e7bafb93d1abac0e77b210931 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Sat, 6 May 2023 10:14:27 +0800 Subject: [PATCH 13/31] opt stream input queue --- source/libs/stream/src/streamExec.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 663cdb4d7c..7884463ebb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,8 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 10240 +#define MAX_STREAM_EXEC_BATCH_NUM 10240 +#define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); @@ -260,12 +261,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 1; void* pInput = NULL; + int16_t times = 0; // merge multiple input data if possible in the input queue. while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { -// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); + if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { + times++; + taosMsleep(1); + qDebug("===stream===try agian batchSize:%d", batchSize); + continue; + } break; } @@ -284,7 +291,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchSize++; pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { + if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { break; } } From 7dfa4c210528c52f33daa6db070106f3a5dcd091 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 13:13:28 +0800 Subject: [PATCH 14/31] fix(query): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 13c6e66b27..b7ec7bb5c5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3164,8 +3164,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - if (pReader->flag == READER_STATUS_SHOULD_STOP) { - return TSDB_CODE_SUCCESS; + if (pReader->code != TSDB_CODE_SUCCESS) { + return pReader->code; } pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); @@ -5083,7 +5083,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); - if (pReader->flag == READER_STATUS_SHOULD_STOP) { + if (pReader->code != TSDB_CODE_SUCCESS) { return NULL; } From aca5760ceb65e11f691ab686f8b644cdfd67718e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 13:17:31 +0800 Subject: [PATCH 15/31] fix(query): fix syntax error. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b1f8df5c6c..abbf016db3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -68,7 +68,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->walScan = 0; + pMeta->walScanCounter = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; From 6b305bf745f678d1135517d85fddf6517d929a9d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 13:35:18 +0800 Subject: [PATCH 16/31] refactor: do some internal refactor. --- include/libs/wal/wal.h | 2 +- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 21 ++++++++++++++------- source/libs/wal/src/walRead.c | 9 +++++---- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 5c7a726d69..09198aa038 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -195,7 +195,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); +int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); int64_t walReaderGetValidFirstVer(const SWalReader* pReader); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index ea099373d7..d4db43dd21 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { - if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { return -1; } tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 4e18e570b5..c855d323fc 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -112,18 +112,25 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { pTask->chkInfo.currentVer = firstVer; tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); - } - int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (currentVer != pTask->chkInfo.currentVer) { - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + // todo need retry if failed + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } + } else { + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer != pTask->chkInfo.currentVer) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } } SPackedData packData = {0}; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 4a6be5bfb7..66c32437a3 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -184,6 +184,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } + if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner if (walReadChangeFile(pReader, pRet->firstVer) < 0) { @@ -203,7 +204,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return 0; } -int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { +int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); @@ -233,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); if (pRead->curVersion != fetchVer) { - if (walReadSeekVer(pRead, fetchVer) < 0) { + if (walReaderSeekVer(pRead, fetchVer) < 0) { return -1; } seeked = true; @@ -336,7 +337,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); + code = walReaderSeekVer(pRead, ver); if (code < 0) { // pRead->curVersion = ver; // pRead->curInvalid = 1; @@ -471,7 +472,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { taosThreadMutexLock(&pReader->mutex); if (pReader->curVersion != ver) { - if (walReadSeekVer(pReader, ver) < 0) { + if (walReaderSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1; From f1a638c196cc6bf3a27ef2033899c246c6597324 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 13:36:42 +0800 Subject: [PATCH 17/31] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqRestore.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c855d323fc..9ee8e667db 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -119,6 +119,9 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); if (currentVer != pTask->chkInfo.currentVer) { From fc799bcd19c33939d172172de03e7abce403d2a6 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 6 May 2023 15:06:53 +0800 Subject: [PATCH 18/31] test: fix tsim/db/error1.sim random failed --- tests/script/tsim/db/error1.sim | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index 32dbe826cc..d275dca387 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -56,18 +56,18 @@ endi if $data23 != 0 then return -1 -endi +end -print ========== stop dnode2 -system sh/exec.sh -n dnode2 -s stop -x SIGKILL +#print ========== stop dnode2 +#system sh/exec.sh -n dnode2 -s stop -x SIGKILL -sleep 1000 -print =============== drop database -sql_error drop database d1 +#sleep 1000 +#print =============== drop database +sql drop database d1 -print ========== start dnode2 -system sh/exec.sh -n dnode2 -s start -sleep 1000 +#print ========== start dnode2 +#system sh/exec.sh -n dnode2 -s start +#sleep 1000 print =============== re-create database $x = 0 From b925342ad06a1d0433b19afb5eacd741957001d1 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 6 May 2023 15:08:33 +0800 Subject: [PATCH 19/31] test: fix tsim/db/error1.sim random failed --- tests/script/tsim/db/error1.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index d275dca387..64b17125aa 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -56,7 +56,7 @@ endi if $data23 != 0 then return -1 -end +endi #print ========== stop dnode2 #system sh/exec.sh -n dnode2 -s stop -x SIGKILL From ee32620808441e640ac22b5d65769a8323553754 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 16:22:30 +0800 Subject: [PATCH 20/31] fix:[TS-3347]set ver to first version if version stored is smaller than first version in wal when subscribe db --- include/libs/executor/executor.h | 2 ++ source/dnode/vnode/src/tq/tqUtil.c | 1 + source/libs/executor/src/executor.c | 15 +++++++++------ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 1fb00e743f..b7e6c42e3b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 133c51a8dc..dba363122a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { + verifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5fc079b7c1..1c87619e84 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1083,12 +1091,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - // let's seek to the next version in wal file - int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } - + verifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; From 6ceda6836584c94267823179eaba27b88287ff6b Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Sat, 6 May 2023 17:42:38 +0800 Subject: [PATCH 21/31] opt selectivity function --- source/libs/executor/src/executil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c51dc39b5b..3160925abc 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1486,7 +1486,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; - if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { + if (strcmp(pName, "_select_value") == 0) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { p = &pCtx[i]; From 13f3ca42267f42f5bda1952c2462d20ec063fea5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 18:47:29 +0800 Subject: [PATCH 22/31] fix(stream): add input queue size limitation. --- source/libs/stream/src/stream.c | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 047000054f..9a30bc7727 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -17,6 +17,7 @@ #include "ttimer.h" #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200) int32_t streamInit() { int8_t old; @@ -295,13 +296,17 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, - pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, total); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, + pSubmitBlock->submit.ver, numOfBlocks, size); + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); streamDataSubmitDestroy(pSubmitBlock); return -1; } @@ -309,13 +314,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || + (numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); From 9392c03b7f0756f8cfe72487237d8bde76171b3e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 19:05:55 +0800 Subject: [PATCH 23/31] fix(stream): add some logs. --- source/dnode/vnode/src/inc/tq.h | 6 +- source/dnode/vnode/src/sma/smaTimeRange.c | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 82 +++++++++-------------- 4 files changed, 36 insertions(+), 56 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 30b2fb74ca..d6e4b86097 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -167,9 +167,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink -int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, - SBatchDeleteReq* deleteReq); -void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, + const char* pIdStr); +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t fVer); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 6a4bddc991..3542ea9ffb 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -250,7 +250,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, ""); continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 29daeb1eef..8d9d5c5fa4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -637,7 +637,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; + pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; int32_t ver1 = 1; SMetaInfo info = {0}; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3cb691cfb5..9373f23c02 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,23 +17,24 @@ #include "tmsg.h" #include "tq.h" -int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, - SBatchDeleteReq* deleteReq) { - int32_t totRow = pDataBlock->info.rows; +int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, + const char* pIdStr) { + int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); - tqDebug("stream delete msg: row %d", totRow); + tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); - for (int32_t row = 0; row < totRow; row++) { - int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row); - int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row); + for (int32_t row = 0; row < totalRows; row++) { + int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row); + int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); + char* name; void* varTbName = NULL; - if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) { + if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { varTbName = colDataGetVarData(pTbNameCol, row); } @@ -43,31 +44,17 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl } else { name = buildCtbNameByGroupId(stbFullName, groupId); } - tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64, - pVnode->config.vgId, groupId, name, startTs, endTs); -#if 0 - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, name) < 0) { - metaReaderClear(&mr); - tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); - taosMemoryFree(name); - continue; - } - int64_t uid = mr.me.uid; - metaReaderClear(&mr); - taosMemoryFree(name); -#endif - SSingleDeleteReq req = { - .startTs = startTs, - .endTs = endTs, - }; + tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, + pIdStr, groupId, name, skey, ekey); + + SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); taosMemoryFree(name); - /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/ + taosArrayPush(deleteReq->deleteReqs, &req); } + return 0; } @@ -108,12 +95,7 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tlen = 0; encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); - SRpcMsg msg = { - .msgType = TDMT_VND_CREATE_TABLE, - .pCont = buf, - .contLen = tlen, - }; - + SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqError("failed to put into write-queue since %s", terrstr()); } @@ -121,13 +103,12 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } -void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; int64_t suid = pTask->tbSink.stbUid; char* stbFullName = pTask->tbSink.stbFullName; STSchema* pTSchema = pTask->tbSink.pTSchema; - /*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/ int32_t blockSz = taosArrayGetSize(pBlocks); @@ -141,11 +122,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); int32_t rows = pDataBlock->info.rows; + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - SBatchDeleteReq deleteReq = {0}; - deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - deleteReq.suid = suid; - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); + SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; + + tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { taosArrayDestroy(deleteReq.deleteReqs); continue; @@ -154,10 +135,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t len; int32_t code; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); + if (code != TSDB_CODE_SUCCESS) { + qError("s-task:%s failed to encode delete request", pTask->id.idStr); } + SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); @@ -168,11 +149,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - SRpcMsg msg = { - .msgType = TDMT_VND_BATCH_DEL, - .pCont = serializedDeleteReq, - .contLen = len + sizeof(SMsgHead), - }; + SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqDebug("failed to put delete req into write-queue since %s", terrstr()); } @@ -182,6 +159,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* if (NULL == reqs.pArray) { goto _end; } + for (int32_t rowId = 0; rowId < rows; rowId++) { SVCreateTbReq createTbReq = {0}; SVCreateTbReq* pCreateTbReq = &createTbReq; @@ -204,11 +182,13 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tdDestroySVCreateTbReq(pCreateTbReq); goto _end; } + STagVal tagVal = { .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = (int64_t)pDataBlock->info.id.groupId, }; + taosArrayPush(tagArray, &tagVal); // set tag name @@ -271,7 +251,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -405,8 +385,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } else { void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = - (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value + // address copy, no value + SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { From a425f085040d1aab329d543c3336d939b67f897f Mon Sep 17 00:00:00 2001 From: dmchen Date: Sat, 6 May 2023 19:53:57 +0800 Subject: [PATCH 24/31] use wrong usedb map --- source/dnode/mnode/impl/src/mndUser.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 523753d7c6..2a0d753722 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER); SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER) - useDb = taosHashIterate(pUser->writeTbs, useDb); + useDb = taosHashIterate(pUser->useDbs, useDb); } SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) From c141abfbb8945f15ab70572b32cf82615a868034 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 20:03:07 +0800 Subject: [PATCH 25/31] fix(stream): set correct msg size; --- source/libs/stream/src/stream.c | 5 +++-- source/libs/stream/src/streamData.c | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9a30bc7727..c15e014993 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -305,8 +305,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, - STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(size:%d, num:%.2fMiB) abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, + numOfBlocks, size); streamDataSubmitDestroy(pSubmitBlock); return -1; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3..d10928cadc 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -68,7 +68,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); + SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); if (pDataSubmit == NULL) { return NULL; } From d026e7ef16a88f7d8a3cbd87f7671a1f23be90f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 May 2023 22:55:49 +0800 Subject: [PATCH 26/31] refactor: update some logs. --- source/libs/stream/src/stream.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index c15e014993..d50e01742f 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -305,7 +305,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(size:%d, num:%.2fMiB) abort", pTask->id.idStr, + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); streamDataSubmitDestroy(pSubmitBlock); @@ -316,10 +316,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || - (numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr, - STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + size); return -1; } From 31ac1e3eaa801bbc778f1897fcd73c78dccae458 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 May 2023 00:00:55 +0800 Subject: [PATCH 27/31] fix(stream): revise the input queue capacity. --- source/libs/stream/src/stream.c | 4 ++-- source/libs/stream/src/streamData.c | 7 ++++++- source/libs/stream/src/streamExec.c | 5 ++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d50e01742f..9a39bb09dc 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -17,7 +17,7 @@ #include "ttimer.h" #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (200) +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) int32_t streamInit() { int8_t old; @@ -299,7 +299,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - qDebug("s-task:%s submit enqueue %p %p msgLen:%dB ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index d10928cadc..7fb35ad2ad 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -128,7 +128,12 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) } SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); + int32_t len = 0; + if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { + len = pSubmit->submit.msgLen; + } + + SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c1d25da86d..93ee3916ba 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -173,8 +173,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { batchCnt++; - qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz); - + qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); if (batchCnt >= batchSz) { break; } @@ -207,7 +206,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); + qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); streamDispatch(pTask); } From a7691aba9c79b5f4f5f2091f6f0671a46d60cecf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 May 2023 00:24:00 +0800 Subject: [PATCH 28/31] fix(stream): set the correct queue item size. --- source/util/src/tqueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index e2f1d7c033..767b0d7a24 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -185,7 +185,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { queue->tail = pNode; } queue->numOfItems++; - queue->memOfItems += pNode->size; + queue->memOfItems += (pNode->size + pNode->dataSize); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); From fc8cab6fe1c9ca2e00eb7c667fafeadcafa695a8 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 8 May 2023 09:53:27 +0800 Subject: [PATCH 29/31] recover setSelectValueColumnInfo --- source/libs/executor/src/executil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 3160925abc..c51dc39b5b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1486,7 +1486,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; - if (strcmp(pName, "_select_value") == 0) { + if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { p = &pCtx[i]; From 8617af2a5580d9a67b1a3e96a66c84be43ee867d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 8 May 2023 07:43:44 +0000 Subject: [PATCH 30/31] fix queue failure --- source/util/src/tqueue.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 767b0d7a24..81350dddd2 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -78,7 +78,7 @@ bool taosQueueEmpty(STaosQueue *queue) { bool empty = false; taosThreadMutexLock(&queue->mutex); - if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { + if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) { empty = true; } taosThreadMutexUnlock(&queue->mutex); @@ -162,7 +162,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { pNode->next = NULL; taosThreadMutexLock(&queue->mutex); - if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) { + if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) { code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, queue->memLimit, tstrerror(code)); @@ -208,7 +208,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; - queue->memOfItems -= pNode->size; + queue->memOfItems -= (pNode->size + pNode->dataSize); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, @@ -413,7 +413,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; // queue->numOfItems--; - queue->memOfItems -= pNode->size; + queue->memOfItems -= (pNode->size + pNode->dataSize); atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1, From b5fdaa327d9a9c3e519c932fd037488bc4616ffe Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 9 May 2023 07:00:29 +0000 Subject: [PATCH 31/31] avoid deadlock --- source/libs/stream/src/streamMeta.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index abbf016db3..28a345c766 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -198,7 +198,7 @@ int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { size_t size = taosHashGetSize(pMeta->pTasks); ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); - return (int32_t) size; + return (int32_t)size; } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { @@ -234,7 +234,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { if (ppTask) { SStreamTask* pTask = *ppTask; - taosWLockLatch(&pMeta->lock); + // taosWLockLatch(&pMeta->lock); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); @@ -242,7 +242,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); int32_t num = taosArrayGetSize(pMeta->pTaskList); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); if (*pTaskId == taskId) { taosArrayRemove(pMeta->pTaskList, i);