diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index bc204e032d..c36207e495 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -240,7 +240,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); -int32_t tdRSmaProcessExecImpl(SSma *pSma); +int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 77b18b8c02..900d29b97e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -65,6 +65,7 @@ struct SVBufPool { SVBufPool* next; SVnode* pVnode; volatile int32_t nRef; + TdThreadSpinlock lock; int64_t size; uint8_t* ptr; SVBufPoolNode* pTail; diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 807c033489..101fca3346 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -297,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { } /** - * @brief Rsma async commit implementation + * @brief Rsma async commit implementation(only do some necessary light weighted task) * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) - * 3) * * @param pSma * @return int32_t @@ -334,12 +333,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } - // step 3: consume the SubmitReq in buffer - if (tdRSmaProcessExecImpl(pSma) < 0) { - return TSDB_CODE_FAILED; - } - - // step 4: swap rsmaInfoHash and iRsmaInfoHash + // step 3: swap queue/qall and iQueue/iQal // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); @@ -379,9 +373,32 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } - // perform persist task for qTaskInfo SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); - tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); + + // step 1: consume the SubmitReq in buffer + int32_t nLoops = 0; + smaDebug("vgId:%d start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + while (pRSmaStat->execStat == 1) { + taosMsleep(15); + if ((++nLoops & 63) == 0) { + smaWarn("vgId:%d 1s waited for rsma exec stat = 0, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + sched_yield(); + } + } + pRSmaStat->execStat = 1; + smaDebug("vgId:%d end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + + if (tdRSmaProcessExecImpl(pSma, 1) < 0) { + pRSmaStat->execStat = 0; + return TSDB_CODE_FAILED; + } + + // step 2: perform persist task for qTaskInfo operator + if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { + pRSmaStat->execStat = 0; + return TSDB_CODE_FAILED; + } + pRSmaStat->execStat = 0; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4ffd6479f5..de4b7dd808 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,9 +15,9 @@ #include "sma.h" -#define RSMA_QTASKINFO_BUFSIZE 32768 +#define RSMA_QTASKINFO_BUFSIZE (32768) #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -#define RSMA_QTASKEXEC_BUFSIZ 10 * 1048576 // 8 MB +#define RSMA_QTASKEXEC_BUFSIZ (1048576) SSmaMgmt smaMgmt = { .inited = 0, @@ -35,7 +35,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUi static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, - tb_uid_t suid, int8_t level); + int8_t type, int8_t level); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, @@ -600,17 +600,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { return 0; } -static void tdDestroySDataBlockArray(SArray *pArray) { - // TODO -#if 0 - for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SSDataBlock *pDataBlock = taosArrayGet(pArray, i); - blockDestroyInner(pDataBlock); - } -#endif - taosArrayDestroy(pArray); -} - /** * @brief retention of rsma1/rsma2 * @@ -668,8 +657,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm } else { smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } - -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); @@ -731,11 +719,45 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu taosWriteQitem(pInfo->queue, qItem); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - atomic_fetch_add_64(&pRSmaStat->qBufSize, taosQueueMemorySize(pInfo->queue)); + int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen); + + // smoothing consume + int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZ; + if (n > 1) { + if (n > 10) { + n = 10; + } + taosMsleep(n << 4); + if (n > 2) { + smaWarn("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + } else { + smaDebug("vgId:%d pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + } + } return TSDB_CODE_SUCCESS; } +static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) { + SSubmitMsgIter msgIter = {0}; + SSubmitBlkIter blkIter = {0}; + STSRow *row = NULL; + if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1; + while (true) { + SSubmitBlk *pBlock = NULL; + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); + while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { + smaDebug("vgId:%d numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64, + SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts); + } + } + return 0; +} + /** * @brief sync mode * @@ -744,32 +766,42 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu * @param msgSize * @param inputType * @param pInfo - * @param suid + * @param type * @param level * @return int32_t */ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, - tb_uid_t suid, int8_t level) { + int8_t type, int8_t level) { int32_t idx = level - 1; - if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) { - smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); + + void *qTaskInfo = (type == 0) ? RSMA_INFO_QTASK(pInfo, idx) : RSMA_INFO_IQTASK(pInfo, idx); + if (!qTaskInfo) { + smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, + pInfo->suid); return TSDB_CODE_SUCCESS; } if (!pInfo->pTSchema) { - smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); + smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); return TSDB_CODE_FAILED; } smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, - RSMA_INFO_QTASK(pInfo, idx), suid); + RSMA_INFO_QTASK(pInfo, idx), pInfo->suid); - if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, msgSize, inputType) < 0) { // INPUT__DATA_SUBMIT +#if 0 + for (int32_t i = 0; i < msgSize; ++i) { + SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *)); + smaDebug("vgId:%d [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version); + tdRsmaPrintSubmitReq(pSma, pReq); + } +#endif + if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid); + tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); if (smaMgmt.tmrHandle) { @@ -858,6 +890,8 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; } + } else { + ASSERT(0); } tdReleaseRSmaInfo(pSma, pRSmaInfo); @@ -896,9 +930,18 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { int64_t bufSize = atomic_load_64(&pRsmaStat->qBufSize); if ((pRsmaStat->execStat == 1) || (bufSize < RSMA_QTASKEXEC_BUFSIZ)) { + if (bufSize > RSMA_QTASKEXEC_BUFSIZ) { + smaDebug("vgId:%d bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), + bufSize); + } else { + smaDebug("vgId:%d bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize, + RSMA_QTASKEXEC_BUFSIZ); + } return TSDB_CODE_SUCCESS; } + smaDebug("vgId:%d bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); + pRsmaStat->execStat = 1; SRSmaExecMsg fetchMsg; @@ -1633,6 +1676,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { goto _err; } +#if 0 pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); if (!pInfo) { if (terrno == TSDB_CODE_SUCCESS) { @@ -1657,12 +1701,13 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { tdCleanupStreamInputDataBlock(taskInfo); tdReleaseRSmaInfo(pSma, pInfo); +#endif tDecoderClear(&decoder); smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, req.level); return TSDB_CODE_SUCCESS; _err: - tdReleaseRSmaInfo(pSma, pInfo); + // tdReleaseRSmaInfo(pSma, pInfo); tDecoderClear(&decoder); smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; @@ -1674,7 +1719,14 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) { } } -int32_t tdRSmaProcessExecImpl(SSma *pSma) { +/** + * @brief + * + * @param pSma + * @param type 0 triggered when buffer overflow, 1 triggered by commit + * @return int32_t + */ +int32_t tdRSmaProcessExecImpl(SSma *pSma, int8_t type) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; @@ -1686,12 +1738,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { goto _err; } - taosRLockLatch(SMA_ENV_LOCK(pEnv)); - if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) { + if (type == 0) { + taosRLockLatch(SMA_ENV_LOCK(pEnv)); + if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZ) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return TSDB_CODE_SUCCESS; + } taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return TSDB_CODE_SUCCESS; } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1706,18 +1760,33 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { SRSmaExecQItem qItem = {0}; taosWLockLatch(SMA_ENV_LOCK(pEnv)); void *pIter = taosHashIterate(infoHash, NULL); - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->queue)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); - qItem.qall = &pInfo->qall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); + if (type == 0) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->queue)) { + taosReadAllQitems(pInfo->queue, pInfo->qall); + qItem.qall = &pInfo->qall; + qItem.pRSmaInfo = pIter; + taosArrayPush(pSubmitQArr, &qItem); + } + ASSERT(taosQueueItemSize(pInfo->queue) == 0); + pIter = taosHashIterate(infoHash, pIter); } - ASSERT(taosQueueItemSize(pInfo->queue) == 0); - pIter = taosHashIterate(infoHash, pIter); + } else if (type == 1) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->iQueue)) { + taosReadAllQitems(pInfo->iQueue, pInfo->iQall); + qItem.qall = &pInfo->iQall; + qItem.pRSmaInfo = pIter; + taosArrayPush(pSubmitQArr, &qItem); + } + ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); + pIter = taosHashIterate(infoHash, pIter); + } + } else { + ASSERT(0); } - atomic_store_64(&pRSmaStat->qBufSize, 0); taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); @@ -1739,12 +1808,16 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma) { int32_t size = taosArrayGetSize(pSubmitArr); if (size > 0) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, pInfo->suid, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + if (type == 0 || type == 1) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } } + } else { + ASSERT(0); } tdFreeRSmaSubmitItems(pSubmitArr); taosArrayClear(pSubmitArr); @@ -1775,16 +1848,17 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) { terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; goto _err; } - - if (tdRSmaProcessExecImpl(pSma) < 0) { + smaDebug("vgId:%d, begin to process rsma exec msg by thread:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + if (tdRSmaProcessExecImpl(pSma, 0) < 0) { goto _err; } pRsmaStat->execStat = 0; - smaWarn("vgId:%d, success to process rsma exec msg", SMA_VID(pSma)); + smaDebug("vgId:%d, success to process rsma exec msg by thead:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); return TSDB_CODE_SUCCESS; _err: pRsmaStat->execStat = 0; - smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); + smaError("vgId:%d, failed to process rsma fetch msg by thread:%p since %s", SMA_VID(pSma), + (void *)taosGetSelfPthreadId(), terrstr()); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 0623b3bd10..5a22114ab4 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { SVBufPoolNode *pNode; void *p; - + taosThreadSpinLock(&pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node p = pPool->ptr; @@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pNode = taosMemoryMalloc(sizeof(*pNode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosThreadSpinUnlock(&pPool->lock); return NULL; } @@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - + taosThreadSpinUnlock(&pPool->lock); return p; } @@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) return -1; } + if (taosThreadSpinInit(&pPool->lock, 0) != 0) { + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pPool->next = NULL; pPool->pVnode = pVnode; pPool->nRef = 0; @@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); + taosThreadSpinDestroy(&pPool->lock); taosMemoryFree(pPool); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index c8dc07af0a..2f5169a0ec 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,9 +220,6 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - pVnode->state.commitTerm = pVnode->state.applyTerm; // save info @@ -248,7 +245,7 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - smaAsyncCommit(pVnode->pSma); + smaAsyncCommit(pVnode->pSma); // would write L2/L3 data into BufPool if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); @@ -268,6 +265,9 @@ int vnodeCommit(SVnode *pVnode) { return -1; } } + + vnodeBufPoolUnRef(pVnode->inUse); + pVnode->inUse = NULL; if (tqCommit(pVnode->pTq) < 0) { ASSERT(0); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 32d564d502..adb79fc6ad 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1174,6 +1174,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; + bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; @@ -1183,7 +1184,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); isClosed = isCloseWindow(&win, &pInfo->twAggSup); } - bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); + // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) &&