Merge pull request #25375 from taosdata/fix/3_liaohj
fix(stream): generate the checkpoint id
This commit is contained in:
commit
a439f61708
|
@ -247,11 +247,11 @@ typedef enum {
|
|||
TASK_SCANHISTORY_CONT = 0x1,
|
||||
TASK_SCANHISTORY_QUIT = 0x2,
|
||||
TASK_SCANHISTORY_REXEC = 0x3,
|
||||
} EScanHistoryRet;
|
||||
} EScanHistoryCode;
|
||||
|
||||
typedef struct {
|
||||
EScanHistoryRet ret;
|
||||
int32_t idleTime;
|
||||
EScanHistoryCode ret;
|
||||
int32_t idleTime;
|
||||
} SScanhistoryDataInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -811,7 +811,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
|
|||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
|
||||
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
|
||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||
|
|
|
@ -817,23 +817,57 @@ _OVER:
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int64_t mndStreamGenChkpId(SMnode *pMnode) {
|
||||
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
|
||||
SStreamObj *pStream = NULL;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t maxChkpId = 0;
|
||||
int64_t maxChkptId = 0;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
|
||||
maxChkptId = TMAX(maxChkptId, pStream->checkpointId);
|
||||
mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid,
|
||||
pStream->checkpointId);
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1);
|
||||
return maxChkpId + 1;
|
||||
{ // check the max checkpoint id from all vnodes.
|
||||
int64_t maxCheckpointId = -1;
|
||||
if (lock) {
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||
|
||||
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||
if (pEntry == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->checkpointInfo.failed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (maxCheckpointId < pEntry->checkpointInfo.latestId) {
|
||||
maxCheckpointId = pEntry->checkpointInfo.latestId;
|
||||
}
|
||||
}
|
||||
|
||||
if (lock) {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
}
|
||||
|
||||
if (maxCheckpointId > maxChkptId) {
|
||||
mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId,
|
||||
maxCheckpointId);
|
||||
maxChkptId = maxCheckpointId;
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("generated checkpoint %" PRId64 "", maxChkptId + 1);
|
||||
return maxChkptId + 1;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||
|
@ -844,7 +878,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||
pMsg->checkpointId = mndStreamGenChkpId(pMnode);
|
||||
pMsg->checkpointId = mndStreamGenChkptId(pMnode, true);
|
||||
|
||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||
|
@ -2301,7 +2335,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
|
||||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||
if (total == numOfTasks) { // all tasks has send the reqs
|
||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||
int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
|
||||
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
||||
|
||||
if (pStream != NULL) { // TODO:handle error
|
||||
|
|
|
@ -936,7 +936,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
|
||||
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
|
||||
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
|
||||
streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
|
||||
} else {
|
||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||
ETaskStatus s = p->state;
|
||||
|
|
|
@ -87,6 +87,11 @@ struct SStreamQueue {
|
|||
int8_t status;
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
EXEC_CONTINUE = 0x0,
|
||||
EXEC_AFTER_IDLE = 0x1,
|
||||
} EExtractDataCode;
|
||||
|
||||
extern void* streamTimer;
|
||||
extern int32_t streamBackendId;
|
||||
extern int32_t streamBackendCfWrapperId;
|
||||
|
@ -125,7 +130,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
|||
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
|
||||
|
||||
void streamClearChkptReadyMsg(SStreamTask* pTask);
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
int32_t* blockSize);
|
||||
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
||||
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#define STREAM_RESULT_DUMP_THRESHOLD 300
|
||||
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
|
||||
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
|
||||
#define MIN_INVOKE_INTERVAL 50 // 50ms
|
||||
|
||||
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
|
||||
|
||||
|
@ -580,16 +581,21 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) {
|
||||
if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) {
|
||||
stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id);
|
||||
setTaskSchedInfo(pTask, 50);
|
||||
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
||||
if (pInput == NULL) {
|
||||
ASSERT(numOfBlocks == 0);
|
||||
return 0;
|
||||
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
||||
if (ret == EXEC_AFTER_IDLE) {
|
||||
ASSERT(pInput == NULL && numOfBlocks == 0);
|
||||
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
|
||||
} else {
|
||||
if (pInput == NULL) {
|
||||
ASSERT(numOfBlocks == 0);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch checkpoint msg to all downstream tasks
|
||||
|
|
|
@ -145,7 +145,7 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
int32_t* blockSize) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
|
@ -157,13 +157,13 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
// no available token in bucket for sink task, let's wait for a little bit
|
||||
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) {
|
||||
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_AFTER_IDLE;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) {
|
||||
stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
|
||||
|
@ -179,7 +179,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
|
||||
// do not merge blocks for sink node and check point data block
|
||||
|
@ -196,7 +196,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
*blockSize = 0;
|
||||
*numOfBlocks = 1;
|
||||
*pInput = qItem;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_CONTINUE;
|
||||
} else { // previous existed blocks needs to be handle, before handle the checkpoint msg block
|
||||
stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks);
|
||||
*blockSize = streamQueueItemGetSize(*pInput);
|
||||
|
@ -205,7 +205,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
}
|
||||
|
||||
streamQueueProcessFail(pTask->inputq.queue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
} else {
|
||||
if (*pInput == NULL) {
|
||||
|
@ -226,7 +226,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
}
|
||||
|
||||
streamQueueProcessFail(pTask->inputq.queue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
|
||||
*pInput = newRet;
|
||||
|
@ -243,7 +243,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
|
||||
if (numOfTicks <= 0) {
|
||||
numOfTicks = 1;
|
||||
|
|
Loading…
Reference in New Issue