[td-225] fix bugs in sliding query
This commit is contained in:
parent
1761b01107
commit
b1f85ec011
|
@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
|
||||
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
|
||||
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
|
||||
pUpdateMsg->tid = htonl(pTableMeta->sid);
|
||||
pUpdateMsg->uid = htobe64(pTableMeta->uid);
|
||||
pUpdateMsg->colId = htons(pTagsSchema->colId);
|
||||
pUpdateMsg->tversion = htons(pTableMeta->tversion);
|
||||
pUpdateMsg->tid = htonl(pTableMeta->sid);
|
||||
pUpdateMsg->uid = htobe64(pTableMeta->uid);
|
||||
pUpdateMsg->colId = htons(pTagsSchema->colId);
|
||||
pUpdateMsg->type = htons(pTagsSchema->type);
|
||||
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
|
||||
pUpdateMsg->tversion = htons(pTableMeta->tversion);
|
||||
pUpdateMsg->numOfTags = htons(numOfTags);
|
||||
pUpdateMsg->schemaLen = htonl(schemaLen);
|
||||
|
||||
|
|
|
@ -285,6 +285,8 @@ typedef struct {
|
|||
int32_t tid;
|
||||
int16_t tversion;
|
||||
int16_t colId;
|
||||
int16_t type;
|
||||
int16_t bytes;
|
||||
int32_t tagValLen;
|
||||
int16_t numOfTags;
|
||||
int32_t schemaLen;
|
||||
|
|
|
@ -1099,7 +1099,6 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
|||
// todo add comments
|
||||
if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) {
|
||||
return pCtx->param[0].i64Key == pQuery->order.order;
|
||||
// return !QUERY_IS_ASC_QUERY(pQuery);
|
||||
}
|
||||
|
||||
// in the supplementary scan, only the following functions need to be executed
|
||||
|
@ -4716,8 +4715,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
el = scanMultiTableDataBlocks(pQInfo);
|
||||
qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el);
|
||||
|
||||
doRestoreContext(pQInfo);
|
||||
doCloseAllTimeWindowAfterScan(pQInfo);
|
||||
doRestoreContext(pQInfo);
|
||||
} else {
|
||||
qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo);
|
||||
}
|
||||
|
|
|
@ -180,19 +180,34 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
|
|||
|
||||
/*
|
||||
* remove the results that are not the FIRST time window that spreads beyond the
|
||||
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
|
||||
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
|
||||
* NOTE: remove redundant, only when the result set order equals to traverse order
|
||||
*/
|
||||
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
|
||||
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
|
||||
|
||||
int32_t i = 0;
|
||||
while (i < pWindowResInfo->size &&
|
||||
((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) ||
|
||||
(pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) {
|
||||
++i;
|
||||
if (pWindowResInfo->size <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
// get the result order
|
||||
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)?
|
||||
TSDB_ORDER_ASC:TSDB_ORDER_DESC;
|
||||
|
||||
if (order != resultOrder) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
if (order == QUERY_ASC_FORWARD_STEP) {
|
||||
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.ekey < lastKey)) {
|
||||
++i;
|
||||
}
|
||||
} else if (order == QUERY_DESC_FORWARD_STEP) {
|
||||
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.skey > lastKey)) {
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
// assert(i < pWindowResInfo->size);
|
||||
if (i < pWindowResInfo->size) {
|
||||
pWindowResInfo->size = (i + 1);
|
||||
}
|
||||
|
|
|
@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
|
|||
* To flush data to disk to accommodate more data
|
||||
*/
|
||||
if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) {
|
||||
if (!tExtMemBufferFlush(pMemBuffer)) {
|
||||
if (tExtMemBufferFlush(pMemBuffer) != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
|
|||
size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file);
|
||||
if (retVal <= 0) { // failed to write to buffer, may be not enough space
|
||||
ret = TAOS_SYSTEM_ERROR(errno);
|
||||
return ret;
|
||||
}
|
||||
|
||||
pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;
|
||||
|
|
|
@ -274,6 +274,8 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
|||
pMsg->tid = htonl(pMsg->tid);
|
||||
pMsg->tversion = htons(pMsg->tversion);
|
||||
pMsg->colId = htons(pMsg->colId);
|
||||
pMsg->type = htons(pMsg->type);
|
||||
pMsg->bytes = htons(pMsg->bytes);
|
||||
pMsg->tagValLen = htonl(pMsg->tagValLen);
|
||||
pMsg->numOfTags = htons(pMsg->numOfTags);
|
||||
pMsg->schemaLen = htonl(pMsg->schemaLen);
|
||||
|
|
Loading…
Reference in New Issue