[td-225] fix bugs in sliding query.

This commit is contained in:
Haojun Liao 2020-07-07 10:56:58 +08:00
parent b93509f35f
commit 96da5e233e
3 changed files with 91 additions and 61 deletions

View File

@ -81,7 +81,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
return startTime; return startTime;
} }
int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime; int64_t start = ((startTime - slidingTime) / slidingTime + 1) * slidingTime;
if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) { if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
/* /*
* here we revised the start time of day according to the local time zone, * here we revised the start time of day according to the local time zone,

View File

@ -396,13 +396,14 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataSt
} }
static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData,
int16_t bytes) { int16_t bytes, bool masterscan) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes); int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes);
if (p1 != NULL) { if (p1 != NULL) {
pWindowResInfo->curIndex = *p1; pWindowResInfo->curIndex = *p1;
} else { // more than the capacity, reallocate the resources } else {
if (masterscan) { // more than the capacity, reallocate the resources
if (pWindowResInfo->size >= pWindowResInfo->capacity) { if (pWindowResInfo->size >= pWindowResInfo->capacity) {
int64_t newCap = pWindowResInfo->capacity * 2; int64_t newCap = pWindowResInfo->capacity * 2;
@ -424,6 +425,9 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
// add a new result set for a new group // add a new result set for a new group
pWindowResInfo->curIndex = pWindowResInfo->size++; pWindowResInfo->curIndex = pWindowResInfo->size++;
taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
} else {
return NULL;
}
} }
return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex); return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex);
@ -510,15 +514,19 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
} }
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid, static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid,
STimeWindow *win) { STimeWindow *win, bool masterscan, bool* newWind) {
assert(win->skey <= win->ekey); assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey,
TSDB_KEYSIZE, masterscan);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return -1; *newWind = false;
return masterscan? -1:0;
} }
*newWind = true;
// not assign result buffer yet, add new result buffer // not assign result buffer yet, add new result buffer
if (pWindowRes->pos.pageId == -1) { if (pWindowRes->pos.pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage); int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage);
@ -685,6 +693,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].base.functionId; int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
@ -705,6 +714,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
} }
}
} }
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin, static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
@ -712,6 +722,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].nStartQueryTimestamp = pWin->skey;
@ -720,6 +731,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
aAggs[functionId].xFunctionF(&pCtx[k], offset); aAggs[functionId].xFunctionF(&pCtx[k], offset);
} }
} }
}
} }
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin, static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin,
@ -879,6 +891,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo,
__block_search_fn_t searchFn, SArray *pDataBlock) { __block_search_fn_t searchFn, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
TSKEY *tsCols = NULL; TSKEY *tsCols = NULL;
@ -902,18 +915,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = tsCols[offset]; TSKEY ts = tsCols[offset];
bool hasTimeWindow = false;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) { if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
tfree(sasArray); tfree(sasArray);
return; return;
} }
if (hasTimeWindow) {
TSKEY ekey = reviseWindowEkey(pQuery, &win); TSKEY ekey = reviseWindowEkey(pQuery, &win);
int32_t forwardStep = int32_t forwardStep =
getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows);
}
int32_t index = pWindowResInfo->curIndex; int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win; STimeWindow nextWin = win;
@ -925,14 +941,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
} }
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) { bool hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
break; break;
} }
ekey = reviseWindowEkey(pQuery, &nextWin); if (!hasTimeWindow) {
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); continue;
}
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
int32_t forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
SWindowStatus* pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
} }
@ -982,7 +1003,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
} }
// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2); // assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return -1; return -1;
} }
@ -1112,6 +1133,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current; STableQueryInfo* item = pQuery->current;
@ -1183,11 +1205,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
int64_t ts = tsCols[offset]; int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win); bool hasTimeWindow = false;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue; continue;
} }
if (!hasTimeWindow) {
continue;
}
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset); doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset);
@ -1207,13 +1234,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
} }
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) { bool hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) {
break; break;
} }
if (hasTimeWindow) {
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, offset); doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, offset);
} }
}
pWindowResInfo->curIndex = index; pWindowResInfo->curIndex = index;
} else { // other queries } else { // other queries
@ -3388,7 +3418,8 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
} }
int32_t GROUPRESULTID = 1; int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, sizeof(groupIndex)); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true);
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return; return;
} }

View File

@ -190,8 +190,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
} }
// get the result order // get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? 1:-1;
TSDB_ORDER_ASC:TSDB_ORDER_DESC;
if (order != resultOrder) { if (order != resultOrder) {
return; return;