Merge pull request #9854 from taosdata/feature/3.0_liaohj
[td-11818]fix bug in tsdbread.
This commit is contained in:
commit
a40ebba704
|
@ -85,7 +85,6 @@ enum {
|
||||||
typedef struct STableCheckInfo {
|
typedef struct STableCheckInfo {
|
||||||
uint64_t tableId;
|
uint64_t tableId;
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
STable* pTableObj;
|
|
||||||
SBlockInfo* pCompInfo;
|
SBlockInfo* pCompInfo;
|
||||||
int32_t compSize;
|
int32_t compSize;
|
||||||
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
|
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
|
||||||
|
@ -141,8 +140,6 @@ typedef struct STsdbReadHandle {
|
||||||
STableBlockInfo* pDataBlockInfo;
|
STableBlockInfo* pDataBlockInfo;
|
||||||
SDataCols *pDataCols; // in order to hold current file data block
|
SDataCols *pDataCols; // in order to hold current file data block
|
||||||
int32_t allocSize; // allocated data block size
|
int32_t allocSize; // allocated data block size
|
||||||
// STsdb
|
|
||||||
// STsdbMemTable * pMemTable;
|
|
||||||
SArray *defaultLoadColumn;// default load column
|
SArray *defaultLoadColumn;// default load column
|
||||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
|
||||||
|
@ -204,8 +201,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
|
||||||
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
||||||
|
|
||||||
// the primary timestamp column does not be included in the the specified load column list, add it
|
// the primary timestamp column does not be included in the the specified load column list, add it
|
||||||
if (loadTS && colId != 0) {
|
if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
int16_t columnId = 0;
|
int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
taosArrayInsert(pLocalIdList, 0, &columnId);
|
taosArrayInsert(pLocalIdList, 0, &columnId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,7 +289,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
for (int32_t j = 0; j < gsize; ++j) {
|
for (int32_t j = 0; j < gsize; ++j) {
|
||||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||||
|
|
||||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
|
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
|
||||||
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||||
|
|
||||||
|
@ -315,10 +312,9 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
|
|
||||||
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||||
size_t gsize = taosArrayGetSize(pTableCheckInfo);
|
size_t gsize = taosArrayGetSize(pTableCheckInfo);
|
||||||
for (int32_t i = 0; i < gsize; ++i) {
|
// for (int32_t i = 0; i < gsize; ++i) {
|
||||||
STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
|
||||||
taosArrayPush(pTable, &pInfo->pTableObj);
|
// }
|
||||||
}
|
|
||||||
|
|
||||||
*psTable = pTable;
|
*psTable = pTable;
|
||||||
return pTableCheckInfo;
|
return pTableCheckInfo;
|
||||||
|
@ -347,15 +343,11 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
// only one table, not need to sort again
|
// only one table, not need to sort again
|
||||||
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
|
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
|
||||||
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
|
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
|
||||||
SArray* pTable = taosArrayInit(1, sizeof(STable*));
|
|
||||||
|
|
||||||
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
|
STableCheckInfo info = { .lastKey = skey};
|
||||||
|
|
||||||
info.tableId = pCheckInfo->tableId;
|
info.tableId = pCheckInfo->tableId;
|
||||||
taosArrayPush(pNew, &info);
|
taosArrayPush(pNew, &info);
|
||||||
taosArrayPush(pTable, &pCheckInfo->pTableObj);
|
|
||||||
|
|
||||||
*psTable = pTable;
|
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,9 +453,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
||||||
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// STsdbMeta* pMeta = NULL;//tsdbGetMeta(tsdb);
|
|
||||||
// assert(pMeta != NULL);
|
|
||||||
|
|
||||||
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
|
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||||
if (pReadHandle->pDataCols == NULL) {
|
if (pReadHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
|
||||||
|
@ -641,12 +630,6 @@ SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||||
SArray* res = taosArrayInit(size, POINTER_BYTES);
|
SArray* res = taosArrayInit(size, POINTER_BYTES);
|
||||||
|
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
|
|
||||||
taosArrayPush(res, &pCheckInfo->pTableObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1049,7 +1032,10 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
|
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
|
||||||
pCheckInfo->numOfBlocks = 0;
|
pCheckInfo->numOfBlocks = 0;
|
||||||
|
|
||||||
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
|
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
|
||||||
|
table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||||
|
|
||||||
|
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1149,7 +1135,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
|
||||||
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
STSchema *pSchema = NULL;//tsdbGetTableSchema(pCheckInfo->pTableObj);
|
STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
|
||||||
|
@ -1184,7 +1170,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
||||||
|
|
||||||
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
|
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
|
||||||
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
|
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
|
||||||
pBlockLoadInfo->uid = pCheckInfo->pTableObj->uid;
|
pBlockLoadInfo->uid = pCheckInfo->tableId;
|
||||||
|
|
||||||
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
||||||
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
|
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
|
||||||
|
@ -1878,7 +1864,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
|
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
|
||||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
||||||
|
|
||||||
STable* pTable = pCheckInfo->pTableObj;
|
STable* pTable = NULL;
|
||||||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
||||||
|
@ -1932,7 +1918,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
rv2 = memRowVersion(row2);
|
rv2 = memRowVersion(row2);
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = key;
|
cur->win.skey = key;
|
||||||
|
@ -1958,7 +1944,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
}
|
}
|
||||||
|
|
||||||
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
|
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = key;
|
cur->win.skey = key;
|
||||||
|
@ -2745,7 +2731,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
// if (ret != TSDB_CODE_SUCCESS) {
|
// if (ret != TSDB_CODE_SUCCESS) {
|
||||||
// return false;
|
// return false;
|
||||||
// }
|
// }
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->pTableObj, NULL, NULL, true);
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
|
||||||
tfree(pRow);
|
tfree(pRow);
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
|
@ -3389,14 +3375,14 @@ SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList)
|
||||||
if (pHandle->cur.mixBlock) {
|
if (pHandle->cur.mixBlock) {
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else {
|
} else {
|
||||||
SDataBlockInfo binfo = {0};/*GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);*/
|
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
|
||||||
assert(pHandle->realNumOfRows <= binfo.rows);
|
assert(pHandle->realNumOfRows <= binfo.rows);
|
||||||
|
|
||||||
// data block has been loaded, todo extract method
|
// data block has been loaded, todo extract method
|
||||||
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
|
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
|
||||||
|
|
||||||
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
|
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
|
||||||
pBlockLoadInfo->uid == pCheckInfo->pTableObj->tid) {
|
pBlockLoadInfo->uid == pCheckInfo->tableId) {
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else { // only load the file block
|
} else { // only load the file block
|
||||||
SBlock* pBlock = pBlockInfo->compBlock;
|
SBlock* pBlock = pBlockInfo->compBlock;
|
||||||
|
|
|
@ -551,7 +551,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
|
||||||
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
|
||||||
int numOfColIds) {
|
int numOfColIds) {
|
||||||
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
||||||
ASSERT(colIds[0] == 0);
|
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||||
|
|
||||||
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||||
SBlockCol blockCol = {0};
|
SBlockCol blockCol = {0};
|
||||||
|
@ -588,7 +588,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
if (pDataCol == NULL) continue;
|
if (pDataCol == NULL) continue;
|
||||||
ASSERT(pDataCol->colId == colId);
|
ASSERT(pDataCol->colId == colId);
|
||||||
|
|
||||||
if (colId == 0) { // load the key row
|
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
|
||||||
blockCol.colId = colId;
|
blockCol.colId = colId;
|
||||||
blockCol.len = pBlock->keyLen;
|
blockCol.len = pBlock->keyLen;
|
||||||
blockCol.type = pDataCol->type;
|
blockCol.type = pDataCol->type;
|
||||||
|
|
|
@ -4927,6 +4927,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
||||||
|
|
||||||
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
||||||
if (NULL == pMsg) { // todo handle malloc error
|
if (NULL == pMsg) { // todo handle malloc error
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
@ -7381,6 +7382,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
|
||||||
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
|
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
|
||||||
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||||
cond.twindow = pTableScanNode->window;
|
cond.twindow = pTableScanNode->window;
|
||||||
|
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||||
|
|
||||||
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
||||||
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
|
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
|
||||||
|
|
Loading…
Reference in New Issue