Merge pull request #14762 from taosdata/feature/3_liaohj
fix(query): add more check for fetch rsp, and set the correct start time window for fill.
This commit is contained in:
commit
0b45f95211
|
@ -1476,12 +1476,16 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
|
||||||
tsem_wait(&pParam->sem);
|
tsem_wait(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) {
|
if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
if (setupOneRowPtr) {
|
||||||
doSetOneRowPtr(pResultInfo);
|
doSetOneRowPtr(pResultInfo);
|
||||||
pResultInfo->current += 1;
|
pResultInfo->current += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pResultInfo->row;
|
return pResultInfo->row;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
||||||
|
|
|
@ -779,6 +779,8 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList);
|
SArray* pColList);
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
||||||
|
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
||||||
|
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||||
|
|
||||||
|
|
|
@ -869,7 +869,7 @@ static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
|
||||||
return w;
|
return w;
|
||||||
}
|
}
|
||||||
|
|
||||||
static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
|
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
|
||||||
int32_t factor = (order == TSDB_ORDER_ASC)? -1:1;
|
int32_t factor = (order == TSDB_ORDER_ASC)? -1:1;
|
||||||
|
|
||||||
STimeWindow win = *pWindow;
|
STimeWindow win = *pWindow;
|
||||||
|
|
|
@ -545,9 +545,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
|
||||||
if (pCtx[k].fpSet.process == NULL) {
|
if (pCtx[k].fpSet.process == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
|
||||||
qDebug("page_process");
|
|
||||||
#endif
|
|
||||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||||
|
@ -4010,6 +4008,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
|
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
|
getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
|
||||||
|
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
|
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
|
||||||
|
|
Loading…
Reference in New Issue