Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0
This commit is contained in:
commit
aad3a43530
|
@ -185,6 +185,7 @@ typedef struct SFuncInputRow {
|
||||||
TSKEY ts;
|
TSKEY ts;
|
||||||
bool isDataNull;
|
bool isDataNull;
|
||||||
char* pData;
|
char* pData;
|
||||||
|
char* pPk;
|
||||||
|
|
||||||
SSDataBlock* block; // prev row block or src block
|
SSDataBlock* block; // prev row block or src block
|
||||||
int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock
|
int32_t rowIndex; // prev row block ? 0 : rowIndex in srcBlock
|
||||||
|
@ -198,7 +199,8 @@ typedef struct SFuncInputRowIter {
|
||||||
bool hasPrev;
|
bool hasPrev;
|
||||||
|
|
||||||
SInputColumnInfoData* pInput;
|
SInputColumnInfoData* pInput;
|
||||||
SColumnInfoData* pData;
|
SColumnInfoData* pDataCol;
|
||||||
|
SColumnInfoData* pPkCol;
|
||||||
TSKEY* tsList;
|
TSKEY* tsList;
|
||||||
int32_t rowIndex;
|
int32_t rowIndex;
|
||||||
int32_t inputEndIndex;
|
int32_t inputEndIndex;
|
||||||
|
@ -207,6 +209,7 @@ typedef struct SFuncInputRowIter {
|
||||||
TSKEY prevBlockTsEnd;
|
TSKEY prevBlockTsEnd;
|
||||||
bool prevIsDataNull;
|
bool prevIsDataNull;
|
||||||
char* pPrevData;
|
char* pPrevData;
|
||||||
|
char* pPrevPk;
|
||||||
SSDataBlock* pPrevRowBlock; // pre one row block
|
SSDataBlock* pPrevRowBlock; // pre one row block
|
||||||
|
|
||||||
//TODO:
|
//TODO:
|
||||||
|
|
|
@ -256,7 +256,7 @@ typedef enum EFuncDataRequired {
|
||||||
} EFuncDataRequired;
|
} EFuncDataRequired;
|
||||||
|
|
||||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||||
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* pTimeWindow);
|
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo);
|
||||||
|
|
||||||
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
|
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
|
||||||
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
|
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
|
||||||
|
|
|
@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
pDataBlock->info.pks[0].val = *(int64_t*) ekey;
|
pDataBlock->info.pks[0].val = *(int32_t*) ekey;
|
||||||
pDataBlock->info.pks[1].val = *(int64_t*) skey;
|
pDataBlock->info.pks[1].val = *(int32_t*) skey;
|
||||||
} else { // todo refactor
|
} else { // todo refactor
|
||||||
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
||||||
pDataBlock->info.pks[0].nData = varDataLen(ekey);
|
pDataBlock->info.pks[0].nData = varDataLen(ekey);
|
||||||
|
|
|
@ -141,7 +141,7 @@ static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
||||||
pKey->ts = pKey->ts;
|
pKey->ts = pRow->ts;
|
||||||
pKey->numOfPKs = pRow->numOfPKs;
|
pKey->numOfPKs = pRow->numOfPKs;
|
||||||
if (pKey->numOfPKs == 0) {
|
if (pKey->numOfPKs == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -164,7 +164,7 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
||||||
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
|
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
|
||||||
pKey->pks[i].pData += pKey->pks[i].nData;
|
pKey->pks[i].pData += pKey->pks[i].nData;
|
||||||
} else {
|
} else {
|
||||||
pKey->pks[i].val = *(int64_t*) data + indices[i].offset;
|
pKey->pks[i].val = *(int64_t*) (data + indices[i].offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -694,7 +694,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
w.ekey = pScanInfo->lastProcKey.ts;
|
w.ekey = pScanInfo->lastProcKey.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 2) { // NOTE: specialized for open interval
|
if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 1) { // NOTE: specialized for open interval
|
||||||
k += 1;
|
k += 1;
|
||||||
|
|
||||||
if (k >= numOfTables) {
|
if (k >= numOfTables) {
|
||||||
|
@ -2012,41 +2012,58 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
SRowKey rowKey;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader);
|
||||||
|
if (!pIter->hasVal) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tRowGetKeyEx(pRow, &rowKey);
|
||||||
|
int32_t ret = pkCompEx(pReader->pkComparFn, pKey, &rowKey);
|
||||||
|
if (ret == 0) {
|
||||||
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle the open interval issue. Find the first row key that is greater than the given one.
|
||||||
|
static int32_t forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
doForwardDataIter(pKey, &pBlockScanInfo->iter, pBlockScanInfo, pReader);
|
||||||
|
doForwardDataIter(pKey, &pBlockScanInfo->iiter, pBlockScanInfo, pReader);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
STbData* d = NULL;
|
||||||
|
STbData* di = NULL;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
STsdbReadSnap* pSnap = pReader->pReadSnap;
|
||||||
|
|
||||||
if (pBlockScanInfo->iterInit) {
|
if (pBlockScanInfo->iterInit) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* d = NULL;
|
|
||||||
STsdbRowKey startKey = {0};
|
STsdbRowKey startKey = {0};
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey);
|
||||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
|
startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer;
|
||||||
.key = {
|
|
||||||
.ts = pBlockScanInfo->lastProcKey.ts + 1,
|
|
||||||
.numOfPKs = pReader->suppInfo.numOfPks,
|
|
||||||
}};
|
|
||||||
} else {
|
|
||||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
|
|
||||||
.key = {
|
|
||||||
.ts = pBlockScanInfo->lastProcKey.ts - 1,
|
|
||||||
.numOfPKs = pReader->suppInfo.numOfPks,
|
|
||||||
}};
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code =
|
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem");
|
||||||
doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem");
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* di = NULL;
|
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pSnap->pIMem, &pBlockScanInfo->iiter, "imem");
|
||||||
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, &pBlockScanInfo->iiter,
|
|
||||||
"imem");
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
|
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
|
||||||
|
forwardDataIter(&startKey.key, pBlockScanInfo, pReader);
|
||||||
|
|
||||||
pBlockScanInfo->iterInit = true;
|
pBlockScanInfo->iterInit = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -4128,6 +4145,9 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
||||||
blockDataEnsureCapacity(pResBlock, capacity);
|
blockDataEnsureCapacity(pResBlock, capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for debug purpose
|
||||||
|
// capacity = 7;
|
||||||
|
|
||||||
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
|
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -4894,6 +4914,10 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pTReader->status;
|
SReaderStatus* pStatus = &pTReader->status;
|
||||||
if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
|
if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
|
||||||
|
|
||||||
|
// tsdbReaderSuspend2(pReader);
|
||||||
|
// tsdbReaderResume2(pReader);
|
||||||
|
|
||||||
return pTReader->resBlockInfo.pResBlock;
|
return pTReader->resBlockInfo.pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4904,6 +4928,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
|
||||||
// tsdbReaderSuspend2(pReader);
|
// tsdbReaderSuspend2(pReader);
|
||||||
// tsdbReaderResume2(pReader);
|
// tsdbReaderResume2(pReader);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -633,7 +633,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
|
||||||
pDst->numOfPKs = pSrc->numOfPKs;
|
pDst->numOfPKs = pSrc->numOfPKs;
|
||||||
|
|
||||||
if (pSrc->numOfPKs > 0) {
|
if (pSrc->numOfPKs > 0) {
|
||||||
for (int32_t i = 0; i < pDst->numOfPKs; ++i) {
|
for (int32_t i = 0; i < pSrc->numOfPKs; ++i) {
|
||||||
SValue *pVal = &pDst->pks[i];
|
SValue *pVal = &pDst->pks[i];
|
||||||
pVal->type = pSrc->pks[i].type;
|
pVal->type = pSrc->pks[i].type;
|
||||||
|
|
||||||
|
|
|
@ -201,7 +201,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
|
||||||
|
|
||||||
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
|
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
|
||||||
|
|
||||||
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
|
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo);
|
||||||
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
|
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
notLoadBlock = false;
|
notLoadBlock = false;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
|
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
|
||||||
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||||
typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters);
|
typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters);
|
||||||
typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, STimeWindow* pTimeWindow);
|
typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, SDataBlockInfo* pBlocInfo);
|
||||||
typedef EFuncReturnRows (*FEstimateReturnRows)(SFunctionNode* pFunc);
|
typedef EFuncReturnRows (*FEstimateReturnRows)(SFunctionNode* pFunc);
|
||||||
|
|
||||||
typedef struct SBuiltinFuncDefinition {
|
typedef struct SBuiltinFuncDefinition {
|
||||||
|
|
|
@ -158,8 +158,8 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pB
|
||||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes);
|
int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes);
|
||||||
EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow);
|
EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo);
|
||||||
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow);
|
EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo);
|
||||||
|
|
||||||
int32_t lastRowFunction(SqlFunctionCtx* pCtx);
|
int32_t lastRowFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
|
|
|
@ -424,7 +424,8 @@ int32_t funcInputUpdate(SqlFunctionCtx* pCtx) {
|
||||||
if (!pCtx->bInputFinished) {
|
if (!pCtx->bInputFinished) {
|
||||||
pIter->pInput = &pCtx->input;
|
pIter->pInput = &pCtx->input;
|
||||||
pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData;
|
pIter->tsList = (TSKEY*)pIter->pInput->pPTS->pData;
|
||||||
pIter->pData = pIter->pInput->pData[0];
|
pIter->pDataCol = pIter->pInput->pData[0];
|
||||||
|
pIter->pPkCol = pIter->pInput->pPrimaryKey;
|
||||||
pIter->rowIndex = pIter->pInput->startRowIndex;
|
pIter->rowIndex = pIter->pInput->startRowIndex;
|
||||||
pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1;
|
pIter->inputEndIndex = pIter->rowIndex + pIter->pInput->numOfRows - 1;
|
||||||
pIter->pSrcBlock = pCtx->pSrcBlock;
|
pIter->pSrcBlock = pCtx->pSrcBlock;
|
||||||
|
@ -454,10 +455,16 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
|
||||||
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
|
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
|
||||||
blockDataDestroy(pIter->pPrevRowBlock);
|
blockDataDestroy(pIter->pPrevRowBlock);
|
||||||
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
||||||
pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex);
|
pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex);
|
||||||
pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes);
|
|
||||||
char* srcData = colDataGetData(pIter->pData, pIter->inputEndIndex);
|
pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes);
|
||||||
memcpy(pIter->pPrevData, srcData, pIter->pData->info.bytes);
|
char* srcData = colDataGetData(pIter->pDataCol, pIter->inputEndIndex);
|
||||||
|
memcpy(pIter->pPrevData, srcData, pIter->pDataCol->info.bytes);
|
||||||
|
|
||||||
|
pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes);
|
||||||
|
char* pkData = colDataGetData(pIter->pPkCol, pIter->inputEndIndex);
|
||||||
|
memcpy(pIter->pPrevPk, pkData, pIter->pPkCol->info.bytes);
|
||||||
|
|
||||||
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
||||||
|
|
||||||
pIter->hasPrev = true;
|
pIter->hasPrev = true;
|
||||||
|
@ -475,8 +482,9 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
|
||||||
pRow->rowIndex = 0;
|
pRow->rowIndex = 0;
|
||||||
} else {
|
} else {
|
||||||
pRow->ts = pIter->tsList[idx - 1];
|
pRow->ts = pIter->tsList[idx - 1];
|
||||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx - 1);
|
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx - 1);
|
||||||
pRow->pData = colDataGetData(pIter->pData, idx - 1);
|
pRow->pData = colDataGetData(pIter->pDataCol, idx - 1);
|
||||||
|
pRow->pPk = colDataGetData(pIter->pPkCol, idx - 1);
|
||||||
pRow->block = pIter->pSrcBlock;
|
pRow->block = pIter->pSrcBlock;
|
||||||
pRow->rowIndex = idx - 1;
|
pRow->rowIndex = idx - 1;
|
||||||
}
|
}
|
||||||
|
@ -492,20 +500,22 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
|
||||||
++idx;
|
++idx;
|
||||||
}
|
}
|
||||||
pRow->ts = pIter->tsList[idx];
|
pRow->ts = pIter->tsList[idx];
|
||||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx);
|
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
|
||||||
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex);
|
pRow->pData = colDataGetData(pIter->pDataCol, idx);
|
||||||
|
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
|
||||||
pRow->block = pIter->pSrcBlock;
|
pRow->block = pIter->pSrcBlock;
|
||||||
pRow->rowIndex = idx;
|
|
||||||
|
|
||||||
pIter->rowIndex = idx + 1;
|
pIter->rowIndex = idx + 1;
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
pIter->hasPrev = true;
|
pIter->hasPrev = true;
|
||||||
pIter->prevBlockTsEnd = tsEnd;
|
pIter->prevBlockTsEnd = tsEnd;
|
||||||
// TODO
|
pIter->prevIsDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->inputEndIndex);
|
||||||
pIter->prevIsDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->inputEndIndex);
|
pIter->pPrevData = taosMemoryMalloc(pIter->pDataCol->info.bytes);
|
||||||
pIter->pPrevData = taosMemoryMalloc(pIter->pData->info.bytes);
|
memcpy(pIter->pPrevData, colDataGetData(pIter->pDataCol, pIter->inputEndIndex), pIter->pDataCol->info.bytes);
|
||||||
memcpy(pIter->pPrevData, colDataGetData(pIter->pData, pIter->inputEndIndex), pIter->pData->info.bytes);
|
pIter->pPrevPk = taosMemoryMalloc(pIter->pPkCol->info.bytes);
|
||||||
|
memcpy(pIter->pPrevPk, colDataGetData(pIter->pPkCol, pIter->inputEndIndex), pIter->pPkCol->info.bytes);
|
||||||
|
|
||||||
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
pIter->pPrevRowBlock = blockDataExtractBlock(pIter->pSrcBlock, pIter->inputEndIndex, 1);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -523,8 +533,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||||
++idx;
|
++idx;
|
||||||
}
|
}
|
||||||
pRow->ts = pIter->tsList[idx];
|
pRow->ts = pIter->tsList[idx];
|
||||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, idx);
|
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
|
||||||
pRow->pData = colDataGetData(pIter->pData, idx);
|
pRow->pData = colDataGetData(pIter->pDataCol, idx);
|
||||||
|
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
|
||||||
pRow->block = pIter->pSrcBlock;
|
pRow->block = pIter->pSrcBlock;
|
||||||
pRow->rowIndex = idx;
|
pRow->rowIndex = idx;
|
||||||
|
|
||||||
|
@ -535,8 +546,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||||
} else {
|
} else {
|
||||||
if (pIter->rowIndex <= pIter->inputEndIndex) {
|
if (pIter->rowIndex <= pIter->inputEndIndex) {
|
||||||
pRow->ts = pIter->tsList[pIter->rowIndex];
|
pRow->ts = pIter->tsList[pIter->rowIndex];
|
||||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex);
|
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
|
||||||
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex);
|
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
|
||||||
|
pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex);
|
||||||
pRow->block = pIter->pSrcBlock;
|
pRow->block = pIter->pSrcBlock;
|
||||||
pRow->rowIndex = pIter->rowIndex;
|
pRow->rowIndex = pIter->rowIndex;
|
||||||
|
|
||||||
|
@ -563,8 +575,9 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||||
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||||
if (pIter->rowIndex <= pIter->inputEndIndex) {
|
if (pIter->rowIndex <= pIter->inputEndIndex) {
|
||||||
pRow->ts = pIter->tsList[pIter->rowIndex];
|
pRow->ts = pIter->tsList[pIter->rowIndex];
|
||||||
pRow->isDataNull = colDataIsNull_f(pIter->pData->nullbitmap, pIter->rowIndex);
|
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
|
||||||
pRow->pData = colDataGetData(pIter->pData, pIter->rowIndex);
|
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
|
||||||
|
pRow->pPk = NULL;
|
||||||
pRow->block = pIter->pSrcBlock;
|
pRow->block = pIter->pSrcBlock;
|
||||||
pRow->rowIndex = pIter->rowIndex;
|
pRow->rowIndex = pIter->rowIndex;
|
||||||
|
|
||||||
|
@ -2290,7 +2303,31 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
// TODO: change this function when block data info pks changed
|
||||||
|
static int32_t comparePkDataWithSValue(int8_t pkType, char* pkData, SValue* pVal, int32_t order) {
|
||||||
|
char numVal[8] = {0};
|
||||||
|
switch (pkType) {
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
*(int32_t*)numVal = (int32_t)pVal->val;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
*(uint32_t*)numVal = (uint32_t)pVal->val;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
*(int64_t*)numVal = (int64_t)pVal->val;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
*(uint64_t*)numVal = (uint64_t)pVal->val;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
char* blockData = (IS_NUMERIC_TYPE(pkType)) ? (char*) numVal : (char*)pVal->pData;
|
||||||
|
__compar_fn_t fn = getKeyComparFunc(pkType, order);
|
||||||
|
return fn(pkData, blockData);
|
||||||
|
}
|
||||||
|
|
||||||
|
EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
|
||||||
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
|
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
|
||||||
|
|
||||||
// not initialized yet, data is required
|
// not initialized yet, data is required
|
||||||
|
@ -2299,14 +2336,21 @@ EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
||||||
if (pResult->hasResult && pResult->ts <= pTimeWindow->skey) {
|
if (pResult->hasResult) {
|
||||||
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
if (pResult->ts < pBlockInfo->window.skey) {
|
||||||
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
} else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) {
|
||||||
|
if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 0, TSDB_ORDER_ASC) < 0) {
|
||||||
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
} else {
|
} else {
|
||||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
|
||||||
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
|
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
|
||||||
|
|
||||||
// not initialized yet, data is required
|
// not initialized yet, data is required
|
||||||
|
@ -2315,8 +2359,15 @@ EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
||||||
if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) {
|
if (pResult->hasResult) {
|
||||||
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
if (pResult->ts > pBlockInfo->window.ekey) {
|
||||||
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
} else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) {
|
||||||
|
if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 1, TSDB_ORDER_DESC) < 0) {
|
||||||
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
} else {
|
} else {
|
||||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
|
||||||
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
|
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
|
||||||
}
|
}
|
||||||
|
|
||||||
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* pTimeWindow) {
|
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo) {
|
||||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -128,7 +128,7 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow*
|
||||||
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
|
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
|
||||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
} else {
|
} else {
|
||||||
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow);
|
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pBlockInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue