From a4d86275d2ec7560ec29253786b9a814b4758d83 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 18 Jul 2022 15:06:21 +0800 Subject: [PATCH 1/4] feat(query): add first/last/last_row scalar version --- include/libs/scalar/scalar.h | 1 + source/libs/function/src/builtins.c | 3 +++ source/libs/scalar/src/sclfunc.c | 4 ++++ 3 files changed, 8 insertions(+) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index b667207684..5ab01159cf 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -119,6 +119,7 @@ int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SSca int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t topScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t bottomScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b6a351b4a6..c1741fe879 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2231,6 +2231,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = lastRowFunction, + .sprocessFunc = firstLastScalarFunction, .finalizeFunc = firstLastFinalize }, { @@ -2251,6 +2252,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = firstFunction, + .sprocessFunc = firstLastScalarFunction, .finalizeFunc = firstLastFinalize, .pPartialFunc = "_first_partial", .pMergeFunc = "_first_merge", @@ -2286,6 +2288,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = lastFunction, + .sprocessFunc = firstLastScalarFunction, .finalizeFunc = firstLastFinalize, .pPartialFunc = "_last_partial", .pMergeFunc = "_last_merge", diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 1cbaf9690e..cca1528800 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2845,3 +2845,7 @@ int32_t topScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * int32_t bottomScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return selectScalarFunction(pInput, inputNum, pOutput); } + +int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return selectScalarFunction(pInput, inputNum, pOutput); +} From 75371098a6145568b3dfecbcb75c02ab8082a92c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 18 Jul 2022 15:14:20 +0800 Subject: [PATCH 2/4] refactor --- include/libs/scalar/scalar.h | 3 +-- source/libs/function/src/builtins.c | 4 ++-- source/libs/scalar/src/sclfunc.c | 6 +----- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 5ab01159cf..de7e35b7f8 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -117,8 +117,7 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); -int32_t topScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); -int32_t bottomScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index c1741fe879..07154ba8dd 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2086,7 +2086,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, .processFunc = topFunction, - .sprocessFunc = topScalarFunction, + .sprocessFunc = topBotScalarFunction, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, .pPartialFunc = "top", @@ -2101,7 +2101,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, .processFunc = bottomFunction, - .sprocessFunc = bottomScalarFunction, + .sprocessFunc = topBotScalarFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, .pPartialFunc = "bottom", diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index cca1528800..a495020220 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2838,11 +2838,7 @@ int32_t selectScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara return TSDB_CODE_SUCCESS; } -int32_t topScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - return selectScalarFunction(pInput, inputNum, pOutput); -} - -int32_t bottomScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { +int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return selectScalarFunction(pInput, inputNum, pOutput); } From 5b12fcb3a125c502a1c11f1062aa5c3316cbda80 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 18 Jul 2022 15:23:28 +0800 Subject: [PATCH 3/4] feat(query): add selective function scalar version --- include/libs/scalar/scalar.h | 4 ++++ source/libs/function/src/builtins.c | 4 ++++ source/libs/scalar/src/sclfunc.c | 16 ++++++++++++++++ 3 files changed, 24 insertions(+) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index de7e35b7f8..770b0442ea 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -119,6 +119,10 @@ int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SSca int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 07154ba8dd..489a11da39 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2469,6 +2469,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, .processFunc = sampleFunction, + .sprocessFunc = sampleScalarFunction, .finalizeFunc = sampleFinalize }, { @@ -2480,6 +2481,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTailFuncEnv, .initFunc = tailFunctionSetup, .processFunc = tailFunction, + .sprocessFunc = tailScalarFunction, .finalizeFunc = NULL }, { @@ -2491,6 +2493,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getUniqueFuncEnv, .initFunc = uniqueFunctionSetup, .processFunc = uniqueFunction, + .sprocessFunc = uniqueScalarFunction, .finalizeFunc = NULL }, { @@ -2501,6 +2504,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getModeFuncEnv, .initFunc = modeFunctionSetup, .processFunc = modeFunction, + .sprocessFunc = modeScalarFunction, .finalizeFunc = modeFinalize, }, { diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a495020220..55debe51a8 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2845,3 +2845,19 @@ int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return selectScalarFunction(pInput, inputNum, pOutput); } + +int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return selectScalarFunction(pInput, inputNum, pOutput); +} + +int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return selectScalarFunction(pInput, inputNum, pOutput); +} + +int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return selectScalarFunction(pInput, inputNum, pOutput); +} + +int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return selectScalarFunction(pInput, inputNum, pOutput); +} From 053786e6b44368e71988ac5830d0522690781bb5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Jul 2022 15:50:36 +0800 Subject: [PATCH 4/4] fix(stream): data exec --- source/dnode/mnode/impl/src/mndSubscribe.c | 17 ++- source/libs/stream/src/streamExec.c | 126 +++++++++++---------- source/libs/wal/src/walRead.c | 2 + 3 files changed, 80 insertions(+), 65 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index d67e4e8783..05e197150e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -298,7 +298,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, + pConsumerEp->consumerId); } imbCnt++; } @@ -312,7 +313,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, + pConsumerEp->consumerId); } } } @@ -354,7 +356,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + pConsumerEp->consumerId); } } @@ -371,8 +374,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ASSERT(pConsumerEp->consumerId > 0); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; + if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { + mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + pConsumerEp->consumerId); + continue; + } taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + pConsumerEp->consumerId); } } else { // if all consumer is removed, put all vg into unassigned diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f6eb9e32f2..b59a812678 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -143,76 +143,80 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { } static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { - int32_t cnt = 0; - void* data = NULL; while (1) { - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - qDebug("stream exec over, queue empty"); - break; - } - if (data == NULL) { - data = qItem; - if (qItem->type == STREAM_INPUT__DATA_BLOCK) { - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ - } - streamQueueProcessSuccess(pTask->inputQueue); - } else { - if (streamAppendQueueItem(data, qItem) < 0) { - streamQueueProcessFail(pTask->inputQueue); + int32_t cnt = 0; + void* data = NULL; + while (1) { + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + qDebug("stream exec over, queue empty"); break; - } else { - cnt++; - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + } + if (data == NULL) { + data = qItem; streamQueueProcessSuccess(pTask->inputQueue); - taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); - taosFreeQitem(qItem); + if (qItem->type == STREAM_INPUT__DATA_BLOCK) { + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + } else { + break; + } + } else { + if (streamAppendQueueItem(data, qItem) < 0) { + streamQueueProcessFail(pTask->inputQueue); + break; + } else { + cnt++; + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + streamQueueProcessSuccess(pTask->inputQueue); + taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); + taosFreeQitem(qItem); + } } } - } - if (pTask->taskStatus == TASK_STATUS__DROPPING) { - if (data) streamFreeQitem(data); - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return NULL; - } - - if (data == NULL) return pRes; - - if (pTask->execType == TASK_EXEC__NONE) { - ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, data); - return pRes; - } - - qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); - streamTaskExecImpl(pTask, data, pRes); - qDebug("stream task %d exec end", pTask->taskId); - - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - if (qRes == NULL) { - streamQueueProcessFail(pTask->inputQueue); - taosArrayDestroy(pRes); - return NULL; - } - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - if (streamTaskOutput(pTask, qRes) < 0) { - /*streamQueueProcessFail(pTask->inputQueue);*/ + if (pTask->taskStatus == TASK_STATUS__DROPPING) { + if (data) streamFreeQitem(data); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); return NULL; } - if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pSubmit->ver; - } - /*streamQueueProcessSuccess(pTask->inputQueue);*/ - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - } - streamFreeQitem(data); + if (data == NULL) break; + + if (pTask->execType == TASK_EXEC__NONE) { + ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, data); + return pRes; + } + + qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); + streamTaskExecImpl(pTask, data, pRes); + qDebug("stream task %d exec end", pTask->taskId); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + if (qRes == NULL) { + streamQueueProcessFail(pTask->inputQueue); + taosArrayDestroy(pRes); + return NULL; + } + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + if (streamTaskOutput(pTask, qRes) < 0) { + /*streamQueueProcessFail(pTask->inputQueue);*/ + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return NULL; + } + if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + qRes->childId = pTask->selfChildId; + qRes->sourceVer = pSubmit->ver; + } + /*streamQueueProcessSuccess(pTask->inputQueue);*/ + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + } + + streamFreeQitem(data); + } return pRes; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 908523f2a6..c47964803a 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -483,6 +483,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { pRead->pHead->head.version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); return -1; } @@ -491,6 +492,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(0); return -1; } pRead->curVersion++;