diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 59a9f32642..e1578b37f1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -171,11 +171,11 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - if (pRsp->blockNum > 0) { - ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); - } else { - ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); - } + /*if (pRsp->blockNum > 0) {*/ + /*ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);*/ + /*} else {*/ + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + /*}*/ } int32_t len = 0; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index bcdac1941a..4083a1a0ae 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -245,7 +245,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); continue; } - if (pPushEntry->dataRsp.reqOffset.version > ver) { + if (pPushEntry->dataRsp.reqOffset.version >= ver) { tqDebug("vgId:%d push entry req version %ld, while push version %ld, skip", pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver); continue; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f9e47f92fe..d6e0de0eb1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -479,6 +479,7 @@ typedef struct SStreamScanInfo { SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; SExprSupp tbnameCalSup; + SExprSupp tagCalSup; int32_t primaryTsIndex; // primary time stamp slot id SReadHandle readHandle; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. @@ -775,6 +776,7 @@ typedef struct SStreamPartitionOperatorInfo { SPartitionBySupporter partitionSup; SExprSupp scalarSup; SExprSupp tbnameCalSup; + SExprSupp tagCalSup; SHashObj* pPartitions; void* parIte; SSDataBlock* pInputDataBlock; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f3ae652987..b0bb9e42b9 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -991,6 +991,8 @@ static void destroyStreamPartitionOperatorInfo(void* param) { taosMemoryFree(pInfo->partitionSup.keyBuf); cleanupExprSupp(&pInfo->scalarSup); + cleanupExprSupp(&pInfo->tbnameCalSup); + cleanupExprSupp(&pInfo->tagCalSup); blockDataDestroy(pInfo->pDelRes); taosMemoryFreeClear(param); } @@ -1037,6 +1039,19 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } } + if (pPartNode->pTags != NULL) { + int32_t numOfTags; + SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags); + if (pTagExpr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + } + int32_t keyLen = 0; code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf, pInfo->partitionSup.pGroupCols); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c20bb95811..7ede06e6ca 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1311,10 +1311,15 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* blockDataEnsureCapacity(pResBlock, 1); - projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL); + projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, pTagCalSup->numOfExprs, NULL); ASSERT(pResBlock->info.rows == 1); // build tagArray + /*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/ + /*STagVal tagVal = {*/ + /*.cid = 0,*/ + /*.type = 0,*/ + /*};*/ // build STag // set STag @@ -2110,6 +2115,9 @@ static void destroyStreamScanOperatorInfo(void* param) { taosMemoryFree(pStreamScan->pPseudoExpr); } + cleanupExprSupp(&pStreamScan->tbnameCalSup); + cleanupExprSupp(&pStreamScan->tagCalSup); + updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); @@ -2164,6 +2172,19 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } } + if (pTableScanNode->pTags != NULL) { + int32_t numOfTags; + SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags); + if (pTagExpr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + } + pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index e49a963191..144cce2cd0 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -351,7 +351,10 @@ int walCheckAndRepairIdx(SWal* pWal) { taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); break; } - taosLSeekFile(pIdxFile, offset, SEEK_SET); + if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d cannot seek offset %ld when repair idx since %s", pWal->cfg.vgId, offset, terrstr()); + } int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -636,6 +639,5 @@ int walRemoveMeta(SWal* pWal) { if (metaVer == -1) return 0; char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer, fnameStr); - taosRemoveFile(fnameStr); - return 0; + return taosRemoveFile(fnameStr); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c4f1a81eeb..b2cd4fac11 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -514,7 +514,6 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { pReader->pHead->head.version, ver); pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); taosThreadMutexUnlock(&pReader->mutex); return -1; } @@ -528,7 +527,6 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum); pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); taosThreadMutexUnlock(&pReader->mutex); return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 628b432446..e7161079d9 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -42,10 +42,20 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i); char fnameStr[WAL_FILE_LEN]; walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); - taosRemoveFile(fnameStr); + if (taosRemoveFile(fnameStr) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); + return -1; + } + wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr); walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); - taosRemoveFile(fnameStr); + if (taosRemoveFile(fnameStr) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr()); + return -1; + } + wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr); } } walRemoveMeta(pWal);