Merge pull request #29034 from taosdata/fix/liaohj

fix(stream): fix deadlock during update checkpoint info
This commit is contained in:
Shengliang Guan 2024-12-09 10:27:14 +08:00 committed by GitHub
commit 4978c0aa59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 180 additions and 220 deletions

View File

@ -1931,11 +1931,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return terrno = code; return terrno = code;
} }
code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
if (code) {
mError("failed to register trans, transId:%d, and continue", pTrans->id);
}
} }
if (!includeAllNodes) { if (!includeAllNodes) {
@ -1951,6 +1946,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id); pStream->name, pTrans->id);
// NOTE: for each stream, we register one trans entry for task update
code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
if (code) {
mError("failed to register trans, transId:%d, and continue", pTrans->id);
}
code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again // todo: not continue, drop all and retry again

View File

@ -35,7 +35,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
size_t keyLen = 0; size_t keyLen = 0;
void *pIter = NULL; void *pIter = NULL;
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
int32_t num = 0; int32_t numOfChkpt = 0;
if (pNumOfActiveChkpt != NULL) {
*pNumOfActiveChkpt = 0;
}
if (pList == NULL) { if (pList == NULL) {
return terrno; return terrno;
@ -50,15 +54,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
void *pKey = taosHashGetKey(pEntry, &keyLen); void *pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name // key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
pEntry->startTime); pEntry->streamId, pEntry->name, pEntry->startTime);
void* p = taosArrayPush(pList, &info); void* p = taosArrayPush(pList, &info);
if (p == NULL) { if (p == NULL) {
return terrno; return terrno;
} }
} else { } else {
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
num++; numOfChkpt++;
} }
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} }
@ -78,48 +82,34 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
} }
} }
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size, mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size,
taosHashGetSize(execInfo.transMgmt.pDBTrans), num); taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt);
taosArrayDestroy(pList); taosArrayDestroy(pList);
if (pNumOfActiveChkpt != NULL) { if (pNumOfActiveChkpt != NULL) {
*pNumOfActiveChkpt = num; *pNumOfActiveChkpt = numOfChkpt;
} }
return 0; return 0;
} }
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
// For a given stream:
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
// 2. create/drop/reset/update trans are conflict with any other trans.
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
if (lock) {
streamMutexLock(&execInfo.lock);
}
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) { if (num <= 0) {
if (lock) {
streamMutexUnlock(&execInfo.lock);
}
return 0; return 0;
} }
// if any task updates exist, any other stream trans are not allowed to be created
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
if (code) { if (code) {
mError("failed to clear finish trans, code:%s", tstrerror(code)); mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
} }
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) { if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry; SStreamTransInfo tInfo = *pEntry;
if (lock) {
streamMutexUnlock(&execInfo.lock);
}
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
(strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
@ -141,11 +131,25 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char
mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
} }
return TSDB_CODE_SUCCESS;
}
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
// For a given stream:
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
// 2. create/drop/reset/update trans are conflict with any other trans.
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
if (lock) {
streamMutexLock(&execInfo.lock);
}
int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
if (lock) { if (lock) {
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
} }
return 0; return code;
} }
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {

View File

@ -254,11 +254,12 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask
} }
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
streamMetaWLock(pMeta);
int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId); int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
if (code != 0) { if (code != 0) {
smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code)); smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
} }
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk

View File

@ -1108,91 +1108,76 @@ _OVER:
return code; return code;
} }
// always return success to mnode
//todo: handle failure of build and send msg to mnode
static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code,
int32_t taskId) {
SRpcMsg rsp = {0};
int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code);
if (ret) { // suppress the error in build checkpoint source rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret));
}
tmsgSendRsp(&rsp); // error occurs
}
// no matter what kinds of error happened, make sure the mnode will receive the success execution code. // no matter what kinds of error happened, make sure the mnode will receive the success execution code.
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0; int32_t code = 0;
SStreamCheckpointSourceReq req = {0};
SDecoder decoder = {0};
SStreamTask* pTask = NULL;
int64_t checkpointId = 0;
// disable auto rsp to mnode // disable auto rsp to mnode
pRsp->info.handle = NULL; pRsp->info.handle = NULL;
SStreamCheckpointSourceReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
SRpcMsg rsp = {0}; return TSDB_CODE_SUCCESS; // always return success to mnode,
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0}; doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); return TSDB_CODE_SUCCESS; // always return success to mnode
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64 tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
", transId:%d s-task:0x%x ignore it", ", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId); vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0}; doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); return TSDB_CODE_SUCCESS; // always return success to mnode
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
} }
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL || code != 0) { if (pTask == NULL || code != 0) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64 tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
" transId:%d it may have been destroyed", " transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId); vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0}; doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pTask->status.downstreamReady != 1) { if (pTask->status.downstreamReady != 1) {
streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); // record the latest failed checkpoint id // record the latest failed checkpoint id
streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64 tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
", transId:%d set it failed", ", transId:%d set it failed",
pTask->id.idStr, req.checkpointId, req.transId); pTask->id.idStr, req.checkpointId, req.transId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
SRpcMsg rsp = {0};
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // todo retry handle error return TSDB_CODE_SUCCESS; // todo retry handle error
} }
@ -1207,14 +1192,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
SRpcMsg rsp = {0};
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
@ -1226,7 +1204,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// check if the checkpoint msg already sent or not. // check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId); streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
@ -1235,7 +1212,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // checkpoint already finished, and not in checkpoint status } else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) { if (req.checkpointId <= pTask->chkInfo.checkpointId) {
@ -1245,15 +1222,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
SRpcMsg rsp = {0};
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -1264,7 +1233,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (code) { if (code) {
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
tstrerror(code)); tstrerror(code));
return code; streamMetaReleaseTask(pMeta, pTask);
doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
return TSDB_CODE_SUCCESS;
} }
if (req.mndTrigger) { if (req.mndTrigger) {
@ -1279,13 +1250,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0}; streamTaskSetCheckpointFailed(pTask); // set the checkpoint failed
int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
if (ret) { // suppress the error in build checkpointsource rsp
tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
}
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);

