refactor: 1) adjust the WAL scan idle interval to 500ms, 2) record the checkpointVer as a timestamp for force_window_close, 3) other internal refactoring.
commit 3847ed2dc9
parent 8e319d777b
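Note on item 1: the change below only adjusts the pacing constant SCAN_WAL_IDLE_DURATION from 100ms to 500ms, so a vnode that finds no new data in the WAL now waits half a second before the next scan pass instead of 100ms. A minimal standalone sketch of that back-off behaviour, using hypothetical names and a blocking loop rather than the timer-based tqScanWalInFuture scheduling in the real code:

#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

#define SCAN_WAL_IDLE_DURATION 500  // idle for 500ms before the next WAL scan pass

/* Hypothetical stand-in for one scan pass over the WAL; returns true if new data was found. */
static bool scanWalOnce(int pass) {
  printf("scan pass %d\n", pass);
  return false;  /* pretend the WAL stayed idle */
}

int main(void) {
  for (int pass = 0; pass < 3; ++pass) {
    if (!scanWalOnce(pass)) {
      /* nothing new in the WAL: back off for the idle duration before rescheduling */
      usleep(SCAN_WAL_IDLE_DURATION * 1000);  /* usleep() takes microseconds */
    }
  }
  return 0;
}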
@@ -13,9 +13,7 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */

-#include <common/tmsg.h>
 #include "tcommon.h"
-#include "tmsg.h"
 #include "tq.h"

 #define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
@@ -50,7 +48,7 @@ static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkI
 static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo);
 static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id);
 static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode);
-static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
+static void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs);
 static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode,
                                     int64_t earlyTs);

@@ -1062,7 +1060,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
       return;
     }

-    reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
+    rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs);
   }
 }

@@ -1165,7 +1163,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
   return TSDB_CODE_SUCCESS;
 }

-void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
+void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) {
   int32_t code = 0;
   const char* id = pTask->id.idStr;
   int32_t vgId = pTask->pMeta->vgId;

@@ -17,19 +17,20 @@
 #include "vnd.h"

 #define MAX_REPEAT_SCAN_THRESHOLD 3
-#define SCAN_WAL_IDLE_DURATION 100
+#define SCAN_WAL_IDLE_DURATION 500  // idle for 500ms to do next wal scan

 typedef struct SBuildScanWalMsgParam {
   int64_t metaId;
   int32_t numOfTasks;
 } SBuildScanWalMsgParam;

-static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
+static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
 static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
 static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
 static bool taskReadyForDataFromWal(SStreamTask* pTask);
 static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
 static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
+static int32_t doScanWalAsync(STQ* pTq, bool ckPause);

 // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
 int32_t tqScanWal(STQ* pTq) {
@@ -37,12 +38,11 @@ int32_t tqScanWal(STQ* pTq) {
   int32_t vgId = pMeta->vgId;
   int64_t st = taosGetTimestampMs();
   int32_t numOfTasks = 0;
-  bool shouldIdle = true;

   tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);

   // check all tasks
-  int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
+  int32_t code = doScanWalForAllTasks(pMeta);
   if (code) {
     tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
     return code;
@@ -133,10 +133,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
 }

 int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
   int32_t vgId = TD_VID(pTq->pVnode);
   SStreamMeta* pMeta = pTq->pStreamMeta;
   bool alreadyRestored = pTq->pVnode->restored;
-  int32_t numOfTasks = 0;
+  int32_t code = 0;

   // do not launch the stream tasks, if it is a follower or not restored vnode.
   if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
@@ -144,47 +143,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
   }

   streamMetaWLock(pMeta);

-  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
-  if (numOfTasks == 0) {
-    tqDebug("vgId:%d no stream tasks existed to run", vgId);
+  code = doScanWalAsync(pTq, ckPause);
   streamMetaWUnLock(pMeta);
-    return 0;
-  }
-
-  if (pMeta->startInfo.startAllTasks) {
-    tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
-    streamMetaWUnLock(pMeta);
-    return 0;
-  }
-
-  pMeta->scanInfo.scanCounter += 1;
-  if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
-    pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
-  }
-
-  if (pMeta->scanInfo.scanCounter > 1) {
-    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
-    streamMetaWUnLock(pMeta);
-    return 0;
-  }
-
-  int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
-  if (ckPause && numOfTasks == numOfPauseTasks) {
-    tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
-
-    // reset the counter value, since we do not launch the scan wal operation.
-    pMeta->scanInfo.scanCounter = 0;
-    streamMetaWUnLock(pMeta);
-    return 0;
-  }
-
-  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
-          numOfTasks, alreadyRestored);
-
-  int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
-  streamMetaWUnLock(pMeta);
-
   return code;
 }
@@ -368,11 +328,8 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
   return code;
 }

-int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
-  *pScanIdle = true;
-  bool noDataInWal = true;
+int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
   int32_t vgId = pStreamMeta->vgId;

   int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
   if (numOfTasks == 0) {
     return TSDB_CODE_SUCCESS;
@@ -410,8 +367,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
       continue;
     }

-    *pScanIdle = false;
-
     // seek the stored version and extract data from WAL
     code = setWalReaderStartOffset(pTask, vgId);
     if (code != TSDB_CODE_SUCCESS) {
@@ -437,7 +392,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
     streamMutexUnlock(&pTask->lock);

     if ((numOfItems > 0) || hasNewData) {
-      noDataInWal = false;
       code = streamTrySchedExec(pTask);
       if (code != TSDB_CODE_SUCCESS) {
         streamMetaReleaseTask(pStreamMeta, pTask);
@@ -449,11 +403,47 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
     streamMetaReleaseTask(pStreamMeta, pTask);
   }

-  // all wal are checked, and no new data available in wal.
-  if (noDataInWal) {
-    *pScanIdle = true;
-  }
-
   taosArrayDestroy(pTaskList);
   return TSDB_CODE_SUCCESS;
 }
+
+int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+  bool alreadyRestored = pTq->pVnode->restored;
+  int32_t vgId = pMeta->vgId;
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+
+  if (numOfTasks == 0) {
+    tqDebug("vgId:%d no stream tasks existed to run", vgId);
+    return 0;
+  }
+
+  if (pMeta->startInfo.startAllTasks) {
+    tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
+    return 0;
+  }
+
+  pMeta->scanInfo.scanCounter += 1;
+  if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
+    pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
+  }
+
+  if (pMeta->scanInfo.scanCounter > 1) {
+    tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
+    return 0;
+  }
+
+  int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
+  if (ckPause && numOfTasks == numOfPauseTasks) {
+    tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
+
+    // reset the counter value, since we do not launch the scan wal operation.
+    pMeta->scanInfo.scanCounter = 0;
+    return 0;
+  }
+
+  tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
+          numOfTasks, alreadyRestored);
+
+  return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
+}
@@ -4378,7 +4378,6 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr
 }

 int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
-  stDebug("streamStateFillGetKVByCur_rocksdb");
   if (!pCur) {
     return -1;
   }
@@ -522,7 +522,10 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int
     if (pItem->type == STREAM_INPUT__GET_RES) {
       const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
       code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
-
+      if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
+        stDebug("s-task:%s set force_window_close as source block, skey:%"PRId64, id, pTrigger->pBlock->info.window.skey);
+        (*pVer) = pTrigger->pBlock->info.window.skey;
+      }
     } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
       const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
       code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
@@ -671,7 +674,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock

   doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);

-  // update the currentVer if processing the submit blocks.
+  // update the currentVer if processing the submitted blocks.
   if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
     stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
             pInfo->checkpointVer, pInfo->nextProcessVer, ver);
@@ -688,6 +691,34 @@
   return code;
 }

+// do nothing after sync executor state to storage backend, untill checkpoint is completed.
+static int32_t doHandleChkptBlock(SStreamTask* pTask) {
+  int32_t code = 0;
+  const char* id = pTask->id.idStr;
+
+  streamMutexLock(&pTask->lock);
+  SStreamTaskState pState = streamTaskGetStatus(pTask);
+  if (pState.state == TASK_STATUS__CK) {  // todo other thread may change the status
+    stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
+    code = streamTaskBuildCheckpoint(pTask);  // ignore this error msg, and continue
+  } else {  // todo refactor
+    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+      code = streamTaskSendCheckpointSourceRsp(pTask);
+    } else {
+      code = streamTaskSendCheckpointReadyMsg(pTask);
+    }
+
+    if (code != TSDB_CODE_SUCCESS) {
+      // todo: let's retry send rsp to upstream/mnode
+      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
+              tstrerror(code));
+    }
+  }
+
+  streamMutexUnlock(&pTask->lock);
+  return code;
+}
+
 int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
   const char* id = pTask->id.idStr;

@@ -832,36 +863,16 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
       }
     }

-    if (type != STREAM_INPUT__CHECKPOINT) {
+    if (type == STREAM_INPUT__CHECKPOINT) {
+      code = doHandleChkptBlock(pTask);
+      streamFreeQitem(pInput);
+      return code;
+    } else {
       code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
       streamFreeQitem(pInput);
       if (code) {
         return code;
       }
-    } else {  // todo other thread may change the status
-      // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
-      streamMutexLock(&pTask->lock);
-      SStreamTaskState pState = streamTaskGetStatus(pTask);
-      if (pState.state == TASK_STATUS__CK) {
-        stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
-        code = streamTaskBuildCheckpoint(pTask);  // ignore this error msg, and continue
-      } else {  // todo refactor
-        if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-          code = streamTaskSendCheckpointSourceRsp(pTask);
-        } else {
-          code = streamTaskSendCheckpointReadyMsg(pTask);
-        }
-
-        if (code != TSDB_CODE_SUCCESS) {
-          // todo: let's retry send rsp to upstream/mnode
-          stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
-                  tstrerror(code));
-        }
-      }
-
-      streamMutexUnlock(&pTask->lock);
-      streamFreeQitem(pInput);
-      return code;
     }
   }
 }
@@ -214,7 +214,6 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig

   // check whether the time window gaps exist or not
   int64_t now = taosGetTimestamp(precision);
-  int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;

   // there are gaps, needs to be filled
   STimeWindow w = pTrigger->pBlock->info.window;
@@ -226,13 +225,15 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
   }

   pTask->status.latestForceWindow = w;
-  if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) {
+  if (w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) {
     int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);

-    *pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now;
+    *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
     *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
-    stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev,
-            *pNextTrigger);
+
+    pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval;
+    stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id,
+            num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer);
     return code;
   } else {
     stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
@@ -289,7 +290,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
   }

   if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
-    nextTrigger = TRIGGER_RECHECK_INTERVAL;  // retry in 10 seec
+    nextTrigger = TRIGGER_RECHECK_INTERVAL;  // retry in 10 sec
     stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL);
   } else {
     if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
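Note on item 2: for a stream with the force_window_close trigger there is no WAL submit version to track, so the diff above records the trigger window's start timestamp as the block's version ((*pVer) = pTrigger->pBlock->info.window.skey) and advances nextProcessVer to w.ekey plus one interval. A simplified sketch of that bookkeeping; the struct and field names below are illustrative stand-ins, not the real SStreamTask layout:

#include <stdint.h>
#include <stdio.h>
#include <inttypes.h>

/* Simplified stand-ins for the time window and the per-task progress bookkeeping. */
typedef struct { int64_t skey, ekey; } TimeWindow;
typedef struct { int64_t checkpointVer, nextProcessVer; } ProgressInfo;

/* Record the window start timestamp as the processed "version" and advance the
 * next-process version to one interval past the window end. */
static void recordForceWindowProgress(ProgressInfo* p, TimeWindow w, int64_t interval) {
  p->checkpointVer = w.skey;              /* a timestamp, not a WAL version */
  p->nextProcessVer = w.ekey + interval;  /* resume from the next window */
}

int main(void) {
  ProgressInfo info = {0};
  TimeWindow w = {.skey = 1700000000000LL, .ekey = 1700000060000LL};  /* example ms timestamps */
  recordForceWindowProgress(&info, w, 60000);
  printf("checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 "\n", info.checkpointVer, info.nextProcessVer);
  return 0;
}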