[td-225] refactor codes.
This commit is contained in:
parent
15b2e41d7a
commit
e7fb3fdc53
|
@ -182,7 +182,72 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
|
|||
return pLocalIdList;
|
||||
}
|
||||
|
||||
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
|
||||
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
||||
assert(sizeOfGroup >= 1 && pMeta != NULL);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo));
|
||||
if (pTableCheckInfo == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
||||
SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i);
|
||||
|
||||
size_t gsize = taosArrayGetSize(group);
|
||||
assert(gsize > 0);
|
||||
|
||||
for (int32_t j = 0; j < gsize; ++j) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||
|
||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
|
||||
info.tableId = ((STable*)(pKeyInfo->pTable))->tableId;
|
||||
|
||||
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||
|
||||
info.tableId.tid = info.pTableObj->tableId.tid;
|
||||
info.tableId.uid = info.pTableObj->tableId.uid;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||
assert(info.lastKey >= pQueryHandle->window.skey);
|
||||
} else {
|
||||
assert(info.lastKey <= pQueryHandle->window.skey);
|
||||
}
|
||||
|
||||
taosArrayPush(pTableCheckInfo, &info);
|
||||
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
|
||||
info.tableId.tid, info.lastKey, pQueryHandle->qinfo);
|
||||
}
|
||||
}
|
||||
|
||||
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
return pTableCheckInfo;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) {
|
||||
size_t si = taosArrayGetSize(pTableCheckInfo);
|
||||
SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo));
|
||||
if (pNew == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < si; ++j) {
|
||||
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, j);
|
||||
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
|
||||
|
||||
info.tableId = pCheckInfo->tableId;
|
||||
taosArrayPush(pNew, &info);
|
||||
}
|
||||
|
||||
// it is ordered already, no need to sort again.
|
||||
taosArraySort(pNew, tsdbCheckInfoCompar);
|
||||
return pNew;
|
||||
}
|
||||
|
||||
static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo) {
|
||||
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
|
||||
if (pQueryHandle == NULL) {
|
||||
goto out_of_memory;
|
||||
|
@ -206,9 +271,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
}
|
||||
|
||||
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
|
||||
|
||||
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
||||
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
||||
assert(pCond != NULL && pCond->numOfCols > 0);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pCond->order)) {
|
||||
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
|
||||
|
@ -217,21 +280,19 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
}
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
int32_t numOfCols = pCond->numOfCols;
|
||||
|
||||
pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
|
||||
pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
|
||||
if (pQueryHandle->statis == NULL) {
|
||||
goto out_of_memory;
|
||||
}
|
||||
|
||||
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
|
||||
pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
|
||||
if (pQueryHandle->pColumns == NULL) {
|
||||
goto out_of_memory;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {{0}, 0};
|
||||
|
||||
|
||||
colInfo.info = pCond->colList[i];
|
||||
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
|
||||
if (colInfo.pData == NULL) {
|
||||
|
@ -241,47 +302,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
pQueryHandle->statis[i].colId = colInfo.info.colId;
|
||||
}
|
||||
|
||||
pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo));
|
||||
if (pQueryHandle->pTableCheckInfo == NULL) {
|
||||
goto out_of_memory;
|
||||
}
|
||||
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
||||
|
||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||
assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
||||
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
|
||||
|
||||
size_t gsize = taosArrayGetSize(group);
|
||||
assert(gsize > 0);
|
||||
|
||||
for (int32_t j = 0; j < gsize; ++j) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||
|
||||
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
|
||||
info.tableId = ((STable*)(pKeyInfo->pTable))->tableId;
|
||||
|
||||
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||
|
||||
info.tableId.tid = info.pTableObj->tableId.tid;
|
||||
info.tableId.uid = info.pTableObj->tableId.uid;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||
assert(info.lastKey >= pQueryHandle->window.skey);
|
||||
} else {
|
||||
assert(info.lastKey <= pQueryHandle->window.skey);
|
||||
}
|
||||
|
||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
|
||||
info.tableId.tid, info.lastKey, qinfo);
|
||||
}
|
||||
}
|
||||
|
||||
taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar);
|
||||
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
||||
assert(pMeta != NULL);
|
||||
|
||||
pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||
if (pQueryHandle->pDataCols == NULL) {
|
||||
|
@ -290,19 +314,35 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
goto out_of_memory;
|
||||
}
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
|
||||
|
||||
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
||||
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
|
||||
|
||||
return (TsdbQueryHandleT) pQueryHandle;
|
||||
|
||||
out_of_memory:
|
||||
out_of_memory:
|
||||
tsdbCleanupQueryHandle(pQueryHandle);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
|
||||
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo);
|
||||
|
||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||
assert(pMeta != NULL);
|
||||
|
||||
// todo apply the lastkey of table check to avoid to load header file
|
||||
pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta);
|
||||
if (pQueryHandle->pTableCheckInfo == NULL) {
|
||||
tsdbCleanupQueryHandle(pQueryHandle);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
|
||||
return (TsdbQueryHandleT) pQueryHandle;
|
||||
}
|
||||
|
||||
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
|
||||
pCond->twindow = changeTableGroupByLastrow(groupList);
|
||||
|
||||
|
@ -1920,77 +1960,33 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||
return true;
|
||||
} else {
|
||||
STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
|
||||
if (pSecQueryHandle == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
|
||||
STsdbQueryCond cond = {
|
||||
.order = TSDB_ORDER_ASC,
|
||||
.numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)),
|
||||
.twindow = win};
|
||||
|
||||
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||
if (cond.colList == NULL) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return false;
|
||||
}
|
||||
|
||||
pSecQueryHandle->order = TSDB_ORDER_ASC;
|
||||
pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
|
||||
pSecQueryHandle->pTsdb = pQueryHandle->pTsdb;
|
||||
pSecQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||
pSecQueryHandle->cur.fid = -1;
|
||||
pSecQueryHandle->cur.win = TSWINDOW_INITIALIZER;
|
||||
pSecQueryHandle->checkFiles = true;
|
||||
pSecQueryHandle->activeIndex = 0;
|
||||
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
|
||||
|
||||
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
free(pSecQueryHandle);
|
||||
return false;
|
||||
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
|
||||
}
|
||||
|
||||
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
|
||||
STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
||||
taosTFree(cond.colList);
|
||||
|
||||
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
|
||||
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||
if (pSecQueryHandle->statis == NULL || pSecQueryHandle->pColumns == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey);
|
||||
if (pSecQueryHandle->pTableCheckInfo == NULL) {
|
||||
tsdbCleanupQueryHandle(pSecQueryHandle);
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {{0}, 0};
|
||||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
|
||||
colInfo.info = pCol->info;
|
||||
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
|
||||
if (colInfo.pData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbCleanupQueryHandle(pSecQueryHandle);
|
||||
return false;
|
||||
}
|
||||
|
||||
taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
|
||||
}
|
||||
|
||||
size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo));
|
||||
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
|
||||
assert(pMeta != NULL);
|
||||
|
||||
for (int32_t j = 0; j < si; ++j) {
|
||||
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
|
||||
STableCheckInfo info = {
|
||||
.lastKey = pSecQueryHandle->window.skey,
|
||||
.pTableObj = pCheckInfo->pTableObj,
|
||||
};
|
||||
|
||||
info.tableId = pCheckInfo->tableId;
|
||||
|
||||
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
|
||||
}
|
||||
|
||||
tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo);
|
||||
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
|
||||
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
|
||||
|
||||
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
|
||||
tsdbCleanupQueryHandle(pSecQueryHandle);
|
||||
return false;
|
||||
|
@ -1999,6 +1995,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
|
||||
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
|
||||
|
||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle));
|
||||
size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
|
||||
|
@ -2012,11 +2011,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
|
||||
|
||||
// it is ascending order
|
||||
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
|
||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||
pQueryHandle->window = pQueryHandle->cur.win;
|
||||
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
|
||||
pQueryHandle->cur.rows = 2;
|
||||
pQueryHandle->cur.mixBlock = true;
|
||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||
|
||||
int32_t step = -1;// one step for ascending order traverse
|
||||
for (int32_t j = 0; j < si; ++j) {
|
||||
|
|
Loading…
Reference in New Issue