fix(stream): remove fill-history sink task.
parent 2bfd6e3355
commit 91e3d70796
@@ -321,8 +321,8 @@ typedef struct {
   int64_t init;
   int64_t step1Start;
   int64_t step2Start;
-  int64_t execStart;
-  int32_t taskUpdateCount;
+  int64_t start;
+  int32_t updateCount;
   int64_t latestUpdateTs;
 } STaskExecStatisInfo;

@@ -722,7 +722,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
 int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);

 int32_t streamAlignTransferState(SStreamTask* pTask);

+int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId);
 int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
                                         int8_t isSucceed);
 int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,

@@ -1084,7 +1084,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {

     tqDebug("s-task:%s fill-history task set status to be dropping", id);

-    streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+    // streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+    streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id);
     streamMetaReleaseTask(pMeta, pTask);
     return -1;
   }

@@ -1334,10 +1335,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL

   // commit the update
   taosWLockLatch(&pTq->pStreamMeta->lock);
+  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
+  tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks);
+
   if (streamMetaCommit(pTq->pStreamMeta) < 0) {
     // persist to disk
   }
   taosWUnLockLatch(&pTq->pStreamMeta->lock);

   return 0;
 }

@@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*

   if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
     SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
-    double el = (taosGetTimestampMs() - pTask->taskExecInfo.execStart) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0;
     tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
            " submit into dst table, duration:%.2f Sec.",
            pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el);

@@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
   int32_t code = TSDB_CODE_SUCCESS;
   const char* id = pTask->id.idStr;

-  if (pTask->taskExecInfo.execStart == 0) {
-    pTask->taskExecInfo.execStart = taosGetTimestampMs();
+  if (pTask->taskExecInfo.start == 0) {
+    pTask->taskExecInfo.start = taosGetTimestampMs();
   }

   bool onlySubmitData = true;

@@ -303,7 +303,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
          pTask->id.idStr, pTask->streamTaskId.taskId);

   // 1. free it and remove fill-history task from disk meta-store
-  streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+  // streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+  streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);

   // 2. save to disk
   taosWLockLatch(&pMeta->lock);

@@ -365,7 +366,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);

   // 4. free it and remove fill-history task from disk meta-store
-  streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+  // streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+  streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);

   // 5. clear the link between fill-history task and stream task info
   pStreamTask->historyTaskId.taskId = 0;

@@ -408,6 +410,8 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {

   if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
     code = streamDoTransferStateToStreamTask(pTask);
+  } else { // drop fill-history task
+    streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id);
   }

   return code;

