diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 1fb00e743f..b7e6c42e3b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ec9eb54174..95b2f94f3f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -278,6 +278,7 @@ typedef struct SCheckpointInfo { typedef struct SStreamStatus { int8_t taskStatus; int8_t schedStatus; + int8_t keepTaskStatus; } SStreamStatus; struct SStreamTask { @@ -542,6 +543,7 @@ int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); +bool streamTaskShouldPause(const SStreamStatus* pStatus); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index d3e2bbb1be..09198aa038 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -138,6 +138,8 @@ typedef struct { int8_t enableRef; } SWalFilterCond; +typedef struct SWalReader SWalReader; + // todo hide this struct typedef struct SWalReader { SWal *pWal; @@ -193,9 +195,10 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); +int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); +int64_t walReaderGetValidFirstVer(const SWalReader* pReader); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 523753d7c6..2a0d753722 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER); SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER) - useDb = taosHashIterate(pUser->writeTbs, useDb); + useDb = taosHashIterate(pUser->useDbs, useDb); } SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 30b2fb74ca..d6e4b86097 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -167,9 +167,9 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink -int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, - SBatchDeleteReq* deleteReq); -void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, + const char* pIdStr); +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t fVer); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 6a4bddc991..3542ea9ffb 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -250,7 +250,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * if (pDataBlock->info.type == STREAM_DELETE_RESULT) { pDeleteReq->suid = suid; pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq); + tqBuildDeleteReq(stbFullName, pDataBlock, pDeleteReq, ""); continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 92432d1ca2..bbdfd715b5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -169,7 +169,7 @@ void tqNotifyClose(STQ* pTq) { int64_t st = taosGetTimestampMs(); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); int64_t el = taosGetTimestampMs() - st; - tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); + tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el); } taosWUnLockLatch(&pTq->pStreamMeta->lock); @@ -637,7 +637,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; + pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; int32_t ver1 = 1; SMetaInfo info = {0}; @@ -821,17 +821,19 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr); + tqDebug("s-task:%s start non-blocking recover stage(step 1) scan", pTask->id.idStr); int64_t st = taosGetTimestampMs(); streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); // build msg to launch next step SStreamRecoverStep2Req req; @@ -842,7 +844,6 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { return 0; } @@ -852,13 +853,14 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { + tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr); return -1; } memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage", pTask->id.idStr); + tqDebug("s-task:%s step 1 finished, send msg to start blocking recover stage(step 2)", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -870,12 +872,16 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t int32_t code = 0; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { return -1; } // do recovery step 2 + int64_t st = taosGetTimestampMs(); + tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -895,12 +901,16 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } // set status normal + tqDebug("s-task:%s blocking stage completed, set the status to be normal", pTask->id.idStr); code = streamSetStatusNormal(pTask); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } + double el = (taosGetTimestampMs() - st)/ 1000.0; + tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); + // dispatch recover finish req to all related downstream task code = streamDispatchRecoverFinishReq(pTask); if (code < 0) { @@ -912,7 +922,6 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t streamMetaSaveTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; } @@ -1178,6 +1187,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); streamMetaReleaseTask(pTq->pStreamMeta, pTask); } @@ -1188,7 +1198,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { - streamSetStatusNormal(pTask); + atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version if (pReq->igUntreated) { // discard all the data when the stream task is suspended. @@ -1325,12 +1335,12 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed restore stream tasks, code:%s", vgId, terrstr()); + tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } - tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = WAL_READ_TASKS_ID; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 64062bac20..e13c0288be 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -295,7 +295,7 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { - if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { return -1; } tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c051650436..800bcf2469 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -105,20 +105,20 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr); + tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || - status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) { + status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; } if (tInputQueueIsFull(pTask)) { - tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr); + tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -126,17 +126,37 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit - streamMetaReleaseTask(pStreamMeta, pTask); - continue; + int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); + if (pTask->chkInfo.currentVer < firstVer) { + pTask->chkInfo.currentVer = firstVer; + tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); + + // todo need retry if failed + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } else { + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer != pTask->chkInfo.currentVer) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } } - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); - SPackedData packData = {0}; - code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3cb691cfb5..9373f23c02 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,23 +17,24 @@ #include "tmsg.h" #include "tq.h" -int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock, - SBatchDeleteReq* deleteReq) { - int32_t totRow = pDataBlock->info.rows; +int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, + const char* pIdStr) { + int32_t totalRows = pDataBlock->info.rows; SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); - tqDebug("stream delete msg: row %d", totRow); + tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); - for (int32_t row = 0; row < totRow; row++) { - int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row); - int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row); + for (int32_t row = 0; row < totalRows; row++) { + int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row); + int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); + char* name; void* varTbName = NULL; - if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) { + if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) { varTbName = colDataGetVarData(pTbNameCol, row); } @@ -43,31 +44,17 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl } else { name = buildCtbNameByGroupId(stbFullName, groupId); } - tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64, - pVnode->config.vgId, groupId, name, startTs, endTs); -#if 0 - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, name) < 0) { - metaReaderClear(&mr); - tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); - taosMemoryFree(name); - continue; - } - int64_t uid = mr.me.uid; - metaReaderClear(&mr); - taosMemoryFree(name); -#endif - SSingleDeleteReq req = { - .startTs = startTs, - .endTs = endTs, - }; + tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, + pIdStr, groupId, name, skey, ekey); + + SSingleDeleteReq req = { .startTs = skey, .endTs = ekey}; strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN - 1); taosMemoryFree(name); - /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/ + taosArrayPush(deleteReq->deleteReqs, &req); } + return 0; } @@ -108,12 +95,7 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tlen = 0; encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); - SRpcMsg msg = { - .msgType = TDMT_VND_CREATE_TABLE, - .pCont = buf, - .contLen = tlen, - }; - + SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqError("failed to put into write-queue since %s", terrstr()); } @@ -121,13 +103,12 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { return TSDB_CODE_SUCCESS; } -void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { +void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; int64_t suid = pTask->tbSink.stbUid; char* stbFullName = pTask->tbSink.stbFullName; STSchema* pTSchema = pTask->tbSink.pTSchema; - /*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/ int32_t blockSz = taosArrayGetSize(pBlocks); @@ -141,11 +122,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); int32_t rows = pDataBlock->info.rows; + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - SBatchDeleteReq deleteReq = {0}; - deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - deleteReq.suid = suid; - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); + SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; + + tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr); if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { taosArrayDestroy(deleteReq.deleteReqs); continue; @@ -154,10 +135,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t len; int32_t code; tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); + if (code != TSDB_CODE_SUCCESS) { + qError("s-task:%s failed to encode delete request", pTask->id.idStr); } + SEncoder encoder; void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); @@ -168,11 +149,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - SRpcMsg msg = { - .msgType = TDMT_VND_BATCH_DEL, - .pCont = serializedDeleteReq, - .contLen = len + sizeof(SMsgHead), - }; + SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { tqDebug("failed to put delete req into write-queue since %s", terrstr()); } @@ -182,6 +159,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* if (NULL == reqs.pArray) { goto _end; } + for (int32_t rowId = 0; rowId < rows; rowId++) { SVCreateTbReq createTbReq = {0}; SVCreateTbReq* pCreateTbReq = &createTbReq; @@ -204,11 +182,13 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tdDestroySVCreateTbReq(pCreateTbReq); goto _end; } + STagVal tagVal = { .cid = pTSchema->numOfCols + 1, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = (int64_t)pDataBlock->info.id.groupId, }; + taosArrayPush(tagArray, &tagVal); // set tag name @@ -271,7 +251,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -405,8 +385,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } else { void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = - (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value + // address copy, no value + SValue sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d83345ad59..e3bde14b6d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { + verifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d0a0ea7947..b7ec7bb5c5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3164,6 +3164,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + if (pReader->code != TSDB_CODE_SUCCESS) { + return pReader->code; + } + pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); if (pScanInfo == NULL) { return terrno; @@ -5078,6 +5082,11 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; int32_t code = TSDB_CODE_SUCCESS; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); + + if (pReader->code != TSDB_CODE_SUCCESS) { + return NULL; + } + STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { return NULL; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2d5830e4a9..8bbbd3524d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1084,12 +1092,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - // let's seek to the next version in wal file - int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } - + verifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 56ce0ac0c0..27f9a1ac28 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2251,8 +2251,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; - qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, - pEnryInfo->isNullRes, pEnryInfo->numOfRes); + if (pCtx[j].fpSet.finalize) { int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code1)) { @@ -2274,6 +2273,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pBlock->info.rows += pRow->numOfRows; } + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1fa3dc3354..9a39bb09dc 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,7 +16,8 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 102400 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) int32_t streamInit() { int8_t old; @@ -52,7 +53,7 @@ void streamCleanUp() { void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); return; } @@ -216,7 +217,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId); // todo add the input queue buffer limitation @@ -253,6 +254,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ASSERT(0); return 0; } + // continue dispatch streamDispatch(pTask); return 0; @@ -294,13 +296,18 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, - pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, - pSubmitBlock->submit.ver, total); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, + pSubmitBlock->submit.ver, numOfBlocks, size); + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, + numOfBlocks, size); streamDataSubmitDestroy(pSubmitBlock); return -1; } @@ -308,13 +315,18 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { - qError("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + size); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ae616260f3..7fb35ad2ad 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -68,7 +68,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); + SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); if (pDataSubmit == NULL) { return NULL; } @@ -128,7 +128,12 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) } SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); + int32_t len = 0; + if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { + len = pSubmit->submit.msgLen; + } + + SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d12eca7ce3..a757d39d3f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -261,6 +261,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -270,8 +271,12 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) { - goto FAIL; + if (buf) { + rpcFreeCont(buf); + } + return code; } + tEncoderClear(&encoder); msg.contLen = tlen + sizeof(SMsgHead); @@ -280,13 +285,10 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - - qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId); + qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr, + pReq->taskId, vgId); return 0; -FAIL: - if (buf) rpcFreeCont(buf); - return code; } int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { @@ -407,13 +409,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; - qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, + qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, pTask->selfChildId, blockNum, downstreamTaskId, vgId); if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { goto FAIL_FIXED_DISPATCH; } + code = 0; + FAIL_FIXED_DISPATCH: taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); @@ -427,6 +431,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t vgSz = taosArrayGetSize(vgInfo); SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -442,6 +447,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { goto FAIL_SHUFFLE_DISPATCH; } + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); pReqs[i].taskId = pVgInfo->taskId; } @@ -468,43 +474,52 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } } + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, + blockNum, vgSz); + for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { - // send SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, + pReqs[i].blockNum, pVgInfo->vgId); + if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } } + code = 0; + FAIL_SHUFFLE_DISPATCH: - if (pReqs) { - for (int32_t i = 0; i < vgSz; i++) { - taosArrayDestroyP(pReqs[i].data, taosMemoryFree); - taosArrayDestroy(pReqs[i].dataLen); - } - taosMemoryFree(pReqs); + for (int32_t i = 0; i < vgSz; i++) { + taosArrayDestroyP(pReqs[i].data, taosMemoryFree); + taosArrayDestroy(pReqs[i].dataLen); } - return code; + taosMemoryFree(pReqs); } - return 0; + return code; } int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr, - taosQueueItemSize(pTask->outputQueue->queue)); + + int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); + if (numOfElems > 0) { + qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, + numOfElems); + } int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { + qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr); return 0; } SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); if (pBlock == NULL) { - qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr); + qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } @@ -516,10 +531,8 @@ int32_t streamDispatch(SStreamTask* pTask) { code = -1; streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - goto FREE; } -FREE: taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); return code; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fda2603432..78a1f3e967 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,13 +15,19 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 20480 +#define MAX_STREAM_EXEC_BATCH_NUM 10240 +#define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } +bool streamTaskShouldPause(const SStreamStatus* pStatus) { + int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + return (status == TASK_STATUS__PAUSE); +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -141,7 +147,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { taosArrayDestroy(pRes); return 0; } @@ -167,8 +173,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { batchCnt++; - qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz); - + qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz); if (batchCnt >= batchSz) { break; } @@ -201,7 +206,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); + qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); streamDispatch(pTask); } @@ -255,6 +260,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 1; void* pInput = NULL; + int16_t times = 0; // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); @@ -262,6 +268,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { + times++; + taosMsleep(1); + qDebug("===stream===try agian batchSize:%d", batchSize); + continue; + } + break; } @@ -280,7 +293,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchSize++; pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { + if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { break; } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f2d5b95f35..b278998e80 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -54,7 +54,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); + pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasks == NULL) { goto _err; } @@ -251,11 +251,13 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { + taosWLockLatch(&pMeta->lock); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; - taosWLockLatch(&pMeta->lock); + // taosWLockLatch(&pMeta->lock); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); @@ -273,8 +275,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } streamMetaReleaseTask(pMeta, pTask); - taosWUnLockLatch(&pMeta->lock); } + + taosWUnLockLatch(&pMeta->lock); } int32_t streamMetaBegin(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0324580885..1fd7f77c55 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -225,7 +225,7 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; - qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); + qDebug("s-task:%s recover step2 (blocking stage) started", pTask->id.idStr); if (qStreamSourceRecoverStep2(exec, ver) < 0) { } @@ -233,12 +233,13 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { } int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { - SStreamRecoverFinishReq req = { - .streamId = pTask->id.streamId, - .childId = pTask->selfChildId, - }; + SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId }; + // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus); + req.taskId = pTask->fixedEpDispatcher.taskId; streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 437367c4b3..fee2f2ce58 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -25,7 +25,7 @@ #include "tref.h" #include "ttimer.h" -#define MAX_TABLE_NAME_NUM 100000 +#define MAX_TABLE_NAME_NUM 2000000 int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 4cc43a19a0..f7ada8be84 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -101,6 +101,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } +int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); } static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; @@ -183,6 +184,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } + if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner if (walReadChangeFile(pReader, pRet->firstVer) < 0) { @@ -202,7 +204,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return 0; } -int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { +int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); @@ -232,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); if (pRead->curVersion != fetchVer) { - if (walReadSeekVer(pRead, fetchVer) < 0) { + if (walReaderSeekVer(pRead, fetchVer) < 0) { return -1; } seeked = true; @@ -336,7 +338,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); + code = walReaderSeekVer(pRead, ver); if (code < 0) { // pRead->curVersion = ver; // pRead->curInvalid = 1; @@ -471,7 +473,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { taosThreadMutexLock(&pReader->mutex); if (pReader->curVersion != ver) { - if (walReadSeekVer(pReader, ver) < 0) { + if (walReaderSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index e2f1d7c033..81350dddd2 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -78,7 +78,7 @@ bool taosQueueEmpty(STaosQueue *queue) { bool empty = false; taosThreadMutexLock(&queue->mutex); - if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { + if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) { empty = true; } taosThreadMutexUnlock(&queue->mutex); @@ -162,7 +162,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { pNode->next = NULL; taosThreadMutexLock(&queue->mutex); - if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) { + if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) { code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, queue->memLimit, tstrerror(code)); @@ -185,7 +185,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { queue->tail = pNode; } queue->numOfItems++; - queue->memOfItems += pNode->size; + queue->memOfItems += (pNode->size + pNode->dataSize); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); @@ -208,7 +208,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; - queue->memOfItems -= pNode->size; + queue->memOfItems -= (pNode->size + pNode->dataSize); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, @@ -413,7 +413,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; // queue->numOfItems--; - queue->memOfItems -= pNode->size; + queue->memOfItems -= (pNode->size + pNode->dataSize); atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1, diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index 32dbe826cc..64b17125aa 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -58,16 +58,16 @@ if $data23 != 0 then return -1 endi -print ========== stop dnode2 -system sh/exec.sh -n dnode2 -s stop -x SIGKILL +#print ========== stop dnode2 +#system sh/exec.sh -n dnode2 -s stop -x SIGKILL -sleep 1000 -print =============== drop database -sql_error drop database d1 +#sleep 1000 +#print =============== drop database +sql drop database d1 -print ========== start dnode2 -system sh/exec.sh -n dnode2 -s start -sleep 1000 +#print ========== start dnode2 +#system sh/exec.sh -n dnode2 -s start +#sleep 1000 print =============== re-create database $x = 0 diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index e1e2a67a22..fa7be19310 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -190,6 +190,46 @@ if $data11 != 1 then goto loop2 endi +sql pause stream streams2; + +sql insert into t1 values(1648791223003,2,2,3,1.1); +sql insert into t1 values(1648791233003,2,2,3,1.1); + +sql resume stream IGNORE UNTREATED streams2; + +$loop_count = 0 + +loop3: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 4 select * from streamt2; +sql select * from streamt2; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop3 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop3 +endi + +if $data11 != 1 then + print =====data01=$data01 + goto loop3 +endi + + print ===== step 2 over