enh(stream): prepare hb to mnode from stream meta.
This commit is contained in:
parent
728112ed89
commit
74834aea11
|
@ -174,6 +174,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "ttimer.h"
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdbInt.h"
|
#include "tdbInt.h"
|
||||||
|
@ -369,6 +370,7 @@ typedef struct SStreamMeta {
|
||||||
int64_t streamBackendRid;
|
int64_t streamBackendRid;
|
||||||
SHashObj* pTaskBackendUnique;
|
SHashObj* pTaskBackendUnique;
|
||||||
TdThreadMutex backendMutex;
|
TdThreadMutex backendMutex;
|
||||||
|
tmr_h hbTmr;
|
||||||
|
|
||||||
int32_t chkptNotReadyTasks;
|
int32_t chkptNotReadyTasks;
|
||||||
SArray* checkpointSaved;
|
SArray* checkpointSaved;
|
||||||
|
@ -515,6 +517,14 @@ typedef struct {
|
||||||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
|
||||||
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgId;
|
||||||
|
int32_t numOfTasks;
|
||||||
|
} SStreamHbMsg;
|
||||||
|
|
||||||
|
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
||||||
|
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t upstreamTaskId;
|
int32_t upstreamTaskId;
|
||||||
|
@ -572,7 +582,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
||||||
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
||||||
bool streamTaskIsIdle(const SStreamTask* pTask);
|
bool streamTaskIsIdle(const SStreamTask* pTask);
|
||||||
|
|
||||||
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||||
|
|
||||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||||
|
@ -637,7 +647,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
|
|
||||||
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
|
int32_t streamLoadTasks(SStreamMeta* pMeta);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
|
@ -89,7 +89,7 @@ typedef struct {
|
||||||
RET = -1; \
|
RET = -1; \
|
||||||
} \
|
} \
|
||||||
tEncoderClear(&coder); \
|
tEncoderClear(&coder); \
|
||||||
} while (0)
|
} while (0);
|
||||||
|
|
||||||
static void* tEncoderMalloc(SEncoder* pCoder, int32_t size);
|
static void* tEncoderMalloc(SEncoder* pCoder, int32_t size);
|
||||||
static void* tDecoderMalloc(SDecoder* pCoder, int32_t size);
|
static void* tDecoderMalloc(SDecoder* pCoder, int32_t size);
|
||||||
|
|
|
@ -112,17 +112,10 @@ struct STQ {
|
||||||
SStreamMeta* pStreamMeta;
|
SStreamMeta* pStreamMeta;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t inited;
|
|
||||||
tmr_h timer;
|
|
||||||
} STqMgmt;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t size;
|
int32_t size;
|
||||||
} STqOffsetHead;
|
} STqOffsetHead;
|
||||||
|
|
||||||
static STqMgmt tqMgmt = {0};
|
|
||||||
|
|
||||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
void tqDestroyTqHandle(void* data);
|
void tqDestroyTqHandle(void* data);
|
||||||
|
|
|
@ -15,6 +15,12 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t inited;
|
||||||
|
} STqMgmt;
|
||||||
|
|
||||||
|
static STqMgmt tqMgmt = {0};
|
||||||
|
|
||||||
// 0: not init
|
// 0: not init
|
||||||
// 1: already inited
|
// 1: already inited
|
||||||
// 2: wait to be inited or cleaup
|
// 2: wait to be inited or cleaup
|
||||||
|
@ -32,11 +38,6 @@ int32_t tqInit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ");
|
|
||||||
if (tqMgmt.timer == NULL) {
|
|
||||||
atomic_store_8(&tqMgmt.inited, 0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (streamInit() < 0) {
|
if (streamInit() < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -54,7 +55,6 @@ void tqCleanUp() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (old == 1) {
|
if (old == 1) {
|
||||||
taosTmrCleanUp(tqMgmt.timer);
|
|
||||||
streamCleanUp();
|
streamCleanUp();
|
||||||
atomic_store_8(&tqMgmt.inited, 0);
|
atomic_store_8(&tqMgmt.inited, 0);
|
||||||
}
|
}
|
||||||
|
@ -132,9 +132,7 @@ int32_t tqInitialize(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the version is kept in task's meta data
|
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
|
||||||
// todo check if this version is required or not
|
|
||||||
if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ int32_t streamInit() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM");
|
streamEnv.timer = taosTmrInit(1000, 100, 10000, "STREAM");
|
||||||
if (streamEnv.timer == NULL) {
|
if (streamEnv.timer == NULL) {
|
||||||
atomic_store_8(&streamEnv.inited, 0);
|
atomic_store_8(&streamEnv.inited, 0);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -32,7 +32,7 @@ typedef struct {
|
||||||
SEpSet epset;
|
SEpSet epset;
|
||||||
} SStreamChkptReadyInfo;
|
} SStreamChkptReadyInfo;
|
||||||
|
|
||||||
static void doRetryDispatchData(void* param, void* tmrId);
|
static void doRetryDispatchData(void* param, void* tmrId);
|
||||||
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
|
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
|
||||||
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
|
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
|
||||||
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
|
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
|
||||||
|
@ -40,10 +40,10 @@ static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* p
|
||||||
static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
||||||
SEpSet* pEpSet);
|
SEpSet* pEpSet);
|
||||||
|
|
||||||
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
||||||
pMsg->msgType = msgType;
|
pMsg->msgType = msgType;
|
||||||
pMsg->pCont = pCont;
|
pMsg->pCont = pCont;
|
||||||
pMsg->contLen = contLen;
|
pMsg->contLen = contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
|
|
|
@ -25,6 +25,8 @@ int32_t streamBackendCfWrapperId = 0;
|
||||||
|
|
||||||
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
|
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||||
|
|
||||||
|
static void metaHbToMnode(void* param, void* tmrId);
|
||||||
|
|
||||||
static void streamMetaEnvInit() {
|
static void streamMetaEnvInit() {
|
||||||
streamBackendId = taosOpenRef(64, streamBackendCleanup);
|
streamBackendId = taosOpenRef(64, streamBackendCleanup);
|
||||||
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
|
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
|
||||||
|
@ -90,13 +92,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->ahandle = ahandle;
|
pMeta->ahandle = ahandle;
|
||||||
pMeta->expandFunc = expandFunc;
|
pMeta->expandFunc = expandFunc;
|
||||||
|
|
||||||
// memset(streamPath, 0, len);
|
// send heartbeat every 20sec.
|
||||||
// sprintf(streamPath, "%s/%s", pMeta->path, "state");
|
// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer);
|
||||||
// code = taosMulModeMkDir(streamPath, 0755);
|
|
||||||
// if (code != 0) {
|
|
||||||
// terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
|
|
||||||
pMeta->pTaskBackendUnique =
|
pMeta->pTaskBackendUnique =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -394,35 +391,24 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
// todo add error log
|
// todo add error log
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
||||||
if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
|
if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
|
||||||
qError("failed to commit stream meta");
|
qError("vgId:%d failed to commit stream meta", pMeta->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
|
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
|
||||||
qError("failed to commit stream meta");
|
qError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
qError("vgId:%d failed to begin trans", pMeta->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaAbort(SStreamMeta* pMeta) {
|
|
||||||
if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
|
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
|
||||||
int64_t chkpId = 0;
|
int64_t chkpId = 0;
|
||||||
|
|
||||||
|
@ -430,6 +416,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
|
||||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||||
return chkpId;
|
return chkpId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
int32_t kLen = 0;
|
int32_t kLen = 0;
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
|
@ -448,14 +435,14 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
|
||||||
chkpId = TMAX(chkpId, info.checkpointId);
|
chkpId = TMAX(chkpId, info.checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
_err:
|
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
|
|
||||||
return chkpId;
|
return chkpId;
|
||||||
}
|
}
|
||||||
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
|
||||||
|
int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -519,3 +506,57 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->numOfTasks) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void metaHbToMnode(void* param, void* tmrId) {
|
||||||
|
SStreamMeta* pMeta = param;
|
||||||
|
SStreamHbMsg hbMsg = {0};
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
|
||||||
|
if (code < 0) {
|
||||||
|
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ((SMsgHead*)buf)->vgId = htonl(nodeId);
|
||||||
|
// void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
//
|
||||||
|
// SEncoder encoder;
|
||||||
|
// tEncoderInit(&encoder, pBuf, tlen);
|
||||||
|
// if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
|
||||||
|
// rpcFreeCont(buf);
|
||||||
|
// qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// tEncoderClear(&encoder);
|
||||||
|
//
|
||||||
|
// SRpcMsg msg = {0};
|
||||||
|
// initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead));
|
||||||
|
// qDebug("vgId:%d, send hb to mnode", nodeId);
|
||||||
|
//
|
||||||
|
// tmsgSendReq(pEpSet, &msg);
|
||||||
|
}
|
Loading…
Reference in New Issue