View File

@ -13,9 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <common/tmsg.h>
#include "tcommon.h" #include "tcommon.h"
#include "tmsg.h"
#include "tq.h" #include "tq.h"
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1)) #define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
@ -50,7 +48,7 @@ static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkI
static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo); static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id); static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode); static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); static void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
int64_t earlyTs); int64_t earlyTs);
@ -1062,7 +1060,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return; return;
} }
reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs); rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
} }
} }
@ -1165,7 +1163,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) { void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
int32_t code = 0; int32_t code = 0;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;

View File

@ -17,19 +17,20 @@
#include "vnd.h" #include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3 #define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100 #define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan
typedef struct SBuildScanWalMsgParam { typedef struct SBuildScanWalMsgParam {
int64_t metaId; int64_t metaId;
int32_t numOfTasks; int32_t numOfTasks;
} SBuildScanWalMsgParam; } SBuildScanWalMsgParam;
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask); static bool taskReadyForDataFromWal(SStreamTask* pTask);
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) { int32_t tqScanWal(STQ* pTq) {
@ -37,12 +38,11 @@ int32_t tqScanWal(STQ* pTq) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t numOfTasks = 0; int32_t numOfTasks = 0;
bool shouldIdle = true;
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
// check all tasks // check all tasks
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); int32_t code = doScanWalForAllTasks(pMeta);
if (code) { if (code) {
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
return code; return code;
@ -133,10 +133,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
} }
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored; bool alreadyRestored = pTq->pVnode->restored;
int32_t numOfTasks = 0; int32_t code = 0;
// do not launch the stream tasks, if it is a follower or not restored vnode. // do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
@ -144,47 +143,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
code = doScanWalAsync(pTq, ckPause);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
streamMetaWUnLock(pMeta);
return 0;
}
if (pMeta->startInfo.startAllTasks) {
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
streamMetaWUnLock(pMeta);
return 0;
}
pMeta->scanInfo.scanCounter += 1;
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
}
if (pMeta->scanInfo.scanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
streamMetaWUnLock(pMeta);
return 0;
}
int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
if (ckPause && numOfTasks == numOfPauseTasks) {
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
// reset the counter value, since we do not launch the scan wal operation.
pMeta->scanInfo.scanCounter = 0;
streamMetaWUnLock(pMeta);
return 0;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
numOfTasks, alreadyRestored);
int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return code; return code;
} }
@ -368,11 +328,8 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
return code; return code;
} }
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
*pScanIdle = true;
bool noDataInWal = true;
int32_t vgId = pStreamMeta->vgId; int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -410,8 +367,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
*pScanIdle = false;
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
code = setWalReaderStartOffset(pTask, vgId); code = setWalReaderStartOffset(pTask, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -437,7 +392,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if ((numOfItems > 0) || hasNewData) { if ((numOfItems > 0) || hasNewData) {
noDataInWal = false;
code = streamTrySchedExec(pTask); code = streamTrySchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
@ -449,11 +403,47 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
} }
// all wal are checked, and no new data available in wal.
if (noDataInWal) {
*pScanIdle = true;
}
taosArrayDestroy(pTaskList); taosArrayDestroy(pTaskList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
if (pMeta->startInfo.startAllTasks) {
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
return 0;
}
pMeta->scanInfo.scanCounter += 1;
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
}
if (pMeta->scanInfo.scanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
return 0;
}
int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
if (ckPause && numOfTasks == numOfPauseTasks) {
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
// reset the counter value, since we do not launch the scan wal operation.
pMeta->scanInfo.scanCounter = 0;
return 0;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
numOfTasks, alreadyRestored);
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
}

View File

