fix td-838
This commit is contained in:
parent
3302f60e45
commit
87ae29492e
|
@ -384,11 +384,13 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdResetDataCols(SDataCols *pCols) {
|
void tdResetDataCols(SDataCols *pCols) {
|
||||||
|
if (pCols != NULL) {
|
||||||
pCols->numOfRows = 0;
|
pCols->numOfRows = 0;
|
||||||
for (int i = 0; i < pCols->maxCols; i++) {
|
for (int i = 0; i < pCols->maxCols; i++) {
|
||||||
dataColReset(pCols->cols + i);
|
dataColReset(pCols->cols + i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
|
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
|
||||||
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));
|
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));
|
||||||
|
|
|
@ -2202,7 +2202,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (true) {
|
||||||
|
if (!tsdbNextDataBlock(pQueryHandle)) {
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
summary->totalBlocks += 1;
|
summary->totalBlocks += 1;
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
|
@ -3188,6 +3194,9 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
||||||
|
|
||||||
// add ref for table
|
// add ref for table
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
|
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
switchCtxOrder(pRuntimeEnv);
|
switchCtxOrder(pRuntimeEnv);
|
||||||
|
@ -3260,6 +3269,9 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
|
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
|
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
|
@ -3916,7 +3928,14 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
|
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||||
|
|
||||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (true) {
|
||||||
|
if (!tsdbNextDataBlock(pQueryHandle)) {
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
@ -3960,7 +3979,14 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
||||||
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
||||||
|
|
||||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||||
while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
|
while (true) {
|
||||||
|
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
|
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
@ -4059,16 +4085,16 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
if (onlyQueryTags(pQuery)) {
|
if (onlyQueryTags(pQuery)) {
|
||||||
return;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) {
|
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) {
|
||||||
return;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbQueryCond cond = {
|
STsdbQueryCond cond = {
|
||||||
|
@ -4090,6 +4116,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||||
cond.twindow = pCheckInfo->win;
|
cond.twindow = pCheckInfo->win;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
if (isFirstLastRowQuery(pQuery)) {
|
if (isFirstLastRowQuery(pQuery)) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
} else if (isPointInterpoQuery(pQuery)) {
|
} else if (isPointInterpoQuery(pQuery)) {
|
||||||
|
@ -4097,6 +4124,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||||
} else {
|
} else {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
}
|
}
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
||||||
|
@ -4133,7 +4161,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
|
|
||||||
setScanLimitationByResultBuffer(pQuery);
|
setScanLimitationByResultBuffer(pQuery);
|
||||||
changeExecuteScanOrder(pQInfo, false);
|
changeExecuteScanOrder(pQInfo, false);
|
||||||
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pQInfo->tsdb = tsdb;
|
pQInfo->tsdb = tsdb;
|
||||||
pQInfo->vgId = vgId;
|
pQInfo->vgId = vgId;
|
||||||
|
@ -4257,7 +4288,14 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
|
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (true) {
|
||||||
|
if (!tsdbNextDataBlock(pQueryHandle)) {
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
summary->totalBlocks += 1;
|
summary->totalBlocks += 1;
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (IS_QUERY_KILLED(pQInfo)) {
|
||||||
|
@ -4338,6 +4376,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
taosArrayDestroy(tx);
|
taosArrayDestroy(tx);
|
||||||
taosArrayDestroy(g1);
|
taosArrayDestroy(g1);
|
||||||
|
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||||
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
||||||
|
@ -4406,6 +4447,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(tx);
|
||||||
|
taosArrayDestroy(g1);
|
||||||
|
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
initCtxOutputBuf(pRuntimeEnv);
|
initCtxOutputBuf(pRuntimeEnv);
|
||||||
|
|
||||||
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
|
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
|
||||||
|
@ -4469,6 +4516,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
taosArrayDestroy(g1);
|
taosArrayDestroy(g1);
|
||||||
taosArrayDestroy(tx);
|
taosArrayDestroy(tx);
|
||||||
|
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
|
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
|
||||||
assert(taosArrayGetSize(s) >= 1);
|
assert(taosArrayGetSize(s) >= 1);
|
||||||
|
@ -4664,6 +4714,9 @@ static void doSaveContext(SQInfo *pQInfo) {
|
||||||
|
|
||||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
|
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
switchCtxOrder(pRuntimeEnv);
|
switchCtxOrder(pRuntimeEnv);
|
||||||
|
|
|
@ -179,7 +179,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
||||||
pQueryHandle->allocSize = 0;
|
pQueryHandle->allocSize = 0;
|
||||||
|
|
||||||
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
|
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
|
||||||
|
free(pQueryHandle);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
|
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
|
||||||
|
|
||||||
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
||||||
|
@ -238,11 +241,11 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
|
|
||||||
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
|
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
|
||||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
||||||
|
if (pQueryHandle != NULL) {
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
||||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||||
|
|
||||||
changeQueryHandleForLastrowQuery(pQueryHandle);
|
changeQueryHandleForLastrowQuery(pQueryHandle);
|
||||||
|
}
|
||||||
return pQueryHandle;
|
return pQueryHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,9 +267,10 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
|
||||||
|
|
||||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
|
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
|
||||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
||||||
|
if (pQueryHandle != NULL) {
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
|
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
|
||||||
changeQueryHandleForInterpQuery(pQueryHandle);
|
changeQueryHandleForInterpQuery(pQueryHandle);
|
||||||
|
}
|
||||||
return pQueryHandle;
|
return pQueryHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1522,7 +1526,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
pSecQueryHandle->activeIndex = 0;
|
pSecQueryHandle->activeIndex = 0;
|
||||||
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
|
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
|
||||||
|
|
||||||
tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);
|
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
|
||||||
|
free(pSecQueryHandle);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
|
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
|
@ -1606,7 +1613,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: opt by consider the scan order
|
// TODO: opt by consider the scan order
|
||||||
return doHasDataInBuffer(pQueryHandle);
|
bool ret = doHasDataInBuffer(pQueryHandle);
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
|
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
|
||||||
|
|
Loading…
Reference in New Issue