Merge branch '3.0' into enh/TS-5297-3.0
commit dc8ea6dc1c
@@ -4,8 +4,6 @@ title: Data Compression
 toc_max_heading_level: 4
 ---
 
-## Overview
-
 Data compression is a technique that, without losing any valid information, reorganizes and processes data with specific algorithms in order to reduce the storage space the data occupies and to improve transmission efficiency. TDengine applies this technique in both data storage and data transmission, aiming to optimize the use of storage resources and to speed up data exchange.
 
 
@@ -58,4 +56,4 @@ TDengine provides compression during data transmission to reduce network bandwidth
 
 The following figure illustrates the compression and decompression that the TDengine engine performs throughout the transmission and storage of time-series data, to help clarify the end-to-end processing flow.
 
-![Data compression and decompression](./compression.png)
+![Data compression and decompression](./data-compression.png)

@@ -785,7 +785,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
 int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
 bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
 int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
-int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts);
+int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);
+void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts);
+void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
 
 // timer
 int32_t streamTimerGetInstance(tmr_h* pTmr);

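Taken together, the renamed check and the two new setters split the consensus-checkpoint handling into explicit state transitions. The following is a hedged, self-contained sketch of how these three helpers appear to interact, based only on the definitions that show up later in this diff; the types, enum values, and function bodies are simplified stand-ins, not the actual TDengine sources.

```c
#include <stdint.h>
#include <stdio.h>

/* stand-in enum: only the names come from the diff, the values are arbitrary */
typedef enum { TASK_CONSEN_CHKPT_REQ = 1, TASK_CONSEN_CHKPT_SEND, TASK_CONSEN_CHKPT_RECV } EConsenStatus;

typedef struct {
  EConsenStatus status;
  int64_t       statusTs;
  int32_t       consenChkptTransId;
} SConsenChkptInfoSketch;

/* mark the task as requiring a consensus checkpoint id, clearing any previous transId */
static void setReqConsenChkptId(SConsenChkptInfoSketch* pInfo, int64_t ts) {
  pInfo->status = TASK_CONSEN_CHKPT_REQ;
  pInfo->statusTs = ts;
  pInfo->consenChkptTransId = 0;
}

/* record that the consensus checkpoint id for a given transaction has been received */
static void setConsenChkptIdRecv(SConsenChkptInfoSketch* pInfo, int32_t transId, int64_t ts) {
  pInfo->consenChkptTransId = transId;
  pInfo->status = TASK_CONSEN_CHKPT_RECV;
  pInfo->statusTs = ts;
}

/* heartbeat-side check: returns 1 if the task should (re)request a consensus checkpoint id */
static int32_t checkIfReqConsenChkptId(const SConsenChkptInfoSketch* pInfo, int64_t now) {
  if (pInfo->status == TASK_CONSEN_CHKPT_REQ) return 1;
  if (pInfo->status == TASK_CONSEN_CHKPT_SEND && (now - pInfo->statusTs) / 1000 > 60) return 1;
  return 0;
}

int main(void) {
  SConsenChkptInfoSketch info = {0};
  setReqConsenChkptId(&info, 1000);
  printf("need request: %d\n", (int)checkIfReqConsenChkptId(&info, 2000));  /* 1 */
  setConsenChkptIdRecv(&info, 42, 3000);
  printf("need request: %d\n", (int)checkIfReqConsenChkptId(&info, 4000)); /* 0 */
  return 0;
}
```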
@@ -775,7 +775,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
   }
 
 #ifdef WINDOWS
-  terrno = TSDB_CODE_MND_INVALID_PLATFORM;
+  code = TSDB_CODE_MND_INVALID_PLATFORM;
   goto _OVER;
 #endif
 

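The `terrno` to `code` switch follows the error-handling convention used throughout this merge: record the error in a local `code` variable and jump to a single exit label rather than writing the thread-local `terrno` directly. A minimal hedged sketch of that pattern (the function and error names here are hypothetical, not the mnode code):

```c
#include <stdint.h>

#define TSDB_CODE_SUCCESS 0
#define DEMO_ERR_INVALID_PLATFORM (-1) /* hypothetical error value, for illustration only */

static int32_t demoCreateStream(int platformSupported) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (!platformSupported) {
    code = DEMO_ERR_INVALID_PLATFORM;  /* record the error in a local variable ... */
    goto _OVER;                        /* ... and take the common exit path */
  }

  /* normal processing would go here */

_OVER:
  /* single cleanup point: release resources, then return the code to the caller */
  return code;
}
```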
@@ -138,6 +138,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
     return 0;
   }
 
+  if (pMeta->startInfo.startAllTasks) {
+    tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
+    streamMetaWUnLock(pMeta);
+    return 0;
+  }
+
   pMeta->scanInfo.scanCounter += 1;
   if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
     pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;

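The added guard returns early while a restart (`startAllTasks`) is in progress, and the existing clamp bounds repeated scan requests. A hedged, stand-alone sketch of the same guard-then-clamp shape; the threshold value and names are illustrative, not the tq code:

```c
#include <stdbool.h>
#include <stdio.h>

#define MAX_REPEAT_SCAN_THRESHOLD 3 /* assumed value for this sketch */

typedef struct {
  bool startAllTasks; /* restart procedure in progress */
  int  scanCounter;   /* pending wal-scan requests */
} DemoScanInfo;

/* skip scheduling while a restart is in progress; otherwise bump the
 * counter but never let it exceed the threshold */
static int demoScanWalAsync(DemoScanInfo* info) {
  if (info->startAllTasks) {
    printf("in restart procedure, not scan wal\n");
    return 0;
  }

  info->scanCounter += 1;
  if (info->scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
    info->scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
  }
  return info->scanCounter;
}

int main(void) {
  DemoScanInfo info = {.startAllTasks = false, .scanCounter = 0};
  for (int i = 0; i < 5; i++) printf("counter=%d\n", demoScanWalAsync(&info));
  return 0;
}
```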
@@ -1191,14 +1191,13 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
 }
 
 int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
   int32_t vgId = pMeta->vgId;
   int32_t code = 0;
-  char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
-  int32_t len = pMsg->contLen - sizeof(SMsgHead);
-  int64_t now = taosGetTimestampMs();
+  SStreamTask* pTask = NULL;
 
   SRestoreCheckpointInfo req = {0};
+  char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t len = pMsg->contLen - sizeof(SMsgHead);
+  int64_t now = taosGetTimestampMs();
 
   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msg, len);

@@ -1211,7 +1210,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
 
   tDecoderClear(&decoder);
 
-  SStreamTask* pTask = NULL;
   code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
   if (pTask == NULL || (code != 0)) {
     tqError(

@@ -1238,9 +1236,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
   streamMutexLock(&pTask->lock);
   ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
 
-  if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) {
+  SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
+  if (pConsenInfo->consenChkptTransId >= req.transId) {
     tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
-            pTask->status.consenChkptInfo.consenChkptTransId, req.transId);
+            pConsenInfo->consenChkptTransId, req.transId);
     streamMutexUnlock(&pTask->lock);
     streamMetaReleaseTask(pMeta, pTask);
     return TSDB_CODE_SUCCESS;

@@ -1256,9 +1255,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
             pTask->id.idStr, vgId, req.checkpointId, req.transId);
   }
 
-  pTask->status.consenChkptInfo.consenChkptTransId = req.transId;
-  pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV;
-  pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs();
+  streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
   streamMutexUnlock(&pTask->lock);
 
   if (pMeta->role == NODE_ROLE_LEADER) {

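One behavioral detail worth calling out in the hunks above: an incoming consensus-checkpoint request is discarded when its transId is not newer than the one already recorded, and the comparison happens under the task lock. A hedged, stand-alone sketch of that staleness check (simplified types, not the tq code):

```c
#include <stdint.h>
#include <stdio.h>

typedef struct {
  int32_t consenChkptTransId; /* transId of the latest accepted request */
} DemoConsenInfo;

/* returns 1 if the request was accepted, 0 if it is stale and discarded */
static int demoHandleConsenReq(DemoConsenInfo* pInfo, int32_t reqTransId) {
  if (pInfo->consenChkptTransId >= reqTransId) {
    printf("latest transId:%d, expired trans:%d, discard\n",
           (int)pInfo->consenChkptTransId, (int)reqTransId);
    return 0;
  }
  pInfo->consenChkptTransId = reqTransId; /* accept and remember the newer transaction */
  return 1;
}

int main(void) {
  DemoConsenInfo info = {.consenChkptTransId = 5};
  printf("%d\n", demoHandleConsenReq(&info, 4)); /* 0: stale    */
  printf("%d\n", demoHandleConsenReq(&info, 7)); /* 1: accepted */
  return 0;
}
```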
@@ -193,8 +193,9 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
     pAggInfo->pNewGroupBlock = NULL;
     tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
     setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
-    QUERY_CHECK_CODE(code, lino, _end);
     code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
+    QUERY_CHECK_CODE(code, lino, _end);
 
     code = doAggregateImpl(pOperator, pSup->pCtx);
     QUERY_CHECK_CODE(code, lino, _end);
   }

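The fix above moves `QUERY_CHECK_CODE` so that it checks the result of `setInputDataBlock` rather than running before that call has assigned `code`. As a hedged illustration of this check-after-assign pattern, here is a self-contained sketch with a simplified stand-in macro; the real TDengine macro may differ in detail:

```c
#include <stdint.h>
#include <stdio.h>

#define DEMO_SUCCESS 0
#define DEMO_FAILED  (-1)

/* simplified stand-in: on error, remember the line and jump to the given label */
#define DEMO_CHECK_CODE(code, lino, label) \
  do {                                     \
    if ((code) != DEMO_SUCCESS) {          \
      (lino) = __LINE__;                   \
      goto label;                          \
    }                                      \
  } while (0)

static int32_t stepThatMayFail(int fail) { return fail ? DEMO_FAILED : DEMO_SUCCESS; }

static int32_t demoPipeline(int failSecondStep) {
  int32_t code = DEMO_SUCCESS;
  int32_t lino = 0;

  code = stepThatMayFail(0);
  DEMO_CHECK_CODE(code, lino, _end); /* check immediately after the call that sets code */

  code = stepThatMayFail(failSecondStep);
  DEMO_CHECK_CODE(code, lino, _end);

_end:
  if (code != DEMO_SUCCESS) {
    printf("failed at line %d, code %d\n", (int)lino, (int)code);
  }
  return code;
}

int main(void) {
  demoPipeline(1);
  return 0;
}
```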
@@ -49,6 +49,8 @@
 #define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint"
 #define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint"
 
+#define MAX_STREAM_HISTORY_RESULT 100000000
+
 typedef struct SStateWindowInfo {
   SResultWindowInfo winInfo;
   SStateKeys* pStateKey;

@@ -161,11 +163,19 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
 }
 
 int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
+  if (tSimpleHashGetSize(pStUpdated) > MAX_STREAM_HISTORY_RESULT) {
+    qError("%s failed at line %d since too many history result. ", __func__, __LINE__);
+    return TSDB_CODE_STREAM_INTERNAL_ERROR;
+  }
   winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
   return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
 }
 
 static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
+  if (tSimpleHashGetSize(pUpdatedMap) > MAX_STREAM_HISTORY_RESULT) {
+    qError("%s failed at line %d since too many history result. ", __func__, __LINE__);
+    return TSDB_CODE_STREAM_INTERNAL_ERROR;
+  }
   return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
 }
 

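The two guards above put a hard cap (`MAX_STREAM_HISTORY_RESULT`) on the number of buffered result-window entries and surface `TSDB_CODE_STREAM_INTERNAL_ERROR` once it is exceeded. A hedged, self-contained sketch of the same cap-before-insert pattern, using a plain array in place of the hash table and a tiny cap so the behavior is visible:

```c
#include <stdio.h>

#define DEMO_MAX_RESULTS 4     /* stands in for MAX_STREAM_HISTORY_RESULT */
#define DEMO_ERR_INTERNAL (-1) /* stands in for TSDB_CODE_STREAM_INTERNAL_ERROR */

typedef struct {
  int items[DEMO_MAX_RESULTS + 1];
  int size;
} DemoResultBuf;

/* refuse to grow past the cap instead of buffering without bound */
static int demoSaveResult(DemoResultBuf* buf, int value) {
  if (buf->size > DEMO_MAX_RESULTS) {
    fprintf(stderr, "%s: too many buffered results\n", __func__);
    return DEMO_ERR_INTERNAL;
  }
  buf->items[buf->size++] = value;
  return 0;
}

int main(void) {
  DemoResultBuf buf = {0};
  for (int i = 0; i < 8; i++) {
    if (demoSaveResult(&buf, i) != 0) break; /* the caller stops feeding on internal error */
  }
  printf("buffered %d results\n", buf.size);
  return 0;
}
```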
@@ -481,6 +491,12 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
   blockDataDestroy(pInfo->pDelRes);
   blockDataDestroy(pInfo->pMidRetriveRes);
   blockDataDestroy(pInfo->pMidPulloverRes);
+  if (pInfo->pUpdatedMap != NULL) {
+    tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
+    tSimpleHashCleanup(pInfo->pUpdatedMap);
+    pInfo->pUpdatedMap = NULL;
+  }
+
   if (pInfo->stateStore.streamFileStateDestroy != NULL) {
     pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
   }

@@ -495,11 +511,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
   nodesDestroyNode((SNode*)pInfo->pPhyNode);
   colDataDestroy(&pInfo->twAggSup.timeWindowData);
   cleanupExprSupp(&pInfo->scalarSupp);
-  if (pInfo->pUpdatedMap != NULL) {
-    tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
-    tSimpleHashCleanup(pInfo->pUpdatedMap);
-    pInfo->pUpdatedMap = NULL;
-  }
   tSimpleHashCleanup(pInfo->pDeletedMap);
 
   blockDataDestroy(pInfo->pCheckpointRes);

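The pair of hunks above relocates the `pUpdatedMap` cleanup, presumably so the hash table and its entries (via the free callback) are released before the file state they reference is destroyed, and it nulls the pointer so the later cleanup path cannot touch it again. A hedged sketch of that guard-free-null pattern with stand-in types:

```c
#include <stdlib.h>

typedef struct {
  int* pUpdatedMap; /* stands in for the SSHashObj* in the operator info */
  int* pFileState;  /* resource that must outlive entries referenced by the map */
} DemoOpInfo;

static void demoDestroyOpInfo(DemoOpInfo* pInfo) {
  /* release the map first, while the state it references is still valid */
  if (pInfo->pUpdatedMap != NULL) {
    free(pInfo->pUpdatedMap);
    pInfo->pUpdatedMap = NULL; /* null it out so later cleanup cannot double-free */
  }

  if (pInfo->pFileState != NULL) {
    free(pInfo->pFileState);
    pInfo->pFileState = NULL;
  }
}

int main(void) {
  DemoOpInfo info = {.pUpdatedMap = malloc(16), .pFileState = malloc(16)};
  demoDestroyOpInfo(&info);
  return 0;
}
```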
@@ -994,7 +1005,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
 
 bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
 
-static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
+static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
                                     SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
   int32_t code = TSDB_CODE_SUCCESS;
   int32_t lino = 0;

@@ -1166,6 +1177,7 @@ _end:
   if (code != TSDB_CODE_SUCCESS) {
     qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
   }
+  return code;
 }
 
 static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {

@@ -1718,7 +1730,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
     code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
     QUERY_CHECK_CODE(code, lino, _end);
 
-    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
+    code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
+    if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) {
+      code = TSDB_CODE_SUCCESS;
+      pOperator->status = OP_RES_TO_RETURN;
+      break;
+    }
     pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
     pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
     pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);

@@ -5184,7 +5201,12 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
     }
 #endif
 
-    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
+    code = doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap, pInfo->pDeletedMap);
+    if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) {
+      pOperator->status = OP_RES_TO_RETURN;
+      code = TSDB_CODE_SUCCESS;
+      break;
+    }
     pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
     pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
   }

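These four hunks change `doStreamIntervalAggImpl` from `void` to returning an error code, and teach both callers to treat `TSDB_CODE_STREAM_INTERNAL_ERROR` (raised by the new result-count guards) as a signal to stop consuming input and return what has been accumulated, rather than as a fatal error. A hedged, stand-alone sketch of that caller-side pattern with illustrative names:

```c
#include <stdint.h>
#include <stdio.h>

#define DEMO_SUCCESS         0
#define DEMO_ERR_BUFFER_FULL (-2) /* stands in for TSDB_CODE_STREAM_INTERNAL_ERROR */

/* worker: now returns a code instead of being void */
static int32_t demoAggOneBlock(int blockNo, int capacity, int* nBuffered) {
  if (*nBuffered >= capacity) return DEMO_ERR_BUFFER_FULL;
  (*nBuffered)++;
  printf("aggregated block %d\n", blockNo);
  return DEMO_SUCCESS;
}

static int32_t demoAggNext(int totalBlocks, int capacity) {
  int32_t code = DEMO_SUCCESS;
  int     nBuffered = 0;

  for (int i = 0; i < totalBlocks; i++) {
    code = demoAggOneBlock(i, capacity, &nBuffered);
    if (code == DEMO_ERR_BUFFER_FULL) {
      code = DEMO_SUCCESS; /* not fatal: flush what we already have ... */
      break;               /* ... and stop pulling more input for now */
    }
    if (code != DEMO_SUCCESS) return code; /* real errors still propagate */
  }
  printf("returning %d buffered results\n", nBuffered);
  return code;
}

int main(void) { return demoAggNext(10, 3); }
```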
@@ -131,7 +131,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
   SListCell *cell = nodeList->pNodeList->pHead;
   SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
   if (out.columnData == NULL) {
-    SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
+    SCL_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
   }
   int32_t len = 0;
   void *buf = NULL;

@@ -879,7 +879,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
   SScalarParam *params = NULL;
   int32_t rowNum = 0;
   int32_t code = 0;
-  int32_t paramNum = 0;
+  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
 
   // json not support in in operator
   if (nodeType(node->pLeft) == QUERY_NODE_VALUE) {

@@ -890,7 +890,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
     }
   }
 
-  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
+  SCL_ERR_JRET(sclInitOperatorParams(&params, node, ctx, &rowNum));
   if (output->columnData == NULL) {
     code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
     if (code != TSDB_CODE_SUCCESS) {

@@ -900,7 +900,6 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
 
   _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);
 
-  paramNum = scalarGetOperatorParamNum(node->opType);
   SScalarParam *pLeft = &params[0];
   SScalarParam *pRight = paramNum > 1 ? &params[1] : NULL;
 

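The `SCL_ERR_RET` to `SCL_ERR_JRET` switches matter because `out.columnData` and `params` have already been allocated at those points; returning immediately would leak them, whereas the `JRET` variant records the code and jumps to the function's cleanup label first. A hedged sketch of the two macro styles with simplified stand-ins (the real TDengine macros may differ in detail):

```c
#include <stdint.h>
#include <stdlib.h>

#define DEMO_SUCCESS 0
#define DEMO_ERR_OOM (-1)

/* return immediately: fine when nothing has been allocated yet */
#define DEMO_ERR_RET(c)       \
  do {                        \
    int32_t _c = (c);         \
    if (_c != DEMO_SUCCESS) { \
      return _c;              \
    }                         \
  } while (0)

/* jump to the cleanup label: use once local resources exist */
#define DEMO_ERR_JRET(c)        \
  do {                          \
    code = (c);                 \
    if (code != DEMO_SUCCESS) { \
      goto _return;             \
    }                           \
  } while (0)

static int32_t demoMayFail(int fail) { return fail ? DEMO_ERR_OOM : DEMO_SUCCESS; }

static int32_t demoExec(int fail) {
  int32_t code = DEMO_SUCCESS;

  DEMO_ERR_RET(demoMayFail(0)); /* nothing to free yet, early return is safe */

  void* buf = malloc(64);
  if (buf == NULL) {
    DEMO_ERR_JRET(DEMO_ERR_OOM);
  }

  DEMO_ERR_JRET(demoMayFail(fail)); /* on failure, jump to cleanup instead of leaking buf */

_return:
  free(buf); /* single cleanup point, no leak on the error paths */
  return code;
}

int main(void) { return demoExec(1) == DEMO_ERR_OOM ? 0 : 1; }
```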
@@ -615,7 +615,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
     pInfo->checkpointVer = pReq->checkpointVer;
     pInfo->checkpointTime = pReq->checkpointTs;
 
-    if (restored) {
+    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
       code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
     }
   }

@@ -1371,29 +1371,23 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
 }
 
 int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
-  const char* id = pTask->id.idStr;
-  SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
-
   streamMutexLock(&pTask->lock);
   ETaskStatus p = streamTaskGetStatus(pTask).state;
   //  if (pInfo->alreadySendChkptId == true) {
   //    stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
   //    streamMutexUnlock(&pTask->lock);
   //    return TSDB_CODE_SUCCESS;
   //  } else {
   //    pInfo->alreadySendChkptId = true;
   //  }
   //
+  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
   streamMutexUnlock(&pTask->lock);
 
   if (pTask->pBackend != NULL) {
     streamFreeTaskState(pTask, p);
     pTask->pBackend = NULL;
   }
 
-  pInfo->status = TASK_CONSEN_CHKPT_REQ;
-  pInfo->statusTs = taosGetTimestampMs();
-  stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs);
   return 0;
 }
 

@@ -25,7 +25,6 @@ int32_t streamMetaId = 0;
 
 struct SMetaHbInfo {
   tmr_h hbTmr;
-  int32_t stopFlag;
   int32_t tickCounter;
   int32_t hbCount;
   int64_t hbStart;

@@ -197,10 +196,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
       }
     }
 
-    entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts);
+    streamMutexLock(&(*pTask)->lock);
+    entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts);
     if (entry.checkpointInfo.consensusChkptId) {
       entry.checkpointInfo.consensusTs = pMsg->ts;
     }
+    streamMutexUnlock(&(*pTask)->lock);
 
     if ((*pTask)->exec.pWalReader != NULL) {
       entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;

@@ -240,6 +241,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
 void streamMetaHbToMnode(void* param, void* tmrId) {
   int64_t rid = *(int64_t*)param;
   int32_t code = 0;
+  int32_t vgId = 0;
+  int32_t role = 0;
 
   SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
   if (pMeta == NULL) {

@@ -247,29 +250,41 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
     return;
   }
 
+  vgId = pMeta->vgId;
+  role = pMeta->role;
+
   // need to stop, stop now
-  if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) {  // todo refactor: not need this now, use closeFlag in Meta
-    pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
+  if (pMeta->closeFlag) {
+    pMeta->pHbInfo->hbStart = 0;
     code = taosReleaseRef(streamMetaId, rid);
     if (code == TSDB_CODE_SUCCESS) {
-      stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
+      stDebug("vgId:%d jump out of meta timer", vgId);
     } else {
-      stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
+      stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
     }
     return;
   }
 
   // not leader not send msg
   if (pMeta->role != NODE_ROLE_LEADER) {
+    pMeta->pHbInfo->hbStart = 0;
     code = taosReleaseRef(streamMetaId, rid);
     if (code == TSDB_CODE_SUCCESS) {
-      stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
+      stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
     } else {
-      stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, pMeta->vgId,
-              pMeta->role, rid);
+      stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid);
     }
+    return;
+  }
 
-    pMeta->pHbInfo->hbStart = 0;
+  if (!waitForEnoughDuration(pMeta->pHbInfo)) {
+    streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
+                   "meta-hb-tmr");
+
+    code = taosReleaseRef(streamMetaId, rid);
+    if (code) {
+      stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
+    }
     return;
   }
 

@@ -278,17 +293,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
     pMeta->pHbInfo->hbStart = taosGetTimestampMs();
   }
 
-  if (!waitForEnoughDuration(pMeta->pHbInfo)) {
-    streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
-                   "meta-hb-tmr");
-
-    code = taosReleaseRef(streamMetaId, rid);
-    if (code) {
-      stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
-    }
-    return;
-  }
-
   streamMetaRLock(pMeta);
   code = streamMetaSendHbHelper(pMeta);
   if (code) {

@@ -298,10 +302,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
 
   streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
                  "meta-hb-tmr");
-  code = taosReleaseRef(streamMetaId, rid);
+
+  code = taosReleaseRef(streamMetaId, rid);
   if (code) {
-    stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
+    stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
   }
 }

@@ -314,7 +318,6 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
 
   pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
   pInfo->tickCounter = 0;
-  pInfo->stopFlag = 0;
   pInfo->msgSendTs = -1;
   pInfo->hbCount = 0;
 

@@ -338,11 +341,8 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
 void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
   // wait for the stream meta hb function stopping
   if (pMeta->role == NODE_ROLE_LEADER) {
-    pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
-    while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
-      taosMsleep(100);
-      stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
-    }
+    taosMsleep(2 * META_HB_CHECK_INTERVAL);
+    stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
   }
 }
 

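The heartbeat-timer hunks above replace the old `stopFlag` handshake with the meta's `closeFlag`, hoist the `waitForEnoughDuration` check ahead of any heartbeat work, and cache `vgId`/`role` in locals at the top of the callback so the log statements no longer dereference `pMeta` after its reference may have been released. A hedged, stand-alone sketch of that cache-then-release shape, with no real timer or reference counting and purely illustrative names:

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
  int32_t vgId;
  int32_t role;
  bool    closeFlag; /* set when the owner is shutting down */
} DemoMeta;

/* stands in for taosReleaseRef: after this call the object must not be touched */
static void demoRelease(DemoMeta** ppMeta) { *ppMeta = NULL; }

static void demoHbCallback(DemoMeta* pMeta) {
  if (pMeta == NULL) return;

  /* cache what the log lines need before the reference is released */
  int32_t vgId = pMeta->vgId;
  int32_t role = pMeta->role;

  if (pMeta->closeFlag) {
    demoRelease(&pMeta);
    printf("vgId:%d jump out of meta timer\n", (int)vgId); /* cached copy only */
    return;
  }

  if (role != 1 /* leader */) {
    demoRelease(&pMeta);
    printf("vgId:%d role:%d not leader, not sending hb\n", (int)vgId, (int)role);
    return;
  }

  printf("vgId:%d sending hb\n", (int)vgId);
  demoRelease(&pMeta);
}

int main(void) {
  DemoMeta meta = {.vgId = 2, .role = 1, .closeFlag = false};
  demoHbCallback(&meta);
  meta.closeFlag = true;
  demoHbCallback(&meta);
  return 0;
}
```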
@@ -444,7 +444,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
   return 0;
 }
 
-int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
+int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
   SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
 
   int32_t vgId = pTask->pMeta->vgId;

@@ -455,11 +455,13 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
            vgId, pConChkptInfo->statusTs);
     return 1;
   } else {
-    if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) {
+    int32_t el = (ts - pConChkptInfo->statusTs) / 1000;
+    if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) {
       pConChkptInfo->statusTs = ts;
 
-      stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64,
-             pTask->id.idStr, vgId, pConChkptInfo->statusTs);
+      stWarn(
+          "s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64,
+          pTask->id.idStr, vgId, el, pConChkptInfo->statusTs);
       return 1;
     }
   }

@@ -467,4 +469,22 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
   return 0;
 }
 
+void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
+  SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
+  pInfo->consenChkptTransId = transId;
+  pInfo->status = TASK_CONSEN_CHKPT_RECV;
+  pInfo->statusTs = ts;
+
+  stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
+}
+
+void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
+  SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
+  int32_t prevTrans = pInfo->consenChkptTransId;
+
+  pInfo->status = TASK_CONSEN_CHKPT_REQ;
+  pInfo->statusTs = ts;
+  pInfo->consenChkptTransId = 0;
+
+  stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
+}