diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 93398d337d..7564d91330 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -231,10 +231,9 @@ void destroyTscObj(void *pObj) { tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); - int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - if (0 == connNum) { - destroyAppInst(pTscObj->pAppInfo); - } + // In any cases, we should not free app inst here. Or an race condition rises. + /*int64_t connNum = */atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFree(pTscObj); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5dd9293118..a4581f5472 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -214,6 +214,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); +static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -2477,8 +2478,39 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { - setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); - break; + + int32_t nextIndex = -1; + SBlockIndex bIndex = {0}; + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex); + if (!hasNeighbor) { // do nothing + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); + break; + } + + if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) { // load next block + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; + + // 1. find the next neighbor block in the scan block list + SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex}; + int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb); + + // 2. remove it from the scan block list + setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step); + + // 3. load the neighbor block, and set it to be the currently accessed file data block + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid); + if (code != TSDB_CODE_SUCCESS) { + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); + break; + } + + // 4. check the data values + initBlockDumpInfo(pReader, pBlockIter); + } else { + setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); + break; + } } } } @@ -2888,7 +2920,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } // set the correct start position in case of the first/last file block, according to the query time window -static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { +void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { SDataBlk* pBlock = getCurrentBlock(pBlockIter); SReaderStatus* pStatus = &pReader->status; @@ -4055,6 +4087,31 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64 } } + +static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, + SColumnDataAgg* pTsAgg) { + // do fill all null column value SMA info + int32_t i = 0, j = 0; + int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg); + taosArrayInsert(pSup->pColAgg, 0, pTsAgg); + + while (j < numOfCols && i < size) { + SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); + if (pAgg->colId == pSup->colId[j]) { + i += 1; + j += 1; + } else if (pAgg->colId < pSup->colId[j]) { + i += 1; + } else if (pSup->colId[j] < pAgg->colId) { + if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; + taosArrayInsert(pSup->pColAgg, i ,&nullColAgg); + } + j += 1; + } + } +} + int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockSMA, bool* allHave) { int32_t code = 0; *allHave = false; @@ -4110,6 +4167,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); } + // do fill all null column value SMA info + doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg); + + i = 0, j = 0; while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); if (pAgg->colId == pSup->colId[j]) { @@ -4119,15 +4180,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - if (pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID) { - pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; - } else { - // all date in this block are null - SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = pBlock->nRow}; - taosArrayPush(pSup->pColAgg, &nullColAgg); - - pResBlock->pBlockAgg[pSup->slotId[j]] = taosArrayGetLast(pSup->pColAgg); - } + ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); + pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; j += 1; } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bd22e864cd..043cc396b5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1580,6 +1580,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { if (pOperator->blocking) { ASSERT(0); + return 0; } else { return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 04f5f4ecfe..a0f11e9a47 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3032,8 +3032,10 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* if (pSupp->dbNameSlotId != -1) { ASSERT(strlen(dbName)); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId); - char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - strncpy(varDataVal(varDbName), dbName, strlen(dbName)); + + char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN); + varDataSetLen(varDbName, strlen(dbName)); colDataAppend(colInfoData, 0, varDbName, false); } @@ -3042,7 +3044,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId); if (strlen(stbName) != 0) { char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - strncpy(varDataVal(varStbName), stbName, strlen(stbName)); + strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN); varDataSetLen(varStbName, strlen(stbName)); colDataAppend(colInfoData, 0, varStbName, false); } else {