diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 4ee0989118..78bd1d807b 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -185,6 +185,7 @@ typedef struct SFuncInputRow { TSKEY ts; bool isDataNull; char* pData; + char* pPk; SSDataBlock* block; // prev row block or src block int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock @@ -198,7 +199,8 @@ typedef struct SFuncInputRowIter { bool hasPrev; SInputColumnInfoData* pInput; - SColumnInfoData* pData; + SColumnInfoData* pDataCol; + SColumnInfoData* pPkCol; TSKEY* tsList; int32_t rowIndex; int32_t inputEndIndex; @@ -207,6 +209,7 @@ typedef struct SFuncInputRowIter { TSKEY prevBlockTsEnd; bool prevIsDataNull; char* pPrevData; + char* pPrevPk; SSDataBlock* pPrevRowBlock; // pre one row block //TODO: diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f126bb587a..72e2815ade 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -256,7 +256,7 @@ typedef enum EFuncDataRequired { } EFuncDataRequired; EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); -EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* pTimeWindow); +EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo); int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0d805cb8a4..d2164b024b 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b } } else { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { - pDataBlock->info.pks[0].val = *(int64_t*) ekey; - pDataBlock->info.pks[1].val = *(int64_t*) skey; + pDataBlock->info.pks[0].val = *(int32_t*) ekey; + pDataBlock->info.pks[1].val = *(int32_t*) skey; } else { // todo refactor memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey)); pDataBlock->info.pks[0].nData = varDataLen(ekey); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6e134698b5..891fd823e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -141,7 +141,7 @@ static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) { } static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { - pKey->ts = pKey->ts; + pKey->ts = pRow->ts; pKey->numOfPKs = pRow->numOfPKs; if (pKey->numOfPKs == 0) { return; @@ -164,7 +164,7 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); pKey->pks[i].pData += pKey->pks[i].nData; } else { - pKey->pks[i].val = *(int64_t*) data + indices[i].offset; + pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); } } } @@ -694,7 +694,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN w.ekey = pScanInfo->lastProcKey.ts; } - if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 2) { // NOTE: specialized for open interval + if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 1) { // NOTE: specialized for open interval k += 1; if (k >= numOfTables) { @@ -2012,41 +2012,58 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan return code; } +static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + SRowKey rowKey; + + while (1) { + TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader); + if (!pIter->hasVal) { + break; + } + + tRowGetKeyEx(pRow, &rowKey); + int32_t ret = pkCompEx(pReader->pkComparFn, pKey, &rowKey); + if (ret == 0) { + pIter->hasVal = tsdbTbDataIterNext(pIter->iter); + } else { + break; + } + } +} + +// handle the open interval issue. Find the first row key that is greater than the given one. +static int32_t forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + doForwardDataIter(pKey, &pBlockScanInfo->iter, pBlockScanInfo, pReader); + doForwardDataIter(pKey, &pBlockScanInfo->iiter, pBlockScanInfo, pReader); + return TSDB_CODE_SUCCESS; +} + static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + STbData* d = NULL; + STbData* di = NULL; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + STsdbReadSnap* pSnap = pReader->pReadSnap; + if (pBlockScanInfo->iterInit) { return TSDB_CODE_SUCCESS; } - STbData* d = NULL; STsdbRowKey startKey = {0}; - if (ASCENDING_TRAVERSE(pReader->info.order)) { - startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer, - .key = { - .ts = pBlockScanInfo->lastProcKey.ts + 1, - .numOfPKs = pReader->suppInfo.numOfPks, - }}; - } else { - startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer, - .key = { - .ts = pBlockScanInfo->lastProcKey.ts - 1, - .numOfPKs = pReader->suppInfo.numOfPks, - }}; - } + tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey); + startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer; - int32_t code = - doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem"); + int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem"); if (code != TSDB_CODE_SUCCESS) { return code; } - STbData* di = NULL; - code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, &pBlockScanInfo->iiter, - "imem"); + code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pSnap->pIMem, &pBlockScanInfo->iiter, "imem"); if (code != TSDB_CODE_SUCCESS) { return code; } loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); + forwardDataIter(&startKey.key, pBlockScanInfo, pReader); pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -4128,6 +4145,9 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi blockDataEnsureCapacity(pResBlock, capacity); } + // for debug purpose +// capacity = 7; + int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -4894,6 +4914,10 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { SReaderStatus* pStatus = &pTReader->status; if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) { + +// tsdbReaderSuspend2(pReader); +// tsdbReaderResume2(pReader); + return pTReader->resBlockInfo.pResBlock; } @@ -4904,6 +4928,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { // tsdbReaderSuspend2(pReader); // tsdbReaderResume2(pReader); + return ret; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index fe7a3457bd..59452ebb9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -633,7 +633,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { pDst->numOfPKs = pSrc->numOfPKs; if (pSrc->numOfPKs > 0) { - for (int32_t i = 0; i < pDst->numOfPKs; ++i) { + for (int32_t i = 0; i < pSrc->numOfPKs; ++i) { SValue *pVal = &pDst->pks[i]; pVal->type = pSrc->pks[i].type; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1e5fa065de..a51e627272 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -201,7 +201,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset); - int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window); + int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo); if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { notLoadBlock = false; break; diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 4c1e46dbba..af33c3ffa2 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -25,7 +25,7 @@ extern "C" { typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len); typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow); typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters); -typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, STimeWindow* pTimeWindow); +typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, SDataBlockInfo* pBlocInfo); typedef EFuncReturnRows (*FEstimateReturnRows)(SFunctionNode* pFunc); typedef struct SBuiltinFuncDefinition { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index ccb50ff2a8..54bcd26bc3 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -158,8 +158,8 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pB int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes); -EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow); -EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow); +EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo); +EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo); int32_t lastRowFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 097b263b8e..2907c096fe 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -424,7 +424,8 @@ int32_t funcInputUpdate(SqlFunctionCtx* pCtx) { if (!pCtx->bInputFinished) { pIter->pInput = &pCtx->input; pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData; - pIter->pData = pIter->pInput->pData[0]; + pIter->pDataCol = pIter->pInput->pData[0]; + pIter->pPkCol = pIter->pInput->pPrimaryKey; pIter->rowIndex = pIter->pInput->startRowIndex; pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1; pIter->pSrcBlock = pCtx->pSrcBlock; @@ -454,10 +455,16 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { blockDataDestroy(pIter->pPrevRowBlock); pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); - pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex); - pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes); - char* srcData = colDataGetData(pIter->pData, pIter->inputEndIndex); - memcpy(pIter->pPrevData, srcData, pIter->pData->info.bytes); + pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex); + + pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes); + char* srcData = colDataGetData(pIter->pDataCol, pIter->inputEndIndex); + memcpy(pIter->pPrevData, srcData, pIter->pDataCol->info.bytes); + + pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes); + char* pkData = colDataGetData(pIter->pPkCol, pIter->inputEndIndex); + memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes); + pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); pIter->hasPrev = true; @@ -475,8 +482,9 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { pRow->rowIndex = 0; } else { pRow->ts = pIter->tsList[idx - 1]; - pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx - 1); - pRow->pData = colDataGetData(pIter->pData, idx - 1); + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx - 1); + pRow->pData = colDataGetData(pIter->pDataCol, idx - 1); + pRow->pPk = colDataGetData(pIter->pPkCol, idx - 1); pRow->block = pIter->pSrcBlock; pRow->rowIndex = idx - 1; } @@ -492,20 +500,22 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { ++idx; } pRow->ts = pIter->tsList[idx]; - pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx); - pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx); + pRow->pData = colDataGetData(pIter->pDataCol, idx); + pRow->pPk = colDataGetData(pIter->pPkCol, idx); pRow->block = pIter->pSrcBlock; - pRow->rowIndex = idx; pIter->rowIndex = idx + 1; return true; } else { pIter->hasPrev = true; pIter->prevBlockTsEnd = tsEnd; - // TODO - pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex); - pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes); - memcpy(pIter->pPrevData, colDataGetData(pIter->pData, pIter->inputEndIndex), pIter->pData->info.bytes); + pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex); + pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes); + memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex), pIter->pDataCol->info.bytes); + pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes); + memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes); + pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1); return false; } @@ -523,8 +533,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { ++idx; } pRow->ts = pIter->tsList[idx]; - pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx); - pRow->pData = colDataGetData(pIter->pData, idx); + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx); + pRow->pData = colDataGetData(pIter->pDataCol, idx); + pRow->pPk = colDataGetData(pIter->pPkCol, idx); pRow->block = pIter->pSrcBlock; pRow->rowIndex = idx; @@ -535,8 +546,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { } else { if (pIter->rowIndex <= pIter->inputEndIndex) { pRow->ts = pIter->tsList[pIter->rowIndex]; - pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex); - pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex); + pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex); + pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex); pRow->block = pIter->pSrcBlock; pRow->rowIndex = pIter->rowIndex; @@ -563,8 +575,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { if (pIter->rowIndex <= pIter->inputEndIndex) { pRow->ts = pIter->tsList[pIter->rowIndex]; - pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex); - pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex); + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex); + pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex); + pRow->pPk = NULL; pRow->block = pIter->pSrcBlock; pRow->rowIndex = pIter->rowIndex; @@ -2290,7 +2303,31 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } -EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow) { +// TODO: change this function when block data info pks changed +static int32_t comparePkDataWithSValue(int8_t pkType, char* pkData, SValue* pVal, int32_t order) { + char numVal[8] = {0}; + switch (pkType) { + case TSDB_DATA_TYPE_INT: + *(int32_t*)numVal = (int32_t)pVal->val; + break; + case TSDB_DATA_TYPE_UINT: + *(uint32_t*)numVal = (uint32_t)pVal->val; + break; + case TSDB_DATA_TYPE_BIGINT: + *(int64_t*)numVal = (int64_t)pVal->val; + break; + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t*)numVal = (uint64_t)pVal->val; + break; + default: + break; + } + char* blockData = (IS_NUMERIC_TYPE(pkType)) ? (char*) numVal : (char*)pVal->pData; + __compar_fn_t fn = getKeyComparFunc(pkType, order); + return fn(pkData, blockData); +} + +EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) { SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes; // not initialized yet, data is required @@ -2299,14 +2336,21 @@ EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow) { } SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); - if (pResult->hasResult && pResult->ts <= pTimeWindow->skey) { - return FUNC_DATA_REQUIRED_NOT_LOAD; + if (pResult->hasResult) { + if (pResult->ts < pBlockInfo->window.skey) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) { + if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 0, TSDB_ORDER_ASC) < 0) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } + } + return FUNC_DATA_REQUIRED_DATA_LOAD; } else { return FUNC_DATA_REQUIRED_DATA_LOAD; } } -EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { +EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) { SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes; // not initialized yet, data is required @@ -2315,8 +2359,15 @@ EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { } SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); - if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) { - return FUNC_DATA_REQUIRED_NOT_LOAD; + if (pResult->hasResult) { + if (pResult->ts > pBlockInfo->window.ekey) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) { + if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 1, TSDB_ORDER_DESC) < 0) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } + } + return FUNC_DATA_REQUIRED_DATA_LOAD; } else { return FUNC_DATA_REQUIRED_DATA_LOAD; } diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index ae3958647b..697efb63fd 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -115,7 +115,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow); } -EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* pTimeWindow) { +EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { return TSDB_CODE_FAILED; } @@ -128,7 +128,7 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) { return FUNC_DATA_REQUIRED_DATA_LOAD; } else { - return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pBlockInfo); } }