Merge pull request #29969 from taosdata/fix/droptask

refactor(stream): not generated hb when trying to restart tasks.
This commit is contained in:
Simon Guan 2025-03-03 18:24:50 +08:00 committed by GitHub
commit 611a0f3d1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 44 additions and 22 deletions

View File

@ -789,6 +789,7 @@ void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta);
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
void streamMetaRLock(SStreamMeta* pMeta);
int32_t streamMetaTryRlock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);

View File

@ -299,6 +299,8 @@ _OVER:
}
sdbRelease(pMnode->pSdb, vObj);
cfgArrayCleanUp(array);
tFreeSConfigReq(&configReq);
return code;
}

View File

@ -1418,6 +1418,7 @@ int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
@ -1429,12 +1430,14 @@ int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_
tbData.pCreateTbReq = NULL;
}
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return code;
}
void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData);
if (p == NULL) {
tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno));
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return terrno;
}

View File

@ -148,9 +148,13 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
streamMetaRLock(pMeta);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
streamMetaRUnLock(pMeta);
code = streamMetaTryRlock(pMeta);
if (code == 0) {
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
streamMetaRUnLock(pMeta);
} else {
numOfTasks = 0;
}
if (numOfTasks == 0) {
goto _end;
@ -169,7 +173,6 @@ static void doStartScanWal(void* param, void* tmrId) {
}
_end:
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);

View File

@ -1585,10 +1585,13 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
pTask->pBackend = NULL;
}
streamMetaWLock(pTask->pMeta);
if (pTask->exec.pExecutor != NULL) {
qDestroyTask(pTask->exec.pExecutor);
pTask->exec.pExecutor = NULL;
}
streamMetaWUnLock(pTask->pMeta);
return 0;
}

View File

@ -363,13 +363,24 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
streamMetaRLock(pMeta);
code = streamMetaSendHbHelper(pMeta);
if (code) {
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
// NOTE: stream task in restart procedure. not generate the hb now, try to acquire the lock may cause stuck this timer.
int32_t count = 30;
bool send = false;
while ((--count) >= 0) {
int32_t ret = streamMetaTryRlock(pMeta);
if (ret != 0) {
taosMsleep(10);
} else {
send = true;
code = streamMetaSendHbHelper(pMeta);
streamMetaRUnLock(pMeta);
break;
}
}
streamMetaRUnLock(pMeta);
if (!send) {
stError("vgId:%d failed to send hmMsg to mnode, retry again in 5s, code:%s", pMeta->vgId, tstrerror(code));
}
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");

View File

@ -144,21 +144,11 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem
// let's try the ordinary input q
pQueue->qItem = NULL;
int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem);
if (code) {
stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code));
}
int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem);
if (pQueue->qItem == NULL) {
code = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
if (code) {
stError("s-task:%s failed to get all items in inputq, code:%s", id, tstrerror(code));
}
code = taosGetQitem(pQueue->qall, &pQueue->qItem);
if (code) {
stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code));
}
num = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
num = taosGetQitem(pQueue->qall, &pQueue->qItem);
}
*pItem = streamQueueCurItem(pQueue);

View File

@ -54,6 +54,15 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
}
}
int32_t streamMetaTryRlock(SStreamMeta* pMeta) {
int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
}
return code;
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);