Merge pull request #29884 from taosdata/enh/killtrans
enh(stream): auto kill checkpoint trans that last long time
This commit is contained in:
commit
e429264432
|
@ -116,7 +116,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
|
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
|
||||||
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
|
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
|
||||||
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
|
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans);
|
||||||
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
|
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
|
||||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
|
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
|
||||||
|
|
||||||
|
@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
|
||||||
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
|
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
|
||||||
SVgroupChangeInfo *pInfo);
|
SVgroupChangeInfo *pInfo);
|
||||||
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
||||||
|
void killChkptAndResetStreamTask(SMnode *pMnode, SArray *pLongChkpts);
|
||||||
bool isNodeUpdateTransActive();
|
bool isNodeUpdateTransActive();
|
||||||
|
|
||||||
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);
|
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);
|
||||||
|
|
|
@ -1258,17 +1258,47 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t numOfCheckpointTrans = 0;
|
int32_t numOfCheckpointTrans = 0;
|
||||||
|
SArray *pLongChkpts = NULL;
|
||||||
|
SArray *pList = NULL;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
|
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
|
pList = taosArrayInit(4, sizeof(SCheckpointInterval));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
mError("failed to init chkptInterval info, not handle stream checkpoint, code:%s", tstrerror(terrno));
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
pLongChkpts = taosArrayInit(4, sizeof(SStreamTransInfo));
|
||||||
|
if (pLongChkpts == NULL) {
|
||||||
|
mError("failed to init long checkpoint list, not handle stream checkpoint, code:%s", tstrerror(terrno));
|
||||||
|
taosArrayDestroy(pList);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if ongong checkpoint trans or long chkpt trans exist.
|
||||||
|
code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans, pLongChkpts);
|
||||||
|
if (code) {
|
||||||
|
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
||||||
|
|
||||||
|
taosArrayDestroy(pList);
|
||||||
|
taosArrayDestroy(pLongChkpts);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// kill long exec checkpoint and set task status
|
||||||
|
if (taosArrayGetSize(pLongChkpts) > 0) {
|
||||||
|
killChkptAndResetStreamTask(pMnode, pLongChkpts);
|
||||||
|
|
||||||
|
taosArrayDestroy(pList);
|
||||||
|
taosArrayDestroy(pLongChkpts);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pLongChkpts);
|
||||||
|
|
||||||
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||||
int64_t duration = now - pStream->checkpointFreq;
|
int64_t duration = now - pStream->checkpointFreq;
|
||||||
|
@ -1304,12 +1334,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArraySort(pList, streamWaitComparFn);
|
taosArraySort(pList, streamWaitComparFn);
|
||||||
code = mndStreamClearFinishedTrans(pMnode, &numOfCheckpointTrans);
|
|
||||||
if (code) {
|
|
||||||
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
|
||||||
taosArrayDestroy(pList);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfQual = taosArrayGetSize(pList);
|
int32_t numOfQual = taosArrayGetSize(pList);
|
||||||
if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
|
if (numOfCheckpointTrans >= tsMaxConcurrentCheckpoint) {
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
#define MAX_CHKPT_EXEC_ELAPSED (600*1000) // 600s
|
||||||
|
|
||||||
typedef struct SKeyInfo {
|
typedef struct SKeyInfo {
|
||||||
void *pKey;
|
void *pKey;
|
||||||
int32_t keyLen;
|
int32_t keyLen;
|
||||||
|
@ -31,11 +33,12 @@ int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t s
|
||||||
return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
|
return taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) {
|
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt, SArray*pLongChkptTrans) {
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
|
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
|
||||||
int32_t numOfChkpt = 0;
|
int32_t numOfChkpt = 0;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
if (pNumOfActiveChkpt != NULL) {
|
if (pNumOfActiveChkpt != NULL) {
|
||||||
*pNumOfActiveChkpt = 0;
|
*pNumOfActiveChkpt = 0;
|
||||||
|
@ -63,6 +66,18 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
|
||||||
} else {
|
} else {
|
||||||
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
|
||||||
numOfChkpt++;
|
numOfChkpt++;
|
||||||
|
|
||||||
|
// last for 10min, kill it
|
||||||
|
int64_t dur = now - pTrans->createdTime;
|
||||||
|
if ((dur >= MAX_CHKPT_EXEC_ELAPSED) && (pLongChkptTrans != NULL)) {
|
||||||
|
mInfo("long chkpt transId:%d, start:%" PRId64
|
||||||
|
" exec duration:%.2fs, beyond threshold %.2f min, kill it and reset task status",
|
||||||
|
pTrans->id, pTrans->createdTime, dur / 1000.0, MAX_CHKPT_EXEC_ELAPSED/(1000*60.0));
|
||||||
|
void* p = taosArrayPush(pLongChkptTrans, pEntry);
|
||||||
|
if (p == NULL) {
|
||||||
|
mError("failed to add long checkpoint trans, transId:%d, code:%s", pEntry->transId, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
}
|
}
|
||||||
|
@ -101,7 +116,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
|
||||||
}
|
}
|
||||||
|
|
||||||
// if any task updates exist, any other stream trans are not allowed to be created
|
// if any task updates exist, any other stream trans are not allowed to be created
|
||||||
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
|
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
|
mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
|
||||||
}
|
}
|
||||||
|
@ -160,7 +175,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
|
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL, NULL);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
mError("failed to clear finish trans, code:%s", tstrerror(code));
|
||||||
}
|
}
|
||||||
|
@ -361,3 +376,37 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||||
|
|
||||||
mDebug("complete clear checkpoints in all Dbs");
|
mDebug("complete clear checkpoints in all Dbs");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int32_t num = taosArrayGetSize(pLongChkpts);
|
||||||
|
|
||||||
|
mInfo("start to kill %d long checkpoint trans", num);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
SStreamTransInfo* pTrans = (SStreamTransInfo*) taosArrayGet(pLongChkpts, i);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
double el = (now - pTrans->startTime) / 1000.0;
|
||||||
|
mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed",
|
||||||
|
pTrans->streamId, pTrans->transId, el);
|
||||||
|
|
||||||
|
SStreamObj *p = NULL;
|
||||||
|
code = mndGetStreamObj(pMnode, pTrans->streamId, &p);
|
||||||
|
if (code == 0 && p != NULL) {
|
||||||
|
mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb);
|
||||||
|
|
||||||
|
mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name,
|
||||||
|
pTrans->streamId, pTrans->transId, p->checkpointId);
|
||||||
|
|
||||||
|
code = mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId);
|
||||||
|
if (code) {
|
||||||
|
mError("stream:%s 0x%"PRIx64" failed to create reset stream task, code:%s", p->name, p->uid, tstrerror(code));
|
||||||
|
}
|
||||||
|
sdbRelease(pMnode->pSdb, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -777,7 +777,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
stDebug("s-task:%s start to extract data block from inputQ", id);
|
int64_t st = taosGetTimestampMs();
|
||||||
|
stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t blockSize = 0;
|
int32_t blockSize = 0;
|
||||||
|
@ -807,8 +808,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
|
||||||
|
|
||||||
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
||||||
if (ret == EXEC_AFTER_IDLE) {
|
if (ret == EXEC_AFTER_IDLE) {
|
||||||
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
|
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
|
||||||
|
@ -825,6 +824,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
// dispatch checkpoint msg to all downstream tasks
|
// dispatch checkpoint msg to all downstream tasks
|
||||||
int32_t type = pInput->type;
|
int32_t type = pInput->type;
|
||||||
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
|
#if 0
|
||||||
|
// Injection error: for automatic kill long trans test
|
||||||
|
taosMsleep(50*1000);
|
||||||
|
#endif
|
||||||
code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
|
code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
|
stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
|
Loading…
Reference in New Issue