Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-29 09:34:08 +00:00
commit ddb6745bd9
9 changed files with 390 additions and 37 deletions

View File

@ -250,8 +250,15 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
// bug api, deprecated, USE H version
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
// structs =======================
typedef struct {

View File

@ -65,7 +65,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
tdRowCpy(cacheRow, row);
} else {
tsdbCacheDeleteLastrow(pCache, uid);
tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
tsdbCacheInsertLastrow(pCache, uid, row);
}
}
@ -97,7 +97,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
tdRowCpy(cacheRow, row);
} else {
tsdbCacheDeleteLastrow(pCache, uid);
tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
tsdbCacheInsertLastrow(pCache, uid, row);
}
}
@ -581,6 +581,10 @@ typedef struct TsdbNextRowState {
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols;
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
STbData *pMem = NULL;
@ -597,6 +601,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
if (pDelFile) {
SDelFReader *pDelFReader;
@ -604,7 +610,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
SDelIdx delIdx;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
if (code) goto _err;
@ -612,6 +617,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
if (code) goto _err;
tsdbDelFReaderClose(pDelFReader);
} else {
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
if (code) goto _err;
}
int iSkyline = taosArrayGetSize(pSkyline) - 1;
@ -644,7 +652,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
input[1].next = true;
}
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nilColCount = nCol - 1; // count of null & none cols
int iCol = 0; // index of first nil col index from left to right
bool setICol = false;
do {
for (int i = 0; i < 3; ++i) {
@ -667,15 +677,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
TSDBROW *max[3] = {0};
int iMax[3] = {-1, -1, -1};
int nMax = 0;
TSKEY maxKey = TSKEY_MIN;
for (int i = 0; i < 3; ++i) {
if (input[i].pRow != NULL) {
if (!input[i].stop && input[i].pRow != NULL) {
TSDBKEY key = TSDBROW_KEY(input[i].pRow);
TSDBKEY maxKey = TSDBROW_KEY(max[nMax]);
// merging & deduplicating on client side
if (maxKey.ts <= key.ts) {
if (maxKey.ts < key.ts) {
if (maxKey <= key.ts) {
if (maxKey < key.ts) {
nMax = 0;
maxKey = key.ts;
}
iMax[nMax] = i;
@ -686,6 +698,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
// delete detection
TSDBROW *merge[3] = {0};
int iMerge[3] = {-1, -1, -1};
int nMerge = 0;
for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
@ -693,6 +706,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
// bool deleted = false;
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
if (!deleted) {
iMerge[nMerge] = i;
merge[nMerge++] = max[i];
}
@ -716,6 +730,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
tRowMergerClear(&merger);
}
}
} while (*ppRow == NULL);
taosMemoryFreeClear(pTSchema);
@ -727,6 +742,245 @@ _err:
return code;
}
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
int16_t nCol = pTSchema->numOfCols;
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
STbData *pMem = NULL;
if (pTsdb->mem) {
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
}
STbData *pIMem = NULL;
if (pTsdb->imem) {
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
}
*ppRow = NULL;
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
SDelIdx delIdx;
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
if (pDelFile) {
SDelFReader *pDelFReader;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
if (code) goto _err;
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
if (code) goto _err;
tsdbDelFReaderClose(pDelFReader);
} else {
code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline);
if (code) goto _err;
}
int iSkyline = taosArrayGetSize(pSkyline) - 1;
SBlockIdx idx = {.suid = suid, .uid = uid};
SFSNextRowIter fsState = {0};
fsState.state = SFSNEXTROW_FS;
fsState.pTsdb = pTsdb;
fsState.pBlockIdxExp = &idx;
SMemNextRowIter memState = {0};
SMemNextRowIter imemState = {0};
TSDBROW memRow, imemRow, fsRow;
TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem},
{&imemRow, true, false, &imemState, getNextRowFromMem},
{&fsRow, false, true, &fsState, getNextRowFromFS}};
if (pMem) {
memState.pMem = pMem;
memState.state = SMEMNEXTROW_ENTER;
input[0].stop = false;
input[0].next = true;
}
if (pIMem) {
imemState.pMem = pIMem;
imemState.state = SMEMNEXTROW_ENTER;
input[1].stop = false;
input[1].next = true;
}
int16_t nilColCount = nCol - 1; // count of null & none cols
int iCol = 0; // index of first nil col index from left to right
bool setICol = false;
do {
for (int i = 0; i < 3; ++i) {
if (input[i].next && !input[i].stop) {
code = input[i].nextRowFn(input[i].iter, &input[i].pRow);
if (code) goto _err;
if (input[i].pRow == NULL) {
input[i].stop = true;
input[i].next = false;
}
}
}
if (input[0].stop && input[1].stop && input[2].stop) {
break;
}
// select maxpoint(s) from mem, imem, fs
TSDBROW *max[3] = {0};
int iMax[3] = {-1, -1, -1};
int nMax = 0;
TSKEY maxKey = TSKEY_MIN;
for (int i = 0; i < 3; ++i) {
if (!input[i].stop && input[i].pRow != NULL) {
TSDBKEY key = TSDBROW_KEY(input[i].pRow);
// merging & deduplicating on client side
if (maxKey <= key.ts) {
if (maxKey < key.ts) {
nMax = 0;
maxKey = key.ts;
}
iMax[nMax] = i;
max[nMax++] = input[i].pRow;
}
}
}
// delete detection
TSDBROW *merge[3] = {0};
int iMerge[3] = {-1, -1, -1};
int nMerge = 0;
for (int i = 0; i < nMax; ++i) {
TSDBKEY maxKey = TSDBROW_KEY(max[i]);
// bool deleted = false;
bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline);
if (!deleted) {
iMerge[nMerge] = i;
merge[nMerge++] = max[i];
}
input[iMax[i]].next = deleted;
}
// merge if nMerge > 1
if (nMerge > 0) {
if (nMerge == 1) {
code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
if (code) goto _err;
} else {
// merge 2 or 3 rows
SRowMerger merger = {0};
tRowMergerInit(&merger, merge[0], pTSchema);
for (int i = 1; i < nMerge; ++i) {
tRowMerge(&merger, merge[i]);
}
tRowMergerGetRow(&merger, ppRow);
tRowMergerClear(&merger);
}
}
if (iCol == 0) {
STColumn *pTColumn = &pTSchema->columns[0];
SColVal *pColVal = &(SColVal){0};
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey});
if (taosArrayPush(pColArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
++iCol;
setICol = false;
for (int16_t i = iCol; iCol < nCol; ++i) {
// tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal);
if (taosArrayPush(pColArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (pColVal->isNull || pColVal->isNone) {
for (int j = 0; j < nMerge; ++j) {
SColVal jColVal = {0};
tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
if (jColVal.isNull || jColVal.isNone) {
input[iMerge[j]].next = true;
}
}
if (!setICol) {
iCol = i;
setICol = true;
}
} else {
--nilColCount;
}
}
continue;
}
setICol = false;
for (int16_t i = iCol; i < nCol; ++i) {
SColVal colVal = {0};
tTSRowGetVal(*ppRow, pTSchema, i, &colVal);
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i);
if (!colVal.isNone && !colVal.isNull) {
if (tColVal->isNull || tColVal->isNone) {
taosArraySet(pColArray, i, &colVal);
--nilColCount;
}
} else {
if (tColVal->isNull || tColVal->isNone && !setICol) {
iCol = i;
setICol = true;
for (int j = 0; j < nMerge; ++j) {
SColVal jColVal = {0};
tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal);
if (jColVal.isNull || jColVal.isNone) {
input[iMerge[j]].next = true;
}
}
}
}
}
} while (nilColCount > 0);
// if () new ts row from pColArray if non empty
if (taosArrayGetSize(pColArray) == nCol) {
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
}
taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
return code;
_err:
taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
@ -749,9 +1003,11 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
// taosLRUCacheRelease(pCache, h, true);
return code;
}
#if 0
int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
@ -763,7 +1019,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
// code = mergeLast(uid, pTsdb, &pRow);
code = mergeLast(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
return -1;
@ -774,10 +1030,12 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow *
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
// taosLRUCacheRelease(pCache, h, true);
return code;
}
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
#endif
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
@ -785,10 +1043,89 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) {
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
code = mergeLastRow(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
return -1;
}
tsdbCacheInsertLastrow(pCache, uid, pRow);
h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
*handle = h;
// taosLRUCacheRelease(pCache, h, true);
return code;
}
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "l", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
code = mergeLast(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
return -1;
}
tsdbCacheInsertLast(pCache, uid, pRow);
h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
*handle = h;
// taosLRUCacheRelease(pCache, h, true);
return code;
}
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
getTableCacheKey(uid, "l", key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
// clear last cache anyway, no matter where eKey ends.
taosLRUCacheRelease(pCache, h, true);
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t
// keyLen);
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
taosLRUCacheRelease(pCache, h, false);
return code;
}

