refactor and debug
This commit is contained in:
parent
88ba2f4935
commit
fac8cf100b
|
@ -60,7 +60,6 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter
|
|||
int *blkIdx);
|
||||
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
||||
TSKEY maxKey, int maxRows);
|
||||
static int tsdbCompareTidIdx(const void *key1, const void *key2);
|
||||
|
||||
// ---------------------- INTERNAL FUNCTIONS ----------------------
|
||||
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
||||
|
@ -276,32 +275,31 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
|||
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
||||
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
||||
|
||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||
if (pHelper->idxH.numOfIdx > 0) {
|
||||
if (pHelper->idxH.numOfIdx > 0) {
|
||||
while (true) {
|
||||
if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) {
|
||||
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
|
||||
} else {
|
||||
SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]);
|
||||
if (pIdx->tid == TABLE_TID(pTable)) {
|
||||
break;
|
||||
}
|
||||
|
||||
SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]);
|
||||
if (pIdx->tid == TABLE_TID(pTable)) {
|
||||
if (pIdx->uid == TABLE_UID(pTable)) {
|
||||
pHelper->curCompIdx = *pIdx;
|
||||
pHelper->idxH.curIdx++;
|
||||
} else {
|
||||
ASSERT(pIdx->tid > TABLE_TID(pTable));
|
||||
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
|
||||
}
|
||||
pHelper->idxH.curIdx++;
|
||||
break;
|
||||
} else if (pIdx->tid > TABLE_TID(pTable)) {
|
||||
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
|
||||
break;
|
||||
} else {
|
||||
pHelper->idxH.curIdx++;
|
||||
}
|
||||
} else {
|
||||
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
|
||||
}
|
||||
} else {
|
||||
// TODO: make it more efficient
|
||||
void *ptr = bsearch(&TABLE_TID(pTable), (void *)pHelper->idxH.pIdxArray, pHelper->idxH.numOfIdx, sizeof(SCompIdx),
|
||||
tsdbCompareTidIdx);
|
||||
if (ptr == NULL) {
|
||||
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
|
||||
} else {
|
||||
pHelper->curCompIdx = *(SCompIdx *)ptr;
|
||||
}
|
||||
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
|
||||
}
|
||||
|
||||
helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
|
||||
|
@ -1681,14 +1679,4 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols,
|
|||
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCompareTidIdx(const void *key1, const void *key2) {
|
||||
if (*(int32_t *)key1 > ((SCompIdx *)key2)->tid) {
|
||||
return 1;
|
||||
} else if (*(int32_t *)key1 < ((SCompIdx *)key2)->tid) {
|
||||
return -1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -126,12 +126,13 @@ typedef struct STsdbQueryHandle {
|
|||
SIOCostSummary cost;
|
||||
} STsdbQueryHandle;
|
||||
|
||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
|
||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
|
||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||
STsdbQueryHandle* pQueryHandle);
|
||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||
STsdbQueryHandle* pQueryHandle);
|
||||
static int tsdbCheckInfoCompar(const void* key1, const void* key2);
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
pBlockLoadInfo->slot = -1;
|
||||
|
@ -236,7 +237,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
||||
|
||||
tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
|
||||
|
@ -2431,3 +2433,13 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
|
|||
taosArrayDestroy(pGroupList->pGroupList);
|
||||
}
|
||||
|
||||
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
||||
if (((STableCheckInfo*)key1)->tableId.tid < ((STableCheckInfo*)key2)->tableId.tid) {
|
||||
return -1;
|
||||
} else if (((STableCheckInfo*)key1)->tableId.tid > ((STableCheckInfo*)key2)->tableId.tid) {
|
||||
return 1;
|
||||
} else {
|
||||
ASSERT(false);
|
||||
return 0;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue