fix:[TD-27644]heartbeat closed before snode close leading to snode is hanged

This commit is contained in:
wangmm0220 2023-12-01 17:32:29 +08:00
parent acce0852df
commit f06117dbc9
10 changed files with 32 additions and 100 deletions

View File

@ -216,9 +216,6 @@ int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif
int32_t streamInit();
void streamCleanUp();
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);

View File

@ -28,6 +28,9 @@
} \
} while (0)
extern int32_t streamTimerInit();
extern void streamTimerCleanUp();
static SDnode globalDnode = {0};
SDnode *dmInstance() { return &globalDnode; }
@ -166,6 +169,7 @@ int32_t dmInit() {
#if defined(USE_S3)
if (s3Begin() != 0) return -1;
#endif
if (streamTimerInit() != 0) return -1;
dInfo("dnode env is initialized");
return 0;
@ -194,6 +198,8 @@ void dmCleanup() {
#if defined(USE_S3)
s3End();
#endif
streamTimerCleanUp();
dInfo("dnode env is cleaned up");
taosCleanupCfg();

View File

@ -223,8 +223,6 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
// tq
int tqInit();
void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);

View File

@ -17,12 +17,6 @@
#include "vnd.h"
#include "tqCommon.h"
typedef struct {
int8_t inited;
} STqMgmt;
static STqMgmt tqMgmt = {0};
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
@ -32,36 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
int32_t tqInit() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
if (old != 2) break;
}
if (old == 0) {
if (streamInit() < 0) {
return -1;
}
atomic_store_8(&tqMgmt.inited, 1);
}
return 0;
}
void tqCleanUp() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
if (old != 2) break;
}
if (old == 1) {
streamCleanUp();
atomic_store_8(&tqMgmt.inited, 0);
}
}
void tqDestroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);

View File

@ -39,12 +39,6 @@ int vnodeInit(int nthreads) {
if (walInit() < 0) {
return -1;
}
if (tqInit() < 0) {
return -1;
}
if (s3Init() < 0) {
return -1;
}
return 0;
}
@ -58,7 +52,5 @@ void vnodeCleanup() {
vnodeAsyncDestroy(&vnodeAsyncHandle[1]);
walCleanUp();
tqCleanUp();
smaCleanUp();
s3CleanUp();
}

View File

@ -57,11 +57,6 @@ typedef struct {
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamGlobalEnv {
int8_t inited;
void* timer;
} SStreamGlobalEnv;
typedef struct SStreamContinueExecInfo {
SEpSet epset;
int32_t taskId;
@ -92,7 +87,7 @@ struct SStreamQueue {
int8_t status;
};
extern SStreamGlobalEnv streamEnv;
extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;

View File

@ -16,38 +16,18 @@
#include "streamInt.h"
#include "ttimer.h"
SStreamGlobalEnv streamEnv;
void* streamTimer = NULL;
int32_t streamInit() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
if (old != 2) break;
}
if (old == 0) {
streamEnv.timer = taosTmrInit(1000, 100, 10000, "STREAM");
if (streamEnv.timer == NULL) {
atomic_store_8(&streamEnv.inited, 0);
int32_t streamTimerInit() {
streamTimer = taosTmrInit(1000, 100, 10000, "STREAM");
if (streamTimer == NULL) {
return -1;
}
atomic_store_8(&streamEnv.inited, 1);
}
return 0;
}
void streamCleanUp() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
if (old != 2) break;
}
if (old == 1) {
taosTmrCleanUp(streamEnv.timer);
atomic_store_8(&streamEnv.inited, 0);
}
void streamTimerCleanUp() {
taosTmrCleanUp(streamTimer);
}
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
@ -77,7 +57,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
if (pTrigger == NULL) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
return;
}
@ -88,7 +68,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
return;
}
@ -97,7 +77,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
return;
}
@ -105,7 +85,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}
}
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
}
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
@ -115,7 +95,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer);
pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}

View File

@ -506,9 +506,9 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pTimer);
} else {
pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer);
}
}

View File

@ -369,7 +369,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
@ -1099,7 +1099,7 @@ void metaHbToMnode(void* param, void* tmrId) {
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
@ -1215,7 +1215,7 @@ void metaHbToMnode(void* param, void* tmrId) {
_end:
clearHbMsg(&hbMsg, pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}

View File

@ -114,7 +114,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer,
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer);
}
}
@ -137,9 +137,9 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer);
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
@ -485,7 +485,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer);
}
}
@ -726,7 +726,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
@ -754,7 +754,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
@ -815,7 +815,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
streamTaskInitForLaunchHTask(&pTask->hTaskInfo);
if (pTask->hTaskInfo.pTimer == NULL) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer);
if (pTask->hTaskInfo.pTimer == NULL) {
atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr,
@ -828,7 +828,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
} else { // timer exists
ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer);
}
return TSDB_CODE_SUCCESS;