|
|
|
@ -130,6 +130,7 @@ typedef struct STsdbReadHandle {
|
|
|
|
|
SArray* prev; // previous row which is before than time window
|
|
|
|
|
SArray* next; // next row which is after the query time window
|
|
|
|
|
SIOCostSummary cost;
|
|
|
|
|
STSchema* pSchema;
|
|
|
|
|
} STsdbReadHandle;
|
|
|
|
|
|
|
|
|
|
typedef struct STableGroupSupporter {
|
|
|
|
@ -343,14 +344,14 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
|
|
|
|
|
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
|
|
|
|
|
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
|
|
|
|
|
if (pReadHandle == NULL) {
|
|
|
|
|
goto _end;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pReadHandle->order = pCond->order;
|
|
|
|
|
pReadHandle->pTsdb = tsdb;
|
|
|
|
|
pReadHandle->pTsdb = pVnode->pTsdb;
|
|
|
|
|
pReadHandle->type = TSDB_QUERY_TYPE_ALL;
|
|
|
|
|
pReadHandle->cur.fid = INT32_MIN;
|
|
|
|
|
pReadHandle->cur.win = TSWINDOW_INITIALIZER;
|
|
|
|
@ -368,7 +369,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pC
|
|
|
|
|
snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
|
|
|
|
|
pReadHandle->idStr = strdup(buf);
|
|
|
|
|
|
|
|
|
|
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
|
|
|
|
|
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)pVnode->pTsdb) != 0) {
|
|
|
|
|
goto _end;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -422,9 +423,10 @@ _end:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsdbReaderT* tsdbQueryTablesT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
|
|
|
|
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
|
|
|
|
uint64_t taskId) {
|
|
|
|
|
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
|
|
|
|
|
|
|
|
|
|
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
|
|
|
|
if (pTsdbReadHandle == NULL) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
@ -441,6 +443,9 @@ tsdbReaderT* tsdbQueryTablesT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGro
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STableCheckInfo *pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
|
|
|
|
|
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
|
|
|
|
|
|
|
|
|
|
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle,
|
|
|
|
|
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList),
|
|
|
|
|
pTsdbReadHandle->idStr);
|
|
|
|
@ -526,7 +531,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon
|
|
|
|
|
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
|
|
|
|
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
|
|
|
|
uint64_t taskId) {
|
|
|
|
|
pCond->twindow = updateLastrowForEachGroup(groupList);
|
|
|
|
|
|
|
|
|
@ -535,7 +540,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGrou
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTablesT(tsdb, pCond, groupList, qId, taskId);
|
|
|
|
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, groupList, qId, taskId);
|
|
|
|
|
if (pTsdbReadHandle == NULL) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
@ -618,7 +623,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
|
|
|
|
|
return pNew;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
|
|
|
|
|
tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
|
|
|
|
|
uint64_t qId, uint64_t taskId) {
|
|
|
|
|
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
|
|
|
|
|
|
|
|
|
@ -634,7 +639,7 @@ tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, SQueryTableDataCond* pCon
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTablesT(tsdb, pCond, pNew, qId, taskId);
|
|
|
|
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
|
|
|
|
|
pTsdbReadHandle->loadExternalRow = true;
|
|
|
|
|
pTsdbReadHandle->currentLoadExternalRows = true;
|
|
|
|
|
|
|
|
|
@ -986,7 +991,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in
|
|
|
|
|
pCheckInfo->numOfBlocks = 0;
|
|
|
|
|
|
|
|
|
|
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
|
|
|
|
|
table.pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
|
|
|
|
table.pSchema = pTsdbReadHandle->pSchema;
|
|
|
|
|
|
|
|
|
|
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
code = terrno;
|
|
|
|
@ -1091,22 +1096,21 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|
|
|
|
int32_t slotIndex) {
|
|
|
|
|
int64_t st = taosGetTimestampUs();
|
|
|
|
|
|
|
|
|
|
STSchema* pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
|
|
|
|
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
|
|
|
|
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pTsdbReadHandle->pSchema);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
|
|
|
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pSchema);
|
|
|
|
|
code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pTsdbReadHandle->pSchema);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
|
|
|
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pSchema);
|
|
|
|
|
code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pTsdbReadHandle->pSchema);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
|
|
|
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
|
|
@ -3292,6 +3296,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat
|
|
|
|
|
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
|
|
|
|
|
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t elapsed = taosGetTimestampUs() - stime;
|
|
|
|
|