View File

@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++;
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid);
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
}
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64

View File

@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
}
} else {
SColVal cv = {0};
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1);
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
}
pReader->pResBlock->info.rows = pBlockData->nRow;
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
/*
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
}
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer);
}
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
while(1) {
@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
}
}
static int32_t loadDataInFiles(STsdbReader* pReader) {
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter;
@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
// // check if the query range overlaps with the file data block
// bool exists = true;
// int32_t code = loadDataInFiles(pTsdbReadHandle, &exists);
// int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists);
// if (code != TSDB_CODE_SUCCESS) {
// pTsdbReadHandle->checkFiles = false;
// return false;
@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
if (pStatus->loadFromFile) {
int32_t code = loadDataInFiles(pReader);
int32_t code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pBlock->info.rows > 0) {
return true;
} else {
buildInmemBlockSeqentially(pReader);
buildBlockFromBufferSeqentially(pReader);
return pBlock->info.rows > 0;
}
} else { // no data in files, let's try the buffer
buildInmemBlockSeqentially(pReader);
buildBlockFromBufferSeqentially(pReader);
return pBlock->info.rows > 0;
}
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
// if (pReader->checkFiles) {
// // check if the query range overlaps with the file data block
// bool exists = true;
// int32_t code = loadDataInFiles(pReader, &exists);
// int32_t code = buildBlockFromFiles(pReader, &exists);
// if (code != TSDB_CODE_SUCCESS) {
// pReader->activeIndex = 0;
// pReader->checkFiles = false;
@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
return pReader->pResBlock->pDataBlock;
}

View File

@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);

View File

@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
return result;
}
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
//code = doFilterTag(pTagIndexCond, &metaArg, res);
code = TSDB_CODE_INDEX_REBUILDING;
if (code == TSDB_CODE_INDEX_REBUILDING) {
code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
} else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
taosArrayDestroy(res);
@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
}
taosArrayDestroy(res);
} else {
code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
}
if(pTagCond){

View File

@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
int32_t code = vnodeGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList);
int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList);
code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
}
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo {
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return 0;
}
// TODO consider the page meta size
int32_t getProperSortPageSize(size_t rowSize) {
uint32_t defaultPageSize = 4096;