diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 20c4fa64c4..fd56b5f5ae 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1332,6 +1332,8 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock); + uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock); taosMemoryFreeClear(pBlock->info.pks[0].pData); taosMemoryFreeClear(pBlock->info.pks[1].pData); } @@ -1483,6 +1485,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SSDataBlock* pBlock = createDataBlock(); pBlock->info = pDataBlock->info; + pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; @@ -1503,11 +1506,17 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pVal->type = pDataBlock->info.pks[0].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); pVal->nData = pDataBlock->info.pks[0].nData; + memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData); - pVal = &pBlock->info.pks[1]; - pVal->type = pDataBlock->info.pks[1].type; - pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); - pVal->nData = pDataBlock->info.pks[1].nData; + SValue* p = &pBlock->info.pks[1]; + p->type = pDataBlock->info.pks[1].type; + p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + p->nData = pDataBlock->info.pks[1].nData; + memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); + uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData); + uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData); + } else { + uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock); } if (copyData) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 54d5c54788..b7f97771da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -119,7 +119,7 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { return ret > 0 ? 1 : -1; } } else { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); + return p1->pks[0].val - p2->pks[0].val; } } } @@ -179,9 +179,8 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { } if (IS_VAR_DATA_TYPE(indices[i].type)) { - tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); - pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); - pKey->pks[i].pData += pKey->pks[i].nData; + tdata += tGetU32v(tdata, &pKey->pks[i].nData); + memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); } else { memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } @@ -396,14 +395,18 @@ _err: return code; } -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) { +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp) { + return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); +} + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { - clearDataBlockIterator(pIter, hasPk); + clearDataBlockIterator(pIter, needFree); } } @@ -821,18 +824,13 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S return TSDB_CODE_SUCCESS; } -// todo keep the the last returned key static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { -// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; pDumpInfo->allDumped = true; -// ASSERT(0); -// pDumpInfo->lastKey.key.ts = maxKey + step; } static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks, bool asc) { pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = numOfPks; if (pKey->numOfPKs <= 0) { return; @@ -842,7 +840,7 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; } else { uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; - pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; + pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData); memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); } } @@ -2838,11 +2836,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val; } else { - memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); - memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); + memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); + memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); - pInfo->pks[0].nData = pBlockInfo->firstPKLen; - pInfo->pks[1].nData = pBlockInfo->lastPKLen; + pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData); + pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData); } } @@ -3203,7 +3201,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); } @@ -3313,7 +3311,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -4158,7 +4156,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { @@ -4362,7 +4360,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pSupInfo->colId); tBlockDataDestroy(&pReader->status.fileBlockData); - cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0); + cleanupDataBlockIterator(&pReader->status.blockIter, shouldFreePkBuf(&pReader->suppInfo)); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (pReader->status.pTableMap != NULL) { @@ -5038,7 +5036,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0); + resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index c82363c921..ae8a6466ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -446,17 +446,17 @@ void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { pBlockInfo->firstPk.val = pFirstKey->pks[0].val; pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val; + } else { + char* p = taosMemoryCalloc(1, pFirstKey->pks[0].nData + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), pFirstKey->pks[0].pData, pFirstKey->pks[0].nData); + varDataSetLen(p, pFirstKey->pks[0].nData); + pBlockInfo->firstPk.pData = (uint8_t*)p; - pBlockInfo->firstPKLen = 0; - pBlockInfo->lastPKLen = 0; - } else { // todo handle memory alloc error, opt memory alloc perf - pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; - pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); - memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); - - pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData; - pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); - memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); + int32_t keyLen = record->lastKey.key.pks[0].nData; + p = taosMemoryCalloc(1, keyLen + VARSTR_HEADER_SIZE); + memcpy(varDataVal(p), record->lastKey.key.pks[0].pData, keyLen); + varDataSetLen(p, keyLen); + pBlockInfo->lastPk.pData = (uint8_t*)p; } } } @@ -467,21 +467,21 @@ static void freePkItem(void* pItem) { taosMemoryFreeClear(p->lastPk.pData); } -void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) { pIter->index = -1; pIter->numOfBlocks = 0; - if (hasPk) { + if (needFree) { taosArrayClearEx(pIter->blockList, freePkItem); } else { taosArrayClear(pIter->blockList); } } -void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) { +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) { pIter->index = -1; pIter->numOfBlocks = 0; - if (hasPk) { + if (needFree) { taosArrayDestroyEx(pIter->blockList, freePkItem); } else { taosArrayDestroy(pIter->blockList); @@ -492,7 +492,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; - clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0); + clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo)); pBlockIter->numOfBlocks = numOfBlocks; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 94909aabf4..581696c94a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -212,8 +212,6 @@ typedef struct SFileDataBlockInfo { uint8_t* pData; } lastPk; - int32_t firstPKLen; - int32_t lastPKLen; int64_t minVer; int64_t maxVer; int64_t blockOffset; @@ -349,8 +347,9 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); void clearRowKey(SRowKey* pKey); +bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp); void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); -void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c2032554b6..8e6f637d81 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -340,22 +340,21 @@ typedef struct STableMergeScanInfo { int32_t scanTimes; int32_t readIdx; SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - SSHashObj* mTableNumRows; // uid->num of table rows - SHashObj* mSkipTables; - int64_t mergeLimit; + SSampleExecInfo sample; // sample execution info + SSHashObj* mTableNumRows; // uid->num of table rows + SHashObj* mSkipTables; + int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool needCountEmptyTable; - bool bGroupProcessed; // the group return data means processed - bool filesetDelimited; - bool bNewFilesetEvent; - bool bNextDurationBlockEvent; - int32_t numNextDurationBlocks; - SSDataBlock* nextDurationBlocks[2]; - bool rtnNextDurationBlocks; - int32_t nextDurationBlocksIdx; - - bool bSortRowId; + bool needCountEmptyTable; + bool bGroupProcessed; // the group return data means processed + bool filesetDelimited; + bool bNewFilesetEvent; + bool bNextDurationBlockEvent; + int32_t numNextDurationBlocks; + SSDataBlock* nextDurationBlocks[2]; + bool rtnNextDurationBlocks; + int32_t nextDurationBlocksIdx; + bool bSortRowId; STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d152baf502..be6fb2983c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -261,6 +261,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) pBlockInfo->pks[0].type = pInfoData->info.type; pBlockInfo->pks[1].type = pInfoData->info.type; + // allocate enough buffer size, which is pInfoData->info.bytes if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); if (pBlockInfo->pks[0].pData == NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f917daa63f..59247e1cb6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); - pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); - pInfo->pResBlock->info.blankFill = false; if (!pInfo->needCountEmptyTable) { @@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t num = 0; - STableKeyInfo* pList = NULL; + int32_t num = 0; + STableKeyInfo* pList = NULL; if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { @@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { return NULL; } + taosRLockLatch(&pTaskInfo->lock); initNextGroupScan(pInfo, &pList, &num); + taosRUnLockLatch(&pTaskInfo->lock); + ASSERT(pInfo->base.dataReader == NULL); int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -4134,14 +4135,13 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - - SSDataBlock* pBlock = pInfo->pReaderBlock; - int32_t code = 0; - bool hasNext = false; - STsdbReader* reader = pInfo->base.dataReader; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSDataBlock* pBlock = pInfo->pReaderBlock; + int32_t code = 0; + bool hasNext = false; + STsdbReader* reader = pInfo->base.dataReader; code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { @@ -4177,27 +4177,24 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe *pSkipped = true; return; } + return; } static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = NULL; - int32_t code = 0; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = NULL; + int64_t st = taosGetTimestampUs(); - int64_t st = taosGetTimestampUs(); - bool hasNext = false; - - STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { - qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", - GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + qDebug("%s table merge scan return already fetched new duration blocks. index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); + if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; @@ -4206,19 +4203,19 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { blockDataDestroy(pInfo->nextDurationBlocks[i]); pInfo->nextDurationBlocks[i] = NULL; } + pInfo->rtnNextDurationBlocks = false; pInfo->nextDurationBlocksIdx = 0; pInfo->numNextDurationBlocks = 0; continue; } } else { - bool bFinished = false; bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", - GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -4229,15 +4226,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; if (pInfo->numNextDurationBlocks > 2) { - qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), + pInfo->numNextDurationBlocks); pInfo->bNewFilesetEvent = false; break; } } + if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; return NULL; } + if (pInfo->bNextDurationBlockEvent) { pInfo->bNextDurationBlockEvent = false; continue; @@ -4245,19 +4245,18 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } if (bSkipped) continue; } + pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; - pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - + return pBlock; } return NULL; } - SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) { SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SBlockOrderInfo biTs = {0}; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e62763ebc5..b72811cdcc 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1339,11 +1339,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); } else { // start a new session window + // start a new session window if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window SResultRow* pResult = NULL; // keep the time window for the closed time window. STimeWindow window = pRowSup->win; + int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 44404c345e..cd1a858175 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1649,8 +1649,8 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH } static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); @@ -1688,12 +1688,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + int64_t firstRowTs = *(int64_t*)tsCol->pData; + if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { if (bExtractedBlock) { blockDataDestroy(pBlk); - } + } continue; } } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index f3192b4956..9829528d92 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1945,10 +1945,14 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt if (code == TSDB_CODE_SUCCESS && bFirstTable) { code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); } - - code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), - pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true); - initTableColSubmitData(*ppTableDataCxt); + if (code == TSDB_CODE_SUCCESS) { + code = + insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), + pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true); + } + if (code == TSDB_CODE_SUCCESS) { + code = initTableColSubmitData(*ppTableDataCxt); + } if (code == TSDB_CODE_SUCCESS) { SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1); code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow); @@ -1965,7 +1969,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt clearStbRowsDataContext(pStbRowsCxt); - return TSDB_CODE_SUCCESS; + return code; } static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ede4dc07e1..4b7ed59039 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { - terrno = TSDB_CODE_SUCCESS; - raftStoreNextTerm(ths); - if (terrno != TSDB_CODE_SUCCESS) { - sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno)); - return -1; - } - if (syncNodeAssignedLeader2Leader(ths) != 0) { - sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId); - return -1; - } - - taosThreadMutexLock(&ths->arbTokenMutex); - syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken); - sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken); - taosThreadMutexUnlock(&ths->arbTokenMutex); + syncNodeStepDown(ths, pMsg->term); } } else { (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 85aa3a2796..fbdb5f4201 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) { return code; } -#ifdef BUILD_NO_CALL -int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sError("sync step down error"); - return -1; - } - - syncNodeStepDown(pSyncNode, newTerm); - syncNodeRelease(pSyncNode); - return 0; -} -#endif - bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // start in syncNodeStart // start raft - // syncNodeBecomeFollower(pSyncNode); int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; @@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // persist cfg syncWriteCfgFile(pSyncNode); - -#if 0 - // change isStandBy to normal (election timeout) - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, ""); - - // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); - // syncMaybeAdvanceCommitIndex(pSyncNode); - - } else { - syncNodeBecomeFollower(pSyncNode, ""); - } -#endif } else { // persist cfg syncWriteCfgFile(pSyncNode); @@ -1874,18 +1845,6 @@ _END: } // raft state change -------------- -#ifdef BUILD_NO_CALL -void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { - if (term > raftStoreGetTerm(pSyncNode)) { - raftStoreSetTerm(pSyncNode, term); - char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); - syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode); - } -} -#endif - void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { if (term > raftStoreGetTerm(pSyncNode)) { raftStoreSetTerm(pSyncNode, term); @@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); } while (0); + if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { + taosThreadMutexLock(&pSyncNode->arbTokenMutex); + syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken); + sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken); + taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); + } + if (currentTerm < newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode); - } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { syncNodeBecomeFollower(pSyncNode, "step down"); @@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { sNTrace(pSyncNode, "follower to candidate"); } -#ifdef BUILD_NO_CALL -void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); - syncNodeBecomeFollower(pSyncNode, "leader to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "leader to follower"); -} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - syncNodeBecomeFollower(pSyncNode, "candidate to follower"); - SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, - pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); - - sNTrace(pSyncNode, "candidate to follower"); -} -#endif - int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER); syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");