fix(tsdb): set correct merge row.
This commit is contained in:
parent
af39260ec7
commit
97f7e67234
|
@ -1925,10 +1925,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey minKey = {0};
|
SRowKey minKey = k;
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
minKey = k; // let's find the minimum
|
|
||||||
|
|
||||||
if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) {
|
if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) {
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
@ -1941,7 +1939,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = k; // let find the maximum ts value
|
|
||||||
if (pkCompEx(compFn, &ik, &minKey) > 0) {
|
if (pkCompEx(compFn, &ik, &minKey) > 0) {
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
@ -1968,7 +1965,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &pBlockScanInfo->lastProcKey) == 0) {
|
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -138,7 +138,7 @@ typedef struct SElapsedInfo {
|
||||||
|
|
||||||
typedef struct STwaInfo {
|
typedef struct STwaInfo {
|
||||||
double dOutput;
|
double dOutput;
|
||||||
bool isNull;
|
int64_t numOfElems;
|
||||||
SPoint1 p;
|
SPoint1 p;
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
} STwaInfo;
|
} STwaInfo;
|
||||||
|
@ -600,10 +600,10 @@ bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||||
bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
||||||
SFuncInputRowIter* pIter = &pCtx->rowIter;
|
SFuncInputRowIter* pIter = &pCtx->rowIter;
|
||||||
if (pCtx->hasPrimaryKey) {
|
if (pCtx->hasPrimaryKey) {
|
||||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||||
return funcInputGetNextRowDescPk(pIter, pRow);
|
|
||||||
} else {
|
|
||||||
return funcInputGetNextRowAscPk(pIter, pRow);
|
return funcInputGetNextRowAscPk(pIter, pRow);
|
||||||
|
} else {
|
||||||
|
return funcInputGetNextRowDescPk(pIter, pRow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return funcInputGetNextRowNoPk(pIter, pRow);
|
return funcInputGetNextRowNoPk(pIter, pRow);
|
||||||
|
@ -5556,7 +5556,7 @@ bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
pInfo->isNull = false;
|
pInfo->numOfElems = 0;
|
||||||
pInfo->p.key = INT64_MIN;
|
pInfo->p.key = INT64_MIN;
|
||||||
pInfo->win = TSWINDOW_INITIALIZER;
|
pInfo->win = TSWINDOW_INITIALIZER;
|
||||||
return true;
|
return true;
|
||||||
|
@ -5581,13 +5581,11 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SPoint1* last = &pInfo->p;
|
||||||
SPoint1* last = &pInfo->p;
|
|
||||||
int32_t numOfElems = 0;
|
|
||||||
|
|
||||||
if (IS_NULL_TYPE(pInputCol->info.type)) {
|
if (IS_NULL_TYPE(pInputCol->info.type)) {
|
||||||
pInfo->isNull = true;
|
pInfo->numOfElems = 0;
|
||||||
goto _twa_over;
|
goto _twa_over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5605,7 +5603,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
pInfo->dOutput += twa_get_area(pCtx->start, *last);
|
pInfo->dOutput += twa_get_area(pCtx->start, *last);
|
||||||
pInfo->win.skey = pCtx->start.key;
|
pInfo->win.skey = pCtx->start.key;
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if (pInfo->p.key == INT64_MIN) {
|
} else if (pInfo->p.key == INT64_MIN) {
|
||||||
|
@ -5619,7 +5617,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
|
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
|
||||||
|
|
||||||
pInfo->win.skey = last->key;
|
pInfo->win.skey = last->key;
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5633,7 +5631,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5651,7 +5649,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5668,7 +5666,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5685,7 +5683,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5702,7 +5700,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5719,7 +5717,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(double*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(double*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5736,7 +5734,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5753,7 +5751,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5770,7 +5768,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5787,7 +5785,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5808,16 +5806,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (pCtx->end.key != INT64_MIN) {
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
|
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
|
||||||
pInfo->p = pCtx->end;
|
pInfo->p = pCtx->end;
|
||||||
numOfElems += 1;
|
pInfo->numOfElems += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->win.ekey = pInfo->p.key;
|
pInfo->win.ekey = pInfo->p.key;
|
||||||
|
|
||||||
_twa_over:
|
_twa_over:
|
||||||
if (numOfElems == 0) {
|
|
||||||
pInfo->isNull = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(pResInfo, 1, 1);
|
SET_VAL(pResInfo, 1, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -5838,7 +5832,7 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
if (pInfo->isNull == true) {
|
if (pInfo->numOfElems == 0) {
|
||||||
pResInfo->numOfRes = 0;
|
pResInfo->numOfRes = 0;
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->win.ekey == pInfo->win.skey) {
|
if (pInfo->win.ekey == pInfo->win.skey) {
|
||||||
|
|
Loading…
Reference in New Issue