Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-08-27 00:18:53 +08:00
commit 88a59eee37
3 changed files with 142 additions and 66 deletions

View File

@ -277,8 +277,8 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool transferState; bool transferState;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t timerActive; // timer is active int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus; } SStreamStatus;
typedef struct SHistDataRange { typedef struct SHistDataRange {
@ -289,7 +289,7 @@ typedef struct SHistDataRange {
typedef struct SSTaskBasicInfo { typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id int32_t nodeId; // vgroup id or snode id
SEpSet epSet; SEpSet epSet;
SEpSet mnodeEpset; // mnode epset for send heartbeat SEpSet mnodeEpset; // mnode epset for send heartbeat
int32_t selfChildId; int32_t selfChildId;
int32_t totalLevel; int32_t totalLevel;
int8_t taskLevel; int8_t taskLevel;
@ -397,6 +397,7 @@ typedef struct SStreamMeta {
SMetaHbInfo hbInfo; SMetaHbInfo hbInfo;
int32_t closedTask; int32_t closedTask;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t rid;
int64_t chkpId; int64_t chkpId;
SArray* chkpSaved; SArray* chkpSaved;
@ -558,7 +559,7 @@ typedef struct STaskStatusEntry {
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {
int32_t vgId; int32_t vgId;
int32_t numOfTasks; int32_t numOfTasks;
SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry> SArray* pTaskStatus; // SArray<SStreamTaskStatusEntry>
} SStreamHbMsg; } SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
@ -629,8 +630,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
@ -727,8 +728,8 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
SStreamTask* pTask, int8_t isSucceed); int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed); int8_t isSucceed);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -16,9 +16,9 @@
#include "streamInt.h" #include "streamInt.h"
#include "ttimer.h" #include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) #define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
SStreamGlobalEnv streamEnv; SStreamGlobalEnv streamEnv;
int32_t streamInit() { int32_t streamInit() {
@ -66,8 +66,8 @@ static void streamSchedByTimer(void* param, void* tmrId) {
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam); qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
streamMetaReleaseTask(NULL, pTask);
qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr);
streamMetaReleaseTask(NULL, pTask);
return; return;
} }
@ -141,7 +141,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
} }
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (*pBuf == NULL) { if (*pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -240,46 +240,47 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return 0; return 0;
} }
//static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { // static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
// int8_t status = 0; // int8_t status = 0;
// //
// SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); // SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
// if (pBlock == NULL) { // if (pBlock == NULL) {
// streamTaskInputFail(pTask); // streamTaskInputFail(pTask);
// status = TASK_INPUT_STATUS__FAILED; // status = TASK_INPUT_STATUS__FAILED;
// qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, // qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
// pTask->id.idStr); // pTask->id.idStr);
// } else { // } else {
// if (pBlock->type == STREAM_INPUT__TRANS_STATE) { // if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
// pTask->status.appendTranstateBlock = true; // pTask->status.appendTranstateBlock = true;
// } // }
// //
// int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// // input queue is full, upstream is blocked now // // input queue is full, upstream is blocked now
// status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; // status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
// } // }
// //
// return status; // return status;
//} // }
//static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { // static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void**
// *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); // pBuf) {
// if (*pBuf == NULL) { // *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
// return TSDB_CODE_OUT_OF_MEMORY; // if (*pBuf == NULL) {
// } // return TSDB_CODE_OUT_OF_MEMORY;
// }
// //
// ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); // ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
// SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); // SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
// //
// pDispatchRsp->inputStatus = status; // pDispatchRsp->inputStatus = status;
// pDispatchRsp->streamId = htobe64(pReq->streamId); // pDispatchRsp->streamId = htobe64(pReq->streamId);
// pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); // pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
// pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); // pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
// pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); // pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
// pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); // pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
// //
// return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
//} // }
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
@ -300,7 +301,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
pReq->upstreamTaskId); pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__BLOCKED;
} else { } else {
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked // Current task has received the checkpoint req from the upstream task, from which the message should all be
// blocked
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
@ -380,7 +382,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
msgLen, ver, total, size + SIZE_IN_MB(msgLen)); msgLen, ver, total, size + SIZE_IN_MB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) { if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */ (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem); destroyStreamDataBlock((SStreamDataBlock*)pItem);
@ -393,10 +395,11 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
destroyStreamDataBlock((SStreamDataBlock*)pItem); destroyStreamDataBlock((SStreamDataBlock*)pItem);
return code; return code;
} }
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size);
} else if (type == STREAM_INPUT__GET_RES) { } else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later. // use the default memory limit, refactor later.
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
@ -441,7 +444,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
return; return;
} }
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
pInfo->dataAllowed = true; pInfo->dataAllowed = true;
} }
@ -456,7 +459,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
if (pInfo->taskId == taskId) { if (pInfo->taskId == taskId) {
return pInfo; return pInfo;

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tmisce.h"
#include "executor.h" #include "executor.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInt.h" #include "streamInt.h"
#include "tmisce.h"
#include "tref.h" #include "tref.h"
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
@ -25,22 +25,67 @@
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); int32_t streamBackendId = 0;
static void metaHbToMnode(void* param, void* tmrId); int32_t streamBackendCfWrapperId = 0;
static void streamMetaClear(SStreamMeta* pMeta); int32_t streamMetaId = 0;
typedef struct {
TdThreadMutex mutex;
SHashObj* pTable;
} SGStreamMetaMgt;
SGStreamMetaMgt gStreamMetaMgt;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
void streamMetaCloseImpl(void* arg);
static void streamMetaEnvInit() { static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
taosThreadMutexInit(&(gStreamMetaMgt.mutex), NULL);
gStreamMetaMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
} }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { void streamMetaCleanup() {
taosCloseRef(streamBackendId); taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamBackendCfWrapperId);
taosCloseRef(streamMetaId);
taosThreadMutexDestroy(&gStreamMetaMgt.mutex);
void* pIter = taosHashIterate(gStreamMetaMgt.pTable, NULL);
while (pIter) {
SArray* list = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(list); i++) {
void* rid = taosArrayGetP(list, i);
taosMemoryFree(rid);
}
taosArrayDestroy(list);
pIter = taosHashIterate(gStreamMetaMgt.pTable, pIter);
}
taosHashCleanup(gStreamMetaMgt.pTable);
}
int32_t streamMetaAddRidToGlobalMgt(int64_t vgId, int64_t* rid) {
taosThreadMutexLock(&gStreamMetaMgt.mutex);
void* p = taosHashGet(gStreamMetaMgt.pTable, &vgId, sizeof(vgId));
if (p == NULL) {
SArray* list = taosArrayInit(8, sizeof(void*));
taosArrayPush(list, &rid);
taosHashPut(gStreamMetaMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
} else {
SArray* list = *(SArray**)p;
taosArrayPush(list, &rid);
}
taosThreadMutexUnlock(&gStreamMetaMgt.mutex);
return 0;
} }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
@ -92,7 +137,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->stage = stage; pMeta->stage = stage;
// send heartbeat every 5sec. // send heartbeat every 5sec.
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer); pMeta->rid = taosAddRef(streamMetaId, pMeta);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
*pRid = pMeta->rid;
streamMetaAddRidToGlobalMgt(pMeta->vgId, pRid);
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.tickCounter = 0;
pMeta->hbInfo.stopFlag = 0; pMeta->hbInfo.stopFlag = 0;
@ -116,9 +167,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
} }
// if (pMeta->streamBackend == NULL) {
// goto _err;
// }
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
code = streamBackendLoadCheckpointInfo(pMeta); code = streamBackendLoadCheckpointInfo(pMeta);
@ -207,6 +255,18 @@ void streamMetaClear(SStreamMeta* pMeta) {
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta"); qDebug("start to close stream meta");
// int64_t rid = *(int64_t*)pMeta->pRid;
// if (taosTmrStop(pMeta->hbInfo.hbTmr)) {
// taosMemoryFree(pMeta->pRid);
// } else {
// // do nothing, stop by timer thread
// }
taosRemoveRef(streamMetaId, pMeta->rid);
}
void streamMetaCloseImpl(void* arg) {
SStreamMeta* pMeta = arg;
qDebug("start to do-close stream meta");
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
} }
@ -623,18 +683,26 @@ static bool readyToSendHb(SMetaHbInfo* pInfo) {
} }
void metaHbToMnode(void* param, void* tmrId) { void metaHbToMnode(void* param, void* tmrId) {
SStreamMeta* pMeta = param; int64_t rid = *(int64_t*)param;
SStreamHbMsg hbMsg = {0}; SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
// taosMemoryFree(param);
return;
}
// need to stop, stop now // need to stop, stop now
if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) { if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) {
pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP; pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP;
qDebug("vgId:%d jump out of meta timer", pMeta->vgId); qDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return; return;
} }
if (!readyToSendHb(&pMeta->hbInfo)) { if (!readyToSendHb(&pMeta->hbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid);
return; return;
} }
@ -673,6 +741,7 @@ void metaHbToMnode(void* param, void* tmrId) {
if (code < 0) { if (code < 0) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return; return;
} }
@ -680,6 +749,7 @@ void metaHbToMnode(void* param, void* tmrId) {
if (buf == NULL) { if (buf == NULL) {
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return; return;
} }
@ -689,6 +759,7 @@ void metaHbToMnode(void* param, void* tmrId) {
rpcFreeCont(buf); rpcFreeCont(buf);
qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
taosReleaseRef(streamMetaId, rid);
return; return;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
@ -702,5 +773,6 @@ void metaHbToMnode(void* param, void* tmrId) {
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid);
} }