diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index e6e78bd1df..d2e01f2186 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -174,6 +174,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2c46b2aa7b..e5f7093aef 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -14,6 +14,7 @@ */ #include "os.h" +#include "ttimer.h" #include "streamState.h" #include "tdatablock.h" #include "tdbInt.h" @@ -369,6 +370,7 @@ typedef struct SStreamMeta { int64_t streamBackendRid; SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; + tmr_h hbTmr; int32_t chkptNotReadyTasks; SArray* checkpointSaved; @@ -515,6 +517,14 @@ typedef struct { int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); +typedef struct { + int32_t vgId; + int32_t numOfTasks; +} SStreamHbMsg; + +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq); +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); + typedef struct { int64_t streamId; int32_t upstreamTaskId; @@ -572,7 +582,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); -SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t 
contLen); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); @@ -637,7 +647,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); -int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); +int32_t streamLoadTasks(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 18ca3fb8b8..94993f99af 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -112,17 +112,10 @@ struct STQ { SStreamMeta* pStreamMeta; }; -typedef struct { - int8_t inited; - tmr_h timer; -} STqMgmt; - typedef struct { int32_t size; } STqOffsetHead; -static STqMgmt tqMgmt = {0}; - int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d4a897f99c..d9355d26da 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,6 +15,12 @@ #include "tq.h" +typedef struct { + int8_t inited; +} STqMgmt; + +static STqMgmt tqMgmt = {0}; + // 0: not init // 1: already inited // 2: wait to be inited or cleaup @@ -32,11 +38,6 @@ int32_t tqInit() { } if (old == 0) { - tqMgmt.timer = taosTmrInit(10000, 100, 10000, "TQ"); - if (tqMgmt.timer == NULL) { - 
atomic_store_8(&tqMgmt.inited, 0); - return -1; - } if (streamInit() < 0) { return -1; } @@ -54,7 +55,6 @@ void tqCleanUp() { } if (old == 1) { - taosTmrCleanUp(tqMgmt.timer); streamCleanUp(); atomic_store_8(&tqMgmt.inited, 0); } @@ -132,9 +132,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - // the version is kept in task's meta data - // todo check if this version is required or not - if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) { + if (streamLoadTasks(pTq->pStreamMeta) < 0) { return -1; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 43c085a4fb..d6474b8fde 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -31,7 +31,7 @@ int32_t streamInit() { } if (old == 0) { - streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM"); + streamEnv.timer = taosTmrInit(1000, 100, 10000, "STREAM"); if (streamEnv.timer == NULL) { atomic_store_8(&streamEnv.inited, 0); return -1; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c5ae27029b..0792ed208b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -32,7 +32,7 @@ typedef struct { SEpSet epset; } SStreamChkptReadyInfo; -static void doRetryDispatchData(void* param, void* tmrId); +static void doRetryDispatchData(void* param, void* tmrId); static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet); static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, @@ -40,10 +40,10 @@ static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* p static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); -static void 
initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { - pMsg->msgType = msgType; - pMsg->pCont = pCont; - pMsg->contLen = contLen; +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { + pMsg->msgType = msgType; + pMsg->pCont = pCont; + pMsg->contLen = contLen; } int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4711f4af19..67b81bc291 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -25,6 +25,8 @@ int32_t streamBackendCfWrapperId = 0; int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); +static void metaHbToMnode(void* param, void* tmrId); + static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); @@ -90,13 +92,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - // memset(streamPath, 0, len); - // sprintf(streamPath, "%s/%s", pMeta->path, "state"); - // code = taosMulModeMkDir(streamPath, 0755); - // if (code != 0) { - // terrno = TAOS_SYSTEM_ERROR(code); - // goto _err; - // } + // send heartbeat every 20sec. 
+// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -394,35 +391,24 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { // todo add error log int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { - qError("failed to commit stream meta"); + qError("vgId:%d failed to commit stream meta", pMeta->vgId); return -1; } if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { - qError("failed to commit stream meta"); + qError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); return -1; } if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + qError("vgId:%d failed to begin trans", pMeta->vgId); return -1; } return 0; } -int32_t streamMetaAbort(SStreamMeta* pMeta) { - if (tdbAbort(pMeta->db, pMeta->txn) < 0) { - return -1; - } - - if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - return -1; - } - return 0; -} - int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { int64_t chkpId = 0; @@ -430,6 +416,7 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { return chkpId; } + void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL; @@ -448,14 +435,14 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { chkpId = TMAX(chkpId, info.checkpointId); } -_err: tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return chkpId; } -int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { + +int32_t streamLoadTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { return -1; @@ -519,3 +506,60 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return 0; } + +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { + if 
(tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->numOfTasks) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +// timer callback: sizes and allocates the stream heartbeat message for the mnode; the encode/send steps are still disabled +void metaHbToMnode(void* param, void* tmrId) { + SStreamMeta* pMeta = param; + SStreamHbMsg hbMsg = {0}; + + int32_t code = 0; + int32_t tlen = 0; + + tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); + if (code < 0) { + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + return; + } + + void* buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + qError("vgId:%d failed to allocate stream hb msg, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return; + } + +// ((SMsgHead*)buf)->vgId = htonl(nodeId); +// void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); +// +// SEncoder encoder; +// tEncoderInit(&encoder, pBuf, tlen); +// if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { +// rpcFreeCont(buf); +// qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); +// return; +// } +// tEncoderClear(&encoder); +// +// SRpcMsg msg = {0}; +// initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead)); +// qDebug("vgId:%d, send hb to mnode", nodeId); +// +// tmsgSendReq(pEpSet, &msg); + // nothing sends buf yet, so release it here instead of leaking it; revisit when the send path above is enabled + rpcFreeCont(buf); +}