fix(stream): fix error in tq timer.

This commit is contained in:
Haojun Liao 2024-01-09 09:13:53 +08:00
parent bbaaad6f1f
commit f9d5c0d403
7 changed files with 34 additions and 25 deletions

View File

@ -28,9 +28,6 @@
} \ } \
} while (0) } while (0)
extern int32_t streamTimerInit();
extern void streamTimerCleanUp();
static SDnode globalDnode = {0}; static SDnode globalDnode = {0};
SDnode *dmInstance() { return &globalDnode; } SDnode *dmInstance() { return &globalDnode; }
@ -169,7 +166,6 @@ int32_t dmInit() {
#if defined(USE_S3) #if defined(USE_S3)
if (s3Begin() != 0) return -1; if (s3Begin() != 0) return -1;
#endif #endif
if (streamTimerInit() != 0) return -1;
dInfo("dnode env is initialized"); dInfo("dnode env is initialized");
return 0; return 0;
@ -196,10 +192,10 @@ void dmCleanup() {
udfStopUdfd(); udfStopUdfd();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dmDiskClose(); dmDiskClose();
#if defined(USE_S3) #if defined(USE_S3)
s3End(); s3End();
#endif #endif
streamTimerCleanUp();
dInfo("dnode env is cleaned up"); dInfo("dnode env is cleaned up");

View File

@ -90,6 +90,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
goto _OVER; goto _OVER;
} }
#endif #endif
indexInit(tsNumOfCommitThreads); indexInit(tsNumOfCommitThreads);
streamMetaInit(); streamMetaInit();
@ -108,7 +109,9 @@ _OVER:
} }
void dmCleanupDnode(SDnode *pDnode) { void dmCleanupDnode(SDnode *pDnode) {
if (pDnode == NULL) return; if (pDnode == NULL) {
return;
}
dmCleanupClient(pDnode); dmCleanupClient(pDnode);
dmCleanupStatusClient(pDnode); dmCleanupStatusClient(pDnode);

View File

@ -42,7 +42,6 @@ extern "C" {
// clang-format on // clang-format on
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
extern void* tqTimer;
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
@ -108,6 +107,7 @@ struct STQ {
TTB* pExecStore; TTB* pExecStore;
TTB* pCheckStore; TTB* pCheckStore;
SStreamMeta* pStreamMeta; SStreamMeta* pStreamMeta;
void* tqTimer;
}; };
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);

View File

@ -26,16 +26,16 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
int32_t tqTimerInit() { static int32_t tqTimerInit(STQ* pTq) {
tqTimer = taosTmrInit(100, 100, 1000, "TQ"); pTq->tqTimer = taosTmrInit(100, 100, 1000, "TQ");
if (tqTimer == NULL) { if (pTq->tqTimer == NULL) {
return -1; return -1;
} }
return 0; return 0;
} }
void tqTimerCleanUp() { static void tqTimerCleanUp(STQ* pTq) {
taosTmrCleanUp(tqTimer); taosTmrCleanUp(pTq->tqTimer);
} }
void tqDestroyTqHandle(void* data) { void tqDestroyTqHandle(void* data) {
@ -118,7 +118,7 @@ int32_t tqInitialize(STQ* pTq) {
return -1; return -1;
} }
tqTimerInit(); tqTimerInit(pTq);
return 0; return 0;
} }
@ -149,7 +149,7 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta); streamMetaClose(pTq->pStreamMeta);
tqTimerCleanUp(); tqTimerCleanUp(pTq);
qDebug("end to close tq"); qDebug("end to close tq");
taosMemoryFree(pTq); taosMemoryFree(pTq);

View File

@ -92,6 +92,9 @@ extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId; extern int32_t taskDbWrapperId;
int32_t streamTimerInit();
void streamTimerCleanUp();
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);

View File

@ -21,13 +21,18 @@ void* streamTimer = NULL;
int32_t streamTimerInit() { int32_t streamTimerInit() {
streamTimer = taosTmrInit(1000, 100, 10000, "STREAM"); streamTimer = taosTmrInit(1000, 100, 10000, "STREAM");
if (streamTimer == NULL) { if (streamTimer == NULL) {
stError("init stream timer failed, code:%s", tstrerror(terrno));
return -1; return -1;
} }
stInfo("init stream timer, %p", streamTimer);
return 0; return 0;
} }
void streamTimerCleanUp() { void streamTimerCleanUp() {
stInfo("cleanup stream timer, %p", streamTimer);
taosTmrCleanUp(streamTimer); taosTmrCleanUp(streamTimer);
streamTimer = NULL;
} }
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {

View File

@ -23,7 +23,7 @@
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0; int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0; int32_t streamBackendCfWrapperId = 0;
@ -61,9 +61,10 @@ static void streamMetaEnvInit() {
streamMetaId = taosOpenRef(64, streamMetaCloseImpl); streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
metaRefMgtInit(); metaRefMgtInit();
streamTimerInit();
} }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { /*taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);*/ streamMetaEnvInit();}
void streamMetaCleanup() { void streamMetaCleanup() {
taosCloseRef(streamBackendId); taosCloseRef(streamBackendId);
@ -71,6 +72,7 @@ void streamMetaCleanup() {
taosCloseRef(streamMetaId); taosCloseRef(streamMetaId);
metaRefMgtCleanup(); metaRefMgtCleanup();
streamTimerCleanUp();
} }
void metaRefMgtInit() { void metaRefMgtInit() {
@ -391,7 +393,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid); metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); // pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0; pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
@ -1202,7 +1204,7 @@ void metaHbToMnode(void* param, void* tmrId) {
} }
if (!waitForEnoughDuration(pMeta->pHbInfo)) { if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); // taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
@ -1212,7 +1214,7 @@ void metaHbToMnode(void* param, void* tmrId) {
metaHeartbeatToMnodeImpl(pMeta); metaHeartbeatToMnodeImpl(pMeta);
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); // taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
} }
@ -1262,11 +1264,11 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping // wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; // pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { // while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100); // taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); // stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
} // }
} }
stDebug("vgId:%d start to check all tasks", vgId); stDebug("vgId:%d start to check all tasks", vgId);
@ -1285,7 +1287,7 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
metaRefMgtAdd(pMeta->vgId, pRid); metaRefMgtAdd(pMeta->vgId, pRid);
*pRid = pMeta->rid; *pRid = pMeta->rid;
metaHbToMnode(pRid, NULL); // metaHbToMnode(pRid, NULL);
} }
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {