Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
d80a35a41f
|
@ -117,8 +117,12 @@ int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t topScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t bottomScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -298,7 +298,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId);
|
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
imbCnt++;
|
imbCnt++;
|
||||||
}
|
}
|
||||||
|
@ -312,7 +313,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId);
|
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -354,7 +356,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,8 +374,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
ASSERT(pConsumerEp->consumerId > 0);
|
ASSERT(pConsumerEp->consumerId > 0);
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
|
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||||
|
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
|
||||||
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// if all consumer is removed, put all vg into unassigned
|
// if all consumer is removed, put all vg into unassigned
|
||||||
|
|
|
@ -2086,7 +2086,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = topBotFunctionSetup,
|
.initFunc = topBotFunctionSetup,
|
||||||
.processFunc = topFunction,
|
.processFunc = topFunction,
|
||||||
.sprocessFunc = topScalarFunction,
|
.sprocessFunc = topBotScalarFunction,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotFinalize,
|
||||||
.combineFunc = topCombine,
|
.combineFunc = topCombine,
|
||||||
.pPartialFunc = "top",
|
.pPartialFunc = "top",
|
||||||
|
@ -2101,7 +2101,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = topBotFunctionSetup,
|
.initFunc = topBotFunctionSetup,
|
||||||
.processFunc = bottomFunction,
|
.processFunc = bottomFunction,
|
||||||
.sprocessFunc = bottomScalarFunction,
|
.sprocessFunc = topBotScalarFunction,
|
||||||
.finalizeFunc = topBotFinalize,
|
.finalizeFunc = topBotFinalize,
|
||||||
.combineFunc = bottomCombine,
|
.combineFunc = bottomCombine,
|
||||||
.pPartialFunc = "bottom",
|
.pPartialFunc = "bottom",
|
||||||
|
@ -2231,6 +2231,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = lastRowFunction,
|
.processFunc = lastRowFunction,
|
||||||
|
.sprocessFunc = firstLastScalarFunction,
|
||||||
.finalizeFunc = firstLastFinalize
|
.finalizeFunc = firstLastFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -2251,6 +2252,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = firstFunction,
|
.processFunc = firstFunction,
|
||||||
|
.sprocessFunc = firstLastScalarFunction,
|
||||||
.finalizeFunc = firstLastFinalize,
|
.finalizeFunc = firstLastFinalize,
|
||||||
.pPartialFunc = "_first_partial",
|
.pPartialFunc = "_first_partial",
|
||||||
.pMergeFunc = "_first_merge",
|
.pMergeFunc = "_first_merge",
|
||||||
|
@ -2286,6 +2288,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = lastFunction,
|
.processFunc = lastFunction,
|
||||||
|
.sprocessFunc = firstLastScalarFunction,
|
||||||
.finalizeFunc = firstLastFinalize,
|
.finalizeFunc = firstLastFinalize,
|
||||||
.pPartialFunc = "_last_partial",
|
.pPartialFunc = "_last_partial",
|
||||||
.pMergeFunc = "_last_merge",
|
.pMergeFunc = "_last_merge",
|
||||||
|
@ -2466,6 +2469,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getSampleFuncEnv,
|
.getEnvFunc = getSampleFuncEnv,
|
||||||
.initFunc = sampleFunctionSetup,
|
.initFunc = sampleFunctionSetup,
|
||||||
.processFunc = sampleFunction,
|
.processFunc = sampleFunction,
|
||||||
|
.sprocessFunc = sampleScalarFunction,
|
||||||
.finalizeFunc = sampleFinalize
|
.finalizeFunc = sampleFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -2477,6 +2481,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getTailFuncEnv,
|
.getEnvFunc = getTailFuncEnv,
|
||||||
.initFunc = tailFunctionSetup,
|
.initFunc = tailFunctionSetup,
|
||||||
.processFunc = tailFunction,
|
.processFunc = tailFunction,
|
||||||
|
.sprocessFunc = tailScalarFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -2488,6 +2493,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getUniqueFuncEnv,
|
.getEnvFunc = getUniqueFuncEnv,
|
||||||
.initFunc = uniqueFunctionSetup,
|
.initFunc = uniqueFunctionSetup,
|
||||||
.processFunc = uniqueFunction,
|
.processFunc = uniqueFunction,
|
||||||
|
.sprocessFunc = uniqueScalarFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -2498,6 +2504,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getModeFuncEnv,
|
.getEnvFunc = getModeFuncEnv,
|
||||||
.initFunc = modeFunctionSetup,
|
.initFunc = modeFunctionSetup,
|
||||||
.processFunc = modeFunction,
|
.processFunc = modeFunction,
|
||||||
|
.sprocessFunc = modeScalarFunction,
|
||||||
.finalizeFunc = modeFinalize,
|
.finalizeFunc = modeFinalize,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -2838,10 +2838,26 @@ int32_t selectScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t topScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t bottomScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,76 +143,80 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
int32_t cnt = 0;
|
|
||||||
void* data = NULL;
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
int32_t cnt = 0;
|
||||||
if (qItem == NULL) {
|
void* data = NULL;
|
||||||
qDebug("stream exec over, queue empty");
|
while (1) {
|
||||||
break;
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
}
|
if (qItem == NULL) {
|
||||||
if (data == NULL) {
|
qDebug("stream exec over, queue empty");
|
||||||
data = qItem;
|
|
||||||
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
|
|
||||||
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
|
||||||
}
|
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
|
||||||
} else {
|
|
||||||
if (streamAppendQueueItem(data, qItem) < 0) {
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
|
||||||
break;
|
break;
|
||||||
} else {
|
}
|
||||||
cnt++;
|
if (data == NULL) {
|
||||||
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
data = qItem;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
taosFreeQitem(qItem);
|
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (streamAppendQueueItem(data, qItem) < 0) {
|
||||||
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
cnt++;
|
||||||
|
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
||||||
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
||||||
|
taosFreeQitem(qItem);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
if (data) streamFreeQitem(data);
|
||||||
if (data) streamFreeQitem(data);
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data == NULL) return pRes;
|
|
||||||
|
|
||||||
if (pTask->execType == TASK_EXEC__NONE) {
|
|
||||||
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
|
||||||
streamTaskOutput(pTask, data);
|
|
||||||
return pRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
|
||||||
qDebug("stream task %d exec end", pTask->taskId);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
|
||||||
if (qRes == NULL) {
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
|
||||||
taosArrayDestroy(pRes);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
qRes->blocks = pRes;
|
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
|
||||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(qRes);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
|
|
||||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
|
||||||
qRes->childId = pTask->selfChildId;
|
|
||||||
qRes->sourceVer = pSubmit->ver;
|
|
||||||
}
|
|
||||||
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
}
|
|
||||||
|
|
||||||
streamFreeQitem(data);
|
if (data == NULL) break;
|
||||||
|
|
||||||
|
if (pTask->execType == TASK_EXEC__NONE) {
|
||||||
|
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
|
streamTaskOutput(pTask, data);
|
||||||
|
return pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
||||||
|
streamTaskExecImpl(pTask, data, pRes);
|
||||||
|
qDebug("stream task %d exec end", pTask->taskId);
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pRes) != 0) {
|
||||||
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
|
if (qRes == NULL) {
|
||||||
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
qRes->blocks = pRes;
|
||||||
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
|
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
taosFreeQitem(qRes);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||||
|
qRes->childId = pTask->selfChildId;
|
||||||
|
qRes->sourceVer = pSubmit->ver;
|
||||||
|
}
|
||||||
|
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||||
|
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
}
|
||||||
|
|
||||||
|
streamFreeQitem(data);
|
||||||
|
}
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -483,6 +483,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
pRead->pHead->head.version, ver);
|
pRead->pHead->head.version, ver);
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,6 +492,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pRead->curVersion++;
|
pRead->curVersion++;
|
||||||
|
|
Loading…
Reference in New Issue