Merge remote-tracking branch 'origin/feat/TS-4243-3.0' into feat/TS-4243-3.0

This commit is contained in:
Haojun Liao 2024-03-25 14:21:00 +08:00
commit 2cddeccfc7
8 changed files with 59 additions and 17 deletions

View File

@ -256,7 +256,7 @@ typedef enum EFuncDataRequired {
} EFuncDataRequired;
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* pTimeWindow);
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo);
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);

View File

@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
}
} else {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int64_t*) ekey;
pDataBlock->info.pks[1].val = *(int64_t*) skey;
pDataBlock->info.pks[0].val = *(int32_t*) ekey;
pDataBlock->info.pks[1].val = *(int32_t*) skey;
} else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
pDataBlock->info.pks[0].nData = varDataLen(ekey);

View File

@ -501,8 +501,12 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
char* pk = (pSchema->flags & COL_IS_KEY) ? "PRIMARY KEY" : "";
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s %s", ((i > 0) ? ", " : ""), pSchema->name, type, pk);
if (!(pSchema->flags & COL_IS_KEY)) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
} else {
char* pk = "PRIMARY KEY";
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s %s", ((i > 0) ? ", " : ""), pSchema->name, type, pk);
}
}
}

View File

@ -201,7 +201,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo);
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
notLoadBlock = false;
break;

View File

@ -25,7 +25,7 @@ extern "C" {
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters);
typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, STimeWindow* pTimeWindow);
typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, SDataBlockInfo* pBlocInfo);
typedef EFuncReturnRows (*FEstimateReturnRows)(SFunctionNode* pFunc);
typedef struct SBuiltinFuncDefinition {

View File

@ -158,8 +158,8 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pB
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getFirstLastInfoSize(int32_t resBytes, int32_t pkBytes);
EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow);
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow);
EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo);
EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo);
int32_t lastRowFunction(SqlFunctionCtx* pCtx);

View File

@ -2290,7 +2290,31 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
return TSDB_CODE_SUCCESS;
}
EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
// TODO: change this function when block data info pks changed
static int32_t comparePkDataWithSValue(int8_t pkType, char* pkData, SValue* pVal, int32_t order) {
char numVal[8] = {0};
switch (pkType) {
case TSDB_DATA_TYPE_INT:
*(int32_t*)numVal = (int32_t)pVal->val;
break;
case TSDB_DATA_TYPE_UINT:
*(uint32_t*)numVal = (uint32_t)pVal->val;
break;
case TSDB_DATA_TYPE_BIGINT:
*(int64_t*)numVal = (int64_t)pVal->val;
break;
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t*)numVal = (uint64_t)pVal->val;
break;
default:
break;
}
char* blockData = (IS_NUMERIC_TYPE(pkType)) ? (char*) numVal : (char*)pVal->pData;
__compar_fn_t fn = getKeyComparFunc(pkType, order);
return fn(pkData, blockData);
}
EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
// not initialized yet, data is required
@ -2299,14 +2323,21 @@ EFuncDataRequired firstDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
}
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
if (pResult->hasResult && pResult->ts <= pTimeWindow->skey) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
if (pResult->hasResult) {
if (pResult->ts < pBlockInfo->window.skey) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
} else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) {
if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 0, TSDB_ORDER_ASC) < 0) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
}
}
return FUNC_DATA_REQUIRED_DATA_LOAD;
} else {
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
}
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*)pRes;
// not initialized yet, data is required
@ -2315,8 +2346,15 @@ EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
}
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
if (pResult->hasResult) {
if (pResult->ts > pBlockInfo->window.ekey) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
} else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) {
if (comparePkDataWithSValue(pResult->pkType, pResult->pkData, pBlockInfo->pks + 1, TSDB_ORDER_DESC) < 0) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
}
}
return FUNC_DATA_REQUIRED_DATA_LOAD;
} else {
return FUNC_DATA_REQUIRED_DATA_LOAD;
}

View File

@ -115,7 +115,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
}
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* pTimeWindow) {
EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, SDataBlockInfo* pBlockInfo) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED;
}
@ -128,7 +128,7 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow*
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
return FUNC_DATA_REQUIRED_DATA_LOAD;
} else {
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow);
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pBlockInfo);
}
}