@ -718,8 +718,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
} }
} }
streamMetaWUnLock(pMeta);
// drop the related fill-history task firstly // drop the related fill-history task firstly
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
@ -737,7 +735,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
} }
// commit the update // commit the update
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

View File

@ -37,7 +37,7 @@ extern "C" {
#define META_HB_CHECK_INTERVAL 200 #define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 5120
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
// clang-format off // clang-format off

View File

@ -4378,7 +4378,6 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr
} }
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) { int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
stDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) { if (!pCur) {
return -1; return -1;
} }

View File

@ -591,7 +591,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
{ // destroy the related fill-history tasks { // destroy the related fill-history tasks
// drop task should not in the meta-lock, and drop the related fill-history task now // drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -599,7 +598,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
id, vgId, pReq->taskId, numOfTasks); id, vgId, pReq->taskId, numOfTasks);
} }
streamMetaWLock(pMeta);
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
code = streamMetaCommit(pMeta); code = streamMetaCommit(pMeta);
} }
@ -675,8 +673,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
streamMetaWUnLock(pMeta);
// drop task should not in the meta-lock, and drop the related fill-history task now // drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
@ -685,9 +681,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
(int32_t)pReq->hTaskId, numOfTasks); (int32_t)pReq->hTaskId, numOfTasks);
} }
streamMetaWLock(pMeta);
code = streamMetaCommit(pMeta); code = streamMetaCommit(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -522,7 +522,10 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int
if (pItem->type == STREAM_INPUT__GET_RES) { if (pItem->type == STREAM_INPUT__GET_RES) {
const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
stDebug("s-task:%s set force_window_close as source block, skey:%"PRId64, id, pTrigger->pBlock->info.window.skey);
(*pVer) = pTrigger->pBlock->info.window.skey;
}
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
@ -671,7 +674,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock
doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr); doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);
// update the currentVer if processing the submit blocks. // update the currentVer if processing the submitted blocks.
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id, stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
pInfo->checkpointVer, pInfo->nextProcessVer, ver); pInfo->checkpointVer, pInfo->nextProcessVer, ver);
@ -688,6 +691,34 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock
return code; return code;
} }
// do nothing after sync executor state to storage backend, untill checkpoint is completed.
static int32_t doHandleChkptBlock(SStreamTask* pTask) {
int32_t code = 0;
const char* id = pTask->id.idStr;
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
} else { // todo refactor
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
tstrerror(code));
}
}
streamMutexUnlock(&pTask->lock);
return code;
}
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
@ -832,36 +863,16 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
} }
if (type != STREAM_INPUT__CHECKPOINT) { if (type == STREAM_INPUT__CHECKPOINT) {
code = doHandleChkptBlock(pTask);
streamFreeQitem(pInput);
return code;
} else {
code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks); code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
streamFreeQitem(pInput); streamFreeQitem(pInput);
if (code) { if (code) {
return code; return code;
} }
} else { // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
} else { // todo refactor
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
tstrerror(code));
}
}
streamMutexUnlock(&pTask->lock);
streamFreeQitem(pInput);
return code;
} }
} }
} }

View File

@ -501,8 +501,6 @@ _err:
void streamMetaInitBackend(SStreamMeta* pMeta) { void streamMetaInitBackend(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
streamMetaWUnLock(pMeta);
while (1) { while (1) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
@ -908,8 +906,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
int32_t code = 0; int32_t code = 0;
STaskId id = {.streamId = streamId, .taskId = taskId}; STaskId id = {.streamId = streamId, .taskId = taskId};
streamMetaWLock(pMeta);
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) { if (code == 0) {
// desc the paused task counter // desc the paused task counter
@ -958,10 +954,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
streamMetaWUnLock(pMeta);
} else { } else {
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
streamMetaWUnLock(pMeta);
} }
return 0; return 0;

View File

@ -192,6 +192,7 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int8_t precision = pTask->info.interval.precision; int8_t precision = pTask->info.interval.precision;
SStreamTrigger* pTrigger = NULL; SStreamTrigger* pTrigger = NULL;
bool isFull = false;
while (1) { while (1) {
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
@ -214,7 +215,6 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
// check whether the time window gaps exist or not // check whether the time window gaps exist or not
int64_t now = taosGetTimestamp(precision); int64_t now = taosGetTimestamp(precision);
int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
// there are gaps, needs to be filled // there are gaps, needs to be filled
STimeWindow w = pTrigger->pBlock->info.window; STimeWindow w = pTrigger->pBlock->info.window;
@ -226,13 +226,18 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
} }
pTask->status.latestForceWindow = w; pTask->status.latestForceWindow = w;
if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) { isFull = streamQueueIsFull(pTask->inputq.queue);
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || isFull) {
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
if (!isFull) {
*pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
}
*pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now;
*pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev, pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval;
*pNextTrigger); stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id,
num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer);
return code; return code;
} else { } else {
stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
@ -289,7 +294,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
} }
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 seec nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 sec
stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL); stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL);
} else { } else {
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {