refactor: do some internal refactor.
This commit is contained in:
parent
cbf994f04f
commit
6693efaa85
|
@ -16,6 +16,8 @@
|
||||||
#ifndef TDENGINE_STREAMMSG_H
|
#ifndef TDENGINE_STREAMMSG_H
|
||||||
#define TDENGINE_STREAMMSG_H
|
#define TDENGINE_STREAMMSG_H
|
||||||
|
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -45,6 +47,9 @@ typedef struct {
|
||||||
int64_t expireTime;
|
int64_t expireTime;
|
||||||
} SStreamCheckpointSourceReq;
|
} SStreamCheckpointSourceReq;
|
||||||
|
|
||||||
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
|
||||||
|
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
|
@ -55,9 +60,6 @@ typedef struct {
|
||||||
int8_t success;
|
int8_t success;
|
||||||
} SStreamCheckpointSourceRsp;
|
} SStreamCheckpointSourceRsp;
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
|
|
||||||
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
|
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
|
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
|
||||||
|
|
||||||
typedef struct SStreamTaskNodeUpdateMsg {
|
typedef struct SStreamTaskNodeUpdateMsg {
|
||||||
|
|
|
@ -720,60 +720,3 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
taosArrayDestroy(pTimeoutList);
|
taosArrayDestroy(pTimeoutList);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -34,92 +34,6 @@ static int32_t streamTaskBackupCheckpoint(const char* id, const char* path);
|
||||||
static int32_t deleteCheckpoint(const char* id);
|
static int32_t deleteCheckpoint(const char* id);
|
||||||
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
|
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
|
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
|
||||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
|
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
|
||||||
|
|
|
@ -43,67 +43,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
||||||
pMsg->contLen = contLen;
|
pMsg->contLen = contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
|
|
||||||
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
|
||||||
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
|
|
||||||
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
|
||||||
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
|
|
||||||
void* data = taosArrayGetP(pReq->data, i);
|
|
||||||
if (tEncodeI32(pEncoder, len) < 0) return -1;
|
|
||||||
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
|
||||||
}
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
|
|
||||||
|
|
||||||
ASSERT(pReq->blockNum > 0);
|
|
||||||
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
|
|
||||||
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
|
|
||||||
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
|
||||||
int32_t len1;
|
|
||||||
uint64_t len2;
|
|
||||||
void* data;
|
|
||||||
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
|
|
||||||
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
|
|
||||||
ASSERT(len1 == len2);
|
|
||||||
taosArrayPush(pReq->dataLen, &len1);
|
|
||||||
taosArrayPush(pReq->data, &data);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
|
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
|
||||||
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
|
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
@ -129,41 +68,6 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
|
|
||||||
taosArrayDestroyP(pReq->data, taosMemoryFree);
|
|
||||||
taosArrayDestroy(pReq->dataLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
|
|
||||||
if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
|
|
||||||
uint64_t len = 0;
|
|
||||||
if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
|
|
||||||
pReq->retrieveLen = (int32_t)len;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
|
|
||||||
|
|
||||||
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
||||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
||||||
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
||||||
|
@ -1262,45 +1166,3 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
|
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pMsg->pNodeList);
|
|
||||||
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
|
||||||
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
|
|
||||||
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo this new attribute will be result in being incompatible with previous version
|
|
||||||
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
|
|
||||||
|
|
||||||
int32_t size = 0;
|
|
||||||
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
|
||||||
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
|
||||||
SNodeUpdateInfo info = {0};
|
|
||||||
if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1;
|
|
||||||
taosArrayPush(pMsg->pNodeList, &info);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
|
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -944,102 +944,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
taosArrayDestroy(pRecycleList);
|
taosArrayDestroy(pRecycleList);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
|
||||||
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
|
|
||||||
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
|
|
||||||
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
|
|
||||||
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
|
|
||||||
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
|
|
||||||
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
|
||||||
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
|
|
||||||
|
|
||||||
for (int j = 0; j < numOfVgs; ++j) {
|
|
||||||
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
|
|
||||||
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
|
|
||||||
|
|
||||||
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
|
|
||||||
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
|
||||||
int32_t taskId = 0;
|
|
||||||
STaskStatusEntry entry = {0};
|
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
|
|
||||||
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
|
|
||||||
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
|
|
||||||
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
|
|
||||||
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
|
|
||||||
|
|
||||||
entry.id.taskId = taskId;
|
|
||||||
taosArrayPush(pReq->pTaskStatus, &entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfVgs = 0;
|
|
||||||
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
|
|
||||||
|
|
||||||
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
|
|
||||||
|
|
||||||
for (int j = 0; j < numOfVgs; ++j) {
|
|
||||||
int32_t vgId = 0;
|
|
||||||
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
|
|
||||||
taosArrayPush(pReq->pUpdateNodes, &vgId);
|
|
||||||
}
|
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
|
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
|
||||||
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
|
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
|
||||||
pInfo->tickCounter = 0;
|
pInfo->tickCounter = 0;
|
||||||
|
@ -1048,20 +952,6 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
|
|
||||||
if (pMsg == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMsg->pUpdateNodes != NULL) {
|
|
||||||
taosArrayDestroy(pMsg->pUpdateNodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMsg->pTaskStatus != NULL) {
|
|
||||||
taosArrayDestroy(pMsg->pTaskStatus);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
|
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
|
||||||
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
|
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
|
||||||
for (int k = 0; k < numOfExisted; ++k) {
|
for (int k = 0; k < numOfExisted; ++k) {
|
||||||
|
|
|
@ -137,24 +137,6 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
||||||
return pTask;
|
return pTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
|
|
||||||
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
|
|
||||||
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
|
||||||
|
|
|
@ -0,0 +1,432 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "streammsg.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
|
||||||
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
|
||||||
|
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
|
||||||
|
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pMsg->pNodeList);
|
||||||
|
if (tEncodeI32(pEncoder, size) < 0) return -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i);
|
||||||
|
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo this new attribute will be result in being incompatible with previous version
|
||||||
|
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
|
||||||
|
|
||||||
|
int32_t size = 0;
|
||||||
|
if (tDecodeI32(pDecoder, &size) < 0) return -1;
|
||||||
|
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SNodeUpdateInfo info = {0};
|
||||||
|
if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1;
|
||||||
|
taosArrayPush(pMsg->pNodeList, &info);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
|
||||||
|
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
||||||
|
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum);
|
||||||
|
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
||||||
|
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
|
||||||
|
void* data = taosArrayGetP(pReq->data, i);
|
||||||
|
if (tEncodeI32(pEncoder, len) < 0) return -1;
|
||||||
|
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
||||||
|
}
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
|
||||||
|
|
||||||
|
ASSERT(pReq->blockNum > 0);
|
||||||
|
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
|
||||||
|
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
|
||||||
|
for (int32_t i = 0; i < pReq->blockNum; i++) {
|
||||||
|
int32_t len1;
|
||||||
|
uint64_t len2;
|
||||||
|
void* data;
|
||||||
|
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
|
||||||
|
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
|
||||||
|
ASSERT(len1 == len2);
|
||||||
|
taosArrayPush(pReq->dataLen, &len1);
|
||||||
|
taosArrayPush(pReq->data, &data);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
|
||||||
|
taosArrayDestroyP(pReq->data, taosMemoryFree);
|
||||||
|
taosArrayDestroy(pReq->dataLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->srcTaskId) < 0) return -1;
|
||||||
|
if (tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->srcTaskId) < 0) return -1;
|
||||||
|
uint64_t len = 0;
|
||||||
|
if (tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len) < 0) return -1;
|
||||||
|
pReq->retrieveLen = (int32_t)len;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||||
|
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
|
||||||
|
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
|
||||||
|
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
|
||||||
|
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
|
||||||
|
if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1;
|
||||||
|
if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
|
||||||
|
if (tEncodeI32(pEncoder, numOfVgs) < 0) return -1;
|
||||||
|
|
||||||
|
for (int j = 0; j < numOfVgs; ++j) {
|
||||||
|
int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j);
|
||||||
|
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
|
||||||
|
|
||||||
|
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
|
||||||
|
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
|
||||||
|
int32_t taskId = 0;
|
||||||
|
STaskStatusEntry entry = {0};
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
|
||||||
|
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
|
||||||
|
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
|
||||||
|
if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1;
|
||||||
|
if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
|
||||||
|
|
||||||
|
entry.id.taskId = taskId;
|
||||||
|
taosArrayPush(pReq->pTaskStatus, &entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfVgs = 0;
|
||||||
|
if (tDecodeI32(pDecoder, &numOfVgs) < 0) return -1;
|
||||||
|
|
||||||
|
pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t));
|
||||||
|
|
||||||
|
for (int j = 0; j < numOfVgs; ++j) {
|
||||||
|
int32_t vgId = 0;
|
||||||
|
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
|
||||||
|
taosArrayPush(pReq->pUpdateNodes, &vgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->pUpdateNodes != NULL) {
|
||||||
|
taosArrayDestroy(pMsg->pUpdateNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->pTaskStatus != NULL) {
|
||||||
|
taosArrayDestroy(pMsg->pTaskStatus);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue