fix:add combine function for groupKey
This commit is contained in:
parent
323d8ee0b4
commit
ffa59f69aa
|
@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
|
||||||
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
|
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
|
||||||
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
|
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
|
||||||
bool fmIsInvertible(int32_t funcId);
|
bool fmIsInvertible(int32_t funcId);
|
||||||
|
char* fmGetFuncName(int32_t funcId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
|
||||||
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
|
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
|
||||||
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
|
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
}
|
||||||
|
} else if (pDestCtx[k].fpSet.combine == NULL) {
|
||||||
|
char* funName = fmGetFuncName(pDestCtx[k].functionId);
|
||||||
|
qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
|
||||||
|
taosMemoryFreeClear(funName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
|
int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "irate",
|
.name = "irate",
|
||||||
.type = FUNCTION_TYPE_IRATE,
|
.type = FUNCTION_TYPE_IRATE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||||
.translateFunc = translateIrate,
|
.translateFunc = translateIrate,
|
||||||
.getEnvFunc = getIrateFuncEnv,
|
.getEnvFunc = getIrateFuncEnv,
|
||||||
.initFunc = irateFuncSetup,
|
.initFunc = irateFuncSetup,
|
||||||
|
@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = groupKeyFunction,
|
.processFunc = groupKeyFunction,
|
||||||
.finalizeFunc = groupKeyFinalize,
|
.finalizeFunc = groupKeyFinalize,
|
||||||
|
.combineFunc = groupKeyCombine,
|
||||||
.pPartialFunc = "_group_key",
|
.pPartialFunc = "_group_key",
|
||||||
.pMergeFunc = "_group_key"
|
.pMergeFunc = "_group_key"
|
||||||
},
|
},
|
||||||
|
|
|
@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
|
SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
|
|
||||||
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
|
SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
|
||||||
|
// escape rest of data blocks to avoid first entry to be overwritten.
|
||||||
|
if (pDBuf->hasResult) {
|
||||||
|
goto _group_key_over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSBuf->isNull) {
|
||||||
|
pDBuf->isNull = true;
|
||||||
|
pDBuf->hasResult = true;
|
||||||
|
goto _group_key_over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
|
||||||
|
memcpy(pDBuf->data, pSBuf->data,
|
||||||
|
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
|
||||||
|
} else {
|
||||||
|
memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pDBuf->hasResult = true;
|
||||||
|
|
||||||
|
_group_key_over:
|
||||||
|
|
||||||
|
SET_VAL(pDResInfo, 1, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
|
|
@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* fmGetFuncName(int32_t funcId) {
|
||||||
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
|
return taosStrdup("invalid function");
|
||||||
|
}
|
||||||
|
return taosStrdup(funcMgtBuiltins[funcId].name);
|
||||||
|
}
|
||||||
|
|
|
@ -272,4 +272,122 @@ if $data12 != 2 then
|
||||||
goto loop3
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ===== step3
|
||||||
|
|
||||||
|
sql drop database if exists test4;
|
||||||
|
sql create database test4 vgroups 10;
|
||||||
|
sql use test4;
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
|
||||||
|
sql create table aaa using st tags(1,1,1);
|
||||||
|
sql create table bbb using st tags(2,2,2);
|
||||||
|
sql create table ccc using st tags(3,2,2);
|
||||||
|
sql create table ddd using st tags(4,2,2);
|
||||||
|
|
||||||
|
|
||||||
|
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop4:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql delete from aaa where ts = 1648791223003 ;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop5:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql delete from ccc;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop6:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql delete from ddd;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop7:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ===== over
|
||||||
|
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
|
|
Loading…
Reference in New Issue