@@ -503,16 +507,12 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
     }
   } else { // non-dispatch task, do task state transfer directly
     streamFreeQitem((SStreamQueueItem*)pBlock);
-    if (level != TASK_LEVEL__SINK) {
-      qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
-      ASSERT(pTask->info.fillHistory == 1);
-      code = streamTransferStateToStreamTask(pTask);
+    qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
+    ASSERT(pTask->info.fillHistory == 1);
+    code = streamTransferStateToStreamTask(pTask);

-      if (code != TSDB_CODE_SUCCESS) {
-        /*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
-      }
-    } else {
-      qDebug("s-task:%s sink task does not transfer state", id);
-    }
+    if (code != TSDB_CODE_SUCCESS) {
+      /*int8_t status = */ streamTaskSetSchedStatusInActive(pTask);
+    }
   }

@@ -47,6 +47,8 @@ struct SMetaHbInfo {
   tmr_h   hbTmr;
   int32_t stopFlag;
   int32_t tickCounter;
+  int32_t hbCount;
+  int64_t hbStart;
 };

 SMetaRefMgt gMetaRefMgt;

@@ -332,6 +334,7 @@ void streamMetaCloseImpl(void* arg) {
   taosHashCleanup(pMeta->pTaskBackendUnique);
   taosHashCleanup(pMeta->pUpdateTaskSet);

+  taosMemoryFree(pMeta->pHbInfo);
   taosMemoryFree(pMeta->path);
   taosThreadMutexDestroy(&pMeta->backendMutex);

@@ -784,7 +787,6 @@ static bool enoughTimeDuration(SMetaHbInfo* pInfo) {
 void metaHbToMnode(void* param, void* tmrId) {
   int64_t rid = *(int64_t*)param;

-  SStreamHbMsg hbMsg = {0};
   SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
   if (pMeta == NULL) {
     return;

@@ -802,31 +804,37 @@ void metaHbToMnode(void* param, void* tmrId) {
   if (!pMeta->leader) {
     qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
     taosReleaseRef(streamMetaId, rid);
+    pMeta->pHbInfo->hbStart = 0;
     return;
   }

+  // set the hb start time
+  if (pMeta->pHbInfo->hbStart == 0) {
+    pMeta->pHbInfo->hbStart = taosGetTimestampMs();
+  }
+
   if (!enoughTimeDuration(pMeta->pHbInfo)) {
     taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
     taosReleaseRef(streamMetaId, rid);
     return;
   }

-  qInfo("vgId:%d start hb", pMeta->vgId);
+  qDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader);

+  SStreamHbMsg hbMsg = {0};
   taosRLockLatch(&pMeta->lock);
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

   SEpSet epset = {0};
   bool   hasValEpset = false;

   hbMsg.vgId = pMeta->vgId;
   hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));

   for (int32_t i = 0; i < numOfTasks; ++i) {
     SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
-    int64_t keys[2] = {pId->streamId, pId->taskId};
-    SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
+
+    int64_t keys[2] = {pId->streamId, pId->taskId};
+    SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
     if ((*pTask)->info.fillHistory == 1) {
       continue;
     }

@@ -878,10 +886,13 @@ void metaHbToMnode(void* param, void* tmrId) {
     initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
     msg.info.noResp = 1;

-    qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
+    pMeta->pHbInfo->hbCount += 1;
+
+    qDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
+           pMeta->pHbInfo->hbCount);
     tmsgSendReq(&epset, &msg);
   } else {
-    qError("vgId:%d no mnd epset", pMeta->vgId);
+    qDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
   }

   taosArrayDestroy(hbMsg.pTaskStatus);

@@ -914,7 +925,9 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
 void streamMetaNotifyClose(SStreamMeta* pMeta) {
   int32_t vgId = pMeta->vgId;

-  qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId);
+  qDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
+         pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);

   taosWLockLatch(&pMeta->lock);

   void* pIter = NULL;

@@ -277,7 +277,20 @@ static void freeUpstreamItem(void* p) {
 void tFreeStreamTask(SStreamTask* pTask) {
   int32_t taskId = pTask->id.taskId;

-  qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
+  STaskExecStatisInfo* pStatis = &pTask->taskExecInfo;
+
+  qDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState,
+         streamGetTaskStatusStr(pTask->status.taskStatus));
+
+  qDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
+         ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
+         " nextProcessVer:%" PRId64,
+         taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
+         pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer);
+
+  if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) {
+    int32_t k = 1;
+  }

   // remove the ref by timer
   while (pTask->status.timerActive > 0) {

@@ -396,7 +409,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
   TdThreadMutexAttr attr = {0};
   int code = taosThreadMutexAttrInit(&attr);
   if (code != 0) {
-    qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
+    qError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
     return code;
   }

@@ -524,9 +537,8 @@ int32_t streamTaskStop(SStreamTask* pTask) {
     taosMsleep(100);
   }

-  pTask->taskExecInfo.init = 0;
   int64_t el = taosGetTimestampMs() - st;
-  qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el);
+  qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el);
   return 0;
 }

@@ -556,9 +568,9 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
   STaskExecStatisInfo* p = &pTask->taskExecInfo;
   qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr,
-         p->taskUpdateCount + 1, p->latestUpdateTs);
+         p->updateCount + 1, p->latestUpdateTs);

-  p->taskUpdateCount += 1;
+  p->updateCount += 1;
   p->latestUpdateTs = taosGetTimestampMs();

   for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {

@@ -615,3 +627,25 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {

   return status;
 }
+
+int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) {
+  SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
+  if (pReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return -1;
+  }
+
+  pReq->head.vgId = vgId;
+  pReq->taskId = pTaskId->taskId;
+  pReq->streamId = pTaskId->streamId;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
+  int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg);
+  if (code != TSDB_CODE_SUCCESS) {
+    qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
+    return code;
+  }
+
+  qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId);
+  return code;
+}