Merge pull request #21087 from taosdata/fix/liaohj_main
refactor: do some internal refactor.
This commit is contained in:
commit
9a35d4e950
|
@ -123,15 +123,13 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
||||||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
void tsdbRowClose(STSDBRowIter *pIter);
|
void tsdbRowClose(STSDBRowIter *pIter);
|
||||||
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
|
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
|
||||||
// SRowMerger
|
|
||||||
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
|
|
||||||
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
|
||||||
void tsdbRowMergerClear(SRowMerger *pMerger);
|
|
||||||
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
|
|
||||||
|
|
||||||
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema);
|
// SRowMerger
|
||||||
void tsdbRowMergerClear_rv(SRowMerger* pMerger);
|
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pSchema);
|
||||||
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger);
|
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
|
||||||
|
void tsdbRowMergerClear(SRowMerger *pMerger);
|
||||||
|
void tsdbRowMergerCleanup(SRowMerger *pMerger);
|
||||||
|
|
||||||
// TABLEID
|
// TABLEID
|
||||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||||
|
|
|
@ -74,7 +74,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
||||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||||
tqError("prepare scan failed, return");
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,7 +118,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
||||||
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
|
||||||
tqDebug("tqScanTaosx prepare scan failed, return");
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -780,7 +780,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
}
|
}
|
||||||
|
|
||||||
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
|
|
||||||
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
|
||||||
|
|
||||||
code = tBlockDataCreate(&pReader->status.fileBlockData);
|
code = tBlockDataCreate(&pReader->status.fileBlockData);
|
||||||
|
@ -789,8 +788,20 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT (pReader->suppInfo.colId[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
if (pReader->suppInfo.colId[0] != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
tsdbError("the first column isn't primary timestamp, %d, %s", pReader->suppInfo.colId[0], pReader->idStr);
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]);
|
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]);
|
||||||
|
int32_t type = pReader->status.pPrimaryTsCol->info.type;
|
||||||
|
if (type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
tsdbError("the first column isn't primary timestamp in result block, actual: %s, %s", tDataTypes[type].name,
|
||||||
|
pReader->idStr);
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInitReaderLock(pReader);
|
tsdbInitReaderLock(pReader);
|
||||||
|
|
||||||
|
@ -1464,7 +1475,6 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
pBlockIter->pTableMap = pReader->status.pTableMap;
|
pBlockIter->pTableMap = pReader->status.pTableMap;
|
||||||
|
|
||||||
// access data blocks according to the offset of each block in asc/desc order.
|
// access data blocks according to the offset of each block in asc/desc order.
|
||||||
// int32_t numOfTables = (int32_t)tSimpleHashGetSize(pReader->status.pTableMap);
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pTableList);
|
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -1474,21 +1484,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cnt = 0;
|
int32_t cnt = 0;
|
||||||
// void* ptr = NULL;
|
|
||||||
// int32_t iter = 0;
|
|
||||||
|
|
||||||
// while (1) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
// ptr = tSimpleHashIterate(pReader->status.pTableMap, ptr, &iter);
|
|
||||||
// if (ptr == NULL) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
|
||||||
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
|
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
|
||||||
ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
|
ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
|
||||||
// if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
|
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
|
||||||
sup.numOfBlocksPerTable[sup.numOfTables] = num;
|
sup.numOfBlocksPerTable[sup.numOfTables] = num;
|
||||||
|
@ -1876,8 +1875,6 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
pScanInfo->lastKeyInStt = key;
|
pScanInfo->lastKeyInStt = key;
|
||||||
|
|
||||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
|
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
|
||||||
// the qualifed ts may equal to k.ts, only a greater version one.
|
|
||||||
// here we need to fallback one step.
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1939,7 +1936,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
|
code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2021,7 +2018,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (pReader->order == TSDB_ORDER_ASC) {
|
if (pReader->order == TSDB_ORDER_ASC) {
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
init = true; // todo check if pReader->pSchema is null or not
|
init = true; // todo check if pReader->pSchema is null or not
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2031,15 +2028,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMergerAdd(&pReader->status.merger, fRow1, NULL);
|
tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr);
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
|
@ -2051,7 +2048,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2069,7 +2066,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2086,7 +2083,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2099,7 +2096,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2116,7 +2113,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear_rv(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2149,7 +2146,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
pBlockScanInfo->lastKey = tsLastBlock;
|
pBlockScanInfo->lastKey = tsLastBlock;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
|
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2166,14 +2163,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear_rv(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // not merge block data
|
} else { // not merge block data
|
||||||
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
|
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2193,7 +2190,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear_rv(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -2245,7 +2242,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear_rv(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2454,7 +2451,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear_rv(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2627,7 +2624,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear_rv(&pReader->status.merger);
|
tsdbRowMergerClear(&pReader->status.merger);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2966,7 +2963,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
|
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
|
||||||
// ASSERT(taosArrayGetSize(pTableList) > 0);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3913,7 +3909,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
}
|
}
|
||||||
|
|
||||||
pResRow->type = TSDBROW_ROW_FMT;
|
pResRow->type = TSDBROW_ROW_FMT;
|
||||||
tsdbRowMergerClear_rv(&pReader->status.merger);
|
tsdbRowMergerClear(&pReader->status.merger);
|
||||||
*freeTSRow = true;
|
*freeTSRow = true;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3972,7 +3968,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
|
int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
|
||||||
tsdbRowMergerClear_rv(pMerger);
|
tsdbRowMergerClear(pMerger);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4097,12 +4093,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
// ASSERT (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID);// {
|
|
||||||
// SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
|
||||||
// ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
|
|
||||||
((int64_t*)pReader->status.pPrimaryTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
|
((int64_t*)pReader->status.pPrimaryTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
|
||||||
i += 1;
|
i += 1;
|
||||||
// }
|
|
||||||
|
|
||||||
SColVal cv = {0};
|
SColVal cv = {0};
|
||||||
int32_t numOfInputCols = pBlockData->nColData;
|
int32_t numOfInputCols = pBlockData->nColData;
|
||||||
|
@ -4355,7 +4347,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->pSchema != NULL) {
|
if (pReader->pSchema != NULL) {
|
||||||
tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
|
tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
|
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
|
||||||
|
@ -4515,7 +4507,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
taosMemoryFree(pReader->idStr);
|
taosMemoryFree(pReader->idStr);
|
||||||
|
|
||||||
tsdbRowMergerCleanup_rv(&pReader->status.merger);
|
tsdbRowMergerCleanup(&pReader->status.merger);
|
||||||
taosMemoryFree(pReader->pSchema);
|
taosMemoryFree(pReader->pSchema);
|
||||||
|
|
||||||
tSimpleHashCleanup(pReader->pSchemaMap);
|
tSimpleHashCleanup(pReader->pSchemaMap);
|
||||||
|
@ -4562,7 +4554,6 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||||
// pInfo->lastKey = ts;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// resetDataBlockScanInfo excluding lastKey
|
// resetDataBlockScanInfo excluding lastKey
|
||||||
|
@ -4585,7 +4576,6 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||||
// pInfo->lastKey = ts;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
|
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
|
||||||
|
@ -5134,8 +5124,6 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
|
|
||||||
const int32_t numOfBuckets = 20.0;
|
const int32_t numOfBuckets = 20.0;
|
||||||
|
|
||||||
// find the start data block in file
|
|
||||||
|
|
||||||
// find the start data block in file
|
// find the start data block in file
|
||||||
tsdbAcquireReader(pReader);
|
tsdbAcquireReader(pReader);
|
||||||
if (pReader->suspended) {
|
if (pReader->suspended) {
|
||||||
|
|
|
@ -637,78 +637,6 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SRowMerger ======================================================
|
// SRowMerger ======================================================
|
||||||
|
|
||||||
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
|
|
||||||
int32_t code = 0;
|
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
|
||||||
SColVal *pColVal = &(SColVal){0};
|
|
||||||
STColumn *pTColumn;
|
|
||||||
int32_t iCol, jCol = 0;
|
|
||||||
|
|
||||||
if (NULL == pResTSchema) {
|
|
||||||
pResTSchema = pTSchema;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMerger->pTSchema = pResTSchema;
|
|
||||||
pMerger->version = key.version;
|
|
||||||
|
|
||||||
pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal));
|
|
||||||
if (pMerger->pArray == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ts
|
|
||||||
pTColumn = &pTSchema->columns[jCol++];
|
|
||||||
|
|
||||||
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
||||||
|
|
||||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
|
|
||||||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// other
|
|
||||||
for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) {
|
|
||||||
pTColumn = &pResTSchema->columns[iCol];
|
|
||||||
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
|
||||||
++jCol;
|
|
||||||
--iCol;
|
|
||||||
continue;
|
|
||||||
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
|
||||||
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
|
||||||
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
|
|
||||||
uint8_t *pVal = pColVal->value.pData;
|
|
||||||
|
|
||||||
pColVal->value.pData = NULL;
|
|
||||||
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
if (pColVal->value.nData) {
|
|
||||||
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (; iCol < pResTSchema->numOfCols; ++iCol) {
|
|
||||||
pTColumn = &pResTSchema->columns[iCol];
|
|
||||||
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
@ -836,7 +764,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
|
int32_t tsdbRowMergerInit(SRowMerger* pMerger, STSchema *pSchema) {
|
||||||
pMerger->pTSchema = pSchema;
|
pMerger->pTSchema = pSchema;
|
||||||
pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
|
pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
|
||||||
if (pMerger->pArray == NULL) {
|
if (pMerger->pArray == NULL) {
|
||||||
|
@ -846,7 +774,7 @@ int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
|
void tsdbRowMergerClear(SRowMerger* pMerger) {
|
||||||
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
||||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
|
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
|
||||||
|
@ -857,7 +785,7 @@ void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
|
||||||
taosArrayClear(pMerger->pArray);
|
taosArrayClear(pMerger->pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
|
void tsdbRowMergerCleanup(SRowMerger* pMerger) {
|
||||||
int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
|
int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
|
||||||
for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
|
for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
|
||||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
@ -869,82 +797,6 @@ void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
|
||||||
taosArrayDestroy(pMerger->pArray);
|
taosArrayDestroy(pMerger->pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbRowMergerClear(SRowMerger *pMerger) {
|
|
||||||
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
|
||||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
|
||||||
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
|
|
||||||
tFree(pTColVal->value.pData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pMerger->pArray);
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
|
||||||
int32_t code = 0;
|
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
|
||||||
SColVal *pColVal = &(SColVal){0};
|
|
||||||
|
|
||||||
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
|
|
||||||
|
|
||||||
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
|
||||||
tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal);
|
|
||||||
|
|
||||||
if (key.version > pMerger->version) {
|
|
||||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
|
||||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
|
||||||
if (!COL_VAL_IS_NULL(pColVal)) {
|
|
||||||
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
pTColVal->value.nData = pColVal->value.nData;
|
|
||||||
if (pTColVal->value.nData) {
|
|
||||||
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
|
|
||||||
}
|
|
||||||
pTColVal->flag = 0;
|
|
||||||
} else {
|
|
||||||
tFree(pTColVal->value.pData);
|
|
||||||
pTColVal->value.pData = NULL;
|
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (key.version < pMerger->version) {
|
|
||||||
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
|
||||||
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
|
||||||
if (!COL_VAL_IS_NULL(pColVal)) {
|
|
||||||
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
tColVal->value.nData = pColVal->value.nData;
|
|
||||||
if (tColVal->value.nData) {
|
|
||||||
memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData);
|
|
||||||
}
|
|
||||||
tColVal->flag = 0;
|
|
||||||
} else {
|
|
||||||
tFree(tColVal->value.pData);
|
|
||||||
tColVal->value.pData = NULL;
|
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pMerger->version = key.version;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
|
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
|
||||||
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
|
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
|
||||||
}
|
}
|
||||||
|
@ -1199,8 +1051,6 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) {
|
||||||
pBlockData->aTSKEY = NULL;
|
pBlockData->aTSKEY = NULL;
|
||||||
pBlockData->nColData = 0;
|
pBlockData->nColData = 0;
|
||||||
pBlockData->aColData = NULL;
|
pBlockData->aColData = NULL;
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue