Merge pull request #22892 from taosdata/fix/liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2023-09-15 00:46:46 +08:00 committed by GitHub
commit 500fcad36c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 257 additions and 5938 deletions

View File

@ -400,14 +400,18 @@ typedef struct SStreamMeta {
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
int64_t stage; int64_t stage;
bool leader;
int8_t taskWillbeLaunched;
SRWLatch lock; SRWLatch lock;
// TdThreadRwlock lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
int64_t streamBackendRid; int64_t streamBackendRid;
SHashObj* pTaskBackendUnique; SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
SMetaHbInfo hbInfo; SMetaHbInfo hbInfo;
int32_t closedTask; SHashObj* pUpdateTaskList;
// int32_t closedTask;
int32_t totalTasks; // this value should be increased when a new task is added into the meta int32_t totalTasks; // this value should be increased when a new task is added into the meta
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t rid; int64_t rid;
@ -660,6 +664,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
@ -714,10 +721,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta); int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaReopen(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 600; int32_t tsStreamCheckpointTickInterval = 30;
int32_t tsStreamNodeCheckInterval = 10; int32_t tsStreamNodeCheckInterval = 10;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10; int32_t tsTtlPushIntervalSec = 10;

View File

@ -65,9 +65,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessStreamHb(SRpcMsg *pReq); static int32_t mndProcessStreamHb(SRpcMsg *pReq);
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -1063,8 +1060,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
// return -1; // return -1;
// } // }
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t chkptId) {
int64_t checkpointId) {
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
int32_t totLevel = taosArrayGetSize(pStream->tasks); int32_t totLevel = taosArrayGetSize(pStream->tasks);
@ -1088,7 +1084,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) { pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
@ -1109,7 +1105,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
} }
} }
pStream->checkpointId = checkpointId; pStream->checkpointId = chkptId;
pStream->checkpointFreq = taosGetTimestampMs(); pStream->checkpointFreq = taosGetTimestampMs();
pStream->currentTick = 0; pStream->currentTick = 0;
// 3. commit log: stream checkpoint info // 3. commit log: stream checkpoint info
@ -1890,6 +1886,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) { if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1; return -1;
} }
@ -1898,6 +1895,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
void *buf = taosMemoryMalloc(tlen); void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1; return -1;
} }
@ -1915,6 +1913,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
*pBuf = buf; *pBuf = buf;
*pLen = tlen; *pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2327,65 +2326,5 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execNodeList.lock); taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(req.pTaskStatus); taosArrayDestroy(req.pTaskStatus);
// bool nodeChanged = false;
// SArray* pList = taosArrayInit(4, sizeof(int32_t));
/*
// record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
int64_t duration = now - pEntry->hbTimestamp;
if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
taosArrayPush(pList, &pEntry);
mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
continue;
}
if (pEntry->nodeId != req.vgId) {
continue;
}
pEntry->hbTimestamp = now;
// check epset to identify whether the node has been transferred to other dnodes.
// node the epset is changed, which means the node transfer has occurred for this node.
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
// nodeChanged = true;
// break;
// }
}
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// to identify whether the dnode is truely offline or not.
// handle the node changed case
if (!nodeChanged) {
return TSDB_CODE_SUCCESS;
}
int32_t nodeId = req.vgId;
{// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
// update the related upstream and downstream tasks, todo remove this, no need this function
taosWLockLatch(&pStream->lock);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock);
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
// if (code != TSDB_CODE_SUCCESS) {
// todo
//// }
// }
}
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -186,7 +186,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle);
void *tsdbGetIdx2(SMeta *pMeta); void *tsdbGetIdx2(SMeta *pMeta);
void *tsdbGetIvtIdx2(SMeta *pMeta); void *tsdbGetIvtIdx2(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
void tsdbReaderSetCloseFlag2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
//====================================================================================================================== //======================================================================================================================

View File

@ -174,7 +174,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever); int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq); void tqUpdateNodeStage(STQ* pTq, bool isLeader);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -302,12 +302,11 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer); int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(void *arg); int32_t tsdbMerge(void *arg);
@ -830,7 +829,6 @@ bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost); void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);

View File

@ -839,11 +839,21 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} }
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 if (pTask->info.fillHistory) {
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, " nextProcessVer:%" PRId64
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
pTask->info.fillHistory, pTask->info.triggerParam); vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam);
} else {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam);
}
return 0; return 0;
} }
@ -914,7 +924,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed", tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId); rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return -1; return -1;
@ -1033,8 +1043,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
// we have to continue retrying to successfully execute the scan history task. // we have to continue retrying to successfully execute the scan history task.
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
TASK_SCHED_STATUS__WAITING);
if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
tqError( tqError(
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected " "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
@ -1051,9 +1060,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamScanHistoryData(pTask); streamScanHistoryData(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, int8_t status = streamTaskSetSchedStatusInActive(pTask);
TASK_SCHED_STATUS__INACTIVE); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
@ -1093,8 +1101,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
// now we can stop the stream task execution // now we can stop the stream task execution
int64_t latestVer = 0; int64_t latestVer = 0;
taosThreadMutexLock(&pStreamTask->lock); taosThreadMutexLock(&pStreamTask->lock);
streamTaskHalt(pStreamTask); streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
@ -1128,7 +1136,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
// set the fill-history task to be normal // set the fill-history task to be normal
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
@ -1170,44 +1178,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
// notify the downstream tasks to transfer executor state after handle all history blocks.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SStreamTransferReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId,
req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed",
req.downstreamTaskId);
return -1;
}
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// transfer the ownership of executor state
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// only the agg tasks and the sink tasks will receive this message from upstream tasks // only the agg tasks and the sink tasks will receive this message from upstream tasks
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -1295,9 +1265,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
pTask->chkInfo.nextProcessVer); pTask->chkInfo.nextProcessVer);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
} else { } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); int8_t status = streamTaskSetSchedStatusInActive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(st), pTask->status.schedStatus); pTask->id.idStr, streamGetTaskStatusStr(st), status);
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@ -1581,6 +1551,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = 0; int32_t code = 0;
SStreamCheckpointSourceReq req = {0}; SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader node, ignore checkpoint-source msg", vgId);
return TSDB_CODE_SUCCESS;
}
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1686,9 +1660,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR; rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
goto _end; tDecoderClear(&decoder);
return rsp.code;
} }
tDecoderClear(&decoder);
// update the nodeEpset when it exists // update the nodeEpset when it exists
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -1701,7 +1678,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
req.taskId); req.taskId);
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
goto _end; taosArrayDestroy(req.pNodeList);
return rsp.code;
} }
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
@ -1741,58 +1719,62 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamTaskStop(*ppHTask); streamTaskStop(*ppHTask);
} }
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); taosHashPut(pMeta->pUpdateTaskList, &pTask->id, sizeof(pTask->id), NULL, 0);
pMeta->closedTask += 1;
if (ppHTask != NULL) { if (ppHTask != NULL) {
pMeta->closedTask += 1; tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->pUpdateTaskList, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
} }
rsp.code = 0;
// possibly only handle the stream task. // possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
bool allStopped = (pMeta->closedTask == numOfTasks); int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskList);
if (allStopped) { if (updateTasks < numOfTasks) {
pMeta->closedTask = 0; pMeta->taskWillbeLaunched = 1;
tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks));
taosWUnLockLatch(&pMeta->lock);
} else { } else {
tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask)); taosHashClear(pMeta->pUpdateTaskList);
}
taosWUnLockLatch(&pMeta->lock);
_end:
tDecoderClear(&decoder);
if (allStopped) {
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
} else { } else {
tqDebug("vgId:%d all tasks are stopped, restart them", vgId); tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
taosWLockLatch(&pMeta->lock);
terrno = 0; terrno = 0;
int32_t code = streamMetaReopen(pMeta, 0); int32_t code = streamMetaReopen(pMeta);
if (code != 0) { if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId); tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
taosArrayDestroy(req.pNodeList);
return -1; return -1;
} }
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId); tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
taosArrayDestroy(req.pNodeList);
return -1; return -1;
} }
taosWUnLockLatch(&pMeta->lock);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart all stream tasks", vgId); vInfo("vgId:%d, restart all stream tasks", vgId);
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
tqCheckAndRunStreamTaskAsync(pTq); tqCheckAndRunStreamTaskAsync(pTq);
} else {
vInfo("vgId:%d, follower node not start stream tasks", vgId);
} }
pMeta->taskWillbeLaunched = 0;
taosWUnLockLatch(&pMeta->lock);
} }
} }
taosArrayDestroy(req.pNodeList);
return rsp.code; return rsp.code;
} }

View File

@ -168,7 +168,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
if (code == 0) { if (code == 0) {
code = streamStateLoadTasks(pWriter); code = streamStateLoadTasks(pWriter);
} }

View File

@ -111,12 +111,12 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock); // taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId); tqDebug("vgId:%d no stream tasks existed to run", vgId);
taosWUnLockLatch(&pMeta->lock); // taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
@ -124,7 +124,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock); // taosWUnLockLatch(&pMeta->lock);
return -1; return -1;
} }
@ -135,7 +135,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock); // taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
@ -201,8 +201,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); tqDebug("vgId:%d stop all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -232,14 +231,12 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosWLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
@ -247,12 +244,11 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key)); SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
int8_t status = (*pTask)->status.taskStatus; int8_t status = (*pTask)->status.taskStatus;
if (status == TASK_STATUS__STOP) { if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
streamSetStatusNormal(*pTask); streamSetStatusNormal(*pTask);
} }
} }
taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }
@ -314,7 +310,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */ streamSchedExec(pTask); /*int32_t code = */streamSchedExec(pTask);
} else { } else {
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
id, ver, maxVer); id, ver, maxVer);

View File

@ -36,10 +36,15 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
return 0; return 0;
} }
void tqUpdateNodeStage(STQ* pTq) { void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
SSyncState state = syncGetState(pTq->pVnode->sync); SSyncState state = syncGetState(pTq->pVnode->sync);
pTq->pStreamMeta->stage = state.term; SStreamMeta* pMeta = pTq->pStreamMeta;
tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); tqDebug("vgId:%d update the meta stage:%"PRId64", prev:%"PRId64" leader:%d", pMeta->vgId, state.term, pMeta->stage, isLeader);
pMeta->stage = state.term;
pMeta->leader = isLeader;
if (isLeader) {
streamMetaStartHb(pMeta);
}
} }
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {

View File

@ -22,38 +22,6 @@
static void tLDataIterClose2(SLDataIter *pIter); static void tLDataIterClose2(SLDataIter *pIter);
// SLDataIter ================================================= // SLDataIter =================================================
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
int32_t numOfSttTrigger) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < numOfSttTrigger; ++i) {
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
pLoadInfo[i].currentLoadBlockIndex = 1;
int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]);
if (code) {
terrno = code;
}
code = tBlockDataCreate(&pLoadInfo[i].blockData[1]);
if (code) {
terrno = code;
}
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
pLoadInfo[i].pSchema = pSchema;
pLoadInfo[i].colIds = colList;
pLoadInfo[i].numOfCols = numOfCols;
}
return pLoadInfo;
}
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) { SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo)); SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
@ -83,25 +51,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
return pLoadInfo; return pLoadInfo;
} }
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
for (int32_t i = 0; i < 1; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
taosArrayClear(pLoadInfo[i].aSttBlk);
pLoadInfo[i].cost.loadBlocks = 0;
pLoadInfo[i].cost.blockElapsedTime = 0;
pLoadInfo[i].cost.statisElapsedTime = 0;
pLoadInfo[i].cost.loadStatisBlocks = 0;
pLoadInfo[i].statisBlockIndex = -1;
tStatisBlockDestroy(pLoadInfo[i].statisBlock);
pLoadInfo[i].sttBlockLoaded = false;
}
}
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) { void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
for (int32_t i = 0; i < 1; ++i) { for (int32_t i = 0; i < 1; ++i) {
pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime; pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime;
@ -309,12 +258,6 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
} }
} }
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
return 0;
}
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo, static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
uint64_t suid) { uint64_t suid) {
if (TARRAY2_SIZE(pArray) <= 0) { if (TARRAY2_SIZE(pArray) <= 0) {
@ -767,50 +710,6 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
return -1 * tLDataIterCmprFn(p1, p2); return -1 * tLDataIterCmprFn(p1, p2);
} }
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
pMTree->pIter = NULL;
pMTree->idStr = idStr;
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
} else { // desc
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
pMTree->ignoreEarlierTs = false;
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
if (hasVal) {
tMergeTreeAddIter(pMTree, &pLDataIter[i]);
} else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
}
}
}
return code;
_end:
tMergeTreeClose(pMTree);
return code;
}
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;

File diff suppressed because it is too large Load Diff

View File

@ -452,6 +452,9 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
const TBrinBlkArray* pBlkArray = NULL; const TBrinBlkArray* pBlkArray = NULL;
int32_t code = tsdbDataFileReadBrinBlk(pFileReader, &pBlkArray); int32_t code = tsdbDataFileReadBrinBlk(pFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
#if 0 #if 0
LRUHandle* handle = NULL; LRUHandle* handle = NULL;
@ -2760,6 +2763,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
goto _end; goto _end;
} }
#if 0
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle); SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
@ -2788,6 +2792,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
// pReader->rowsNum += block.nRow; // pReader->rowsNum += block.nRow;
// } // }
} }
#endif
_end: _end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
@ -4453,7 +4458,11 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
// do fill all null column value SMA info // do fill all null column value SMA info
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t size = (int32_t)TARRAY2_SIZE(&pSup->colAggArray); int32_t size = (int32_t)TARRAY2_SIZE(&pSup->colAggArray);
TARRAY2_INSERT_PTR(&pSup->colAggArray, 0, pTsAgg); int32_t code = TARRAY2_INSERT_PTR(&pSup->colAggArray, 0, pTsAgg);
if (code != TSDB_CODE_SUCCESS) {
return;
}
size++; size++;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
@ -4466,7 +4475,11 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg); code = TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
if (code != TSDB_CODE_SUCCESS) {
return;
}
i += 1; i += 1;
size++; size++;
} }
@ -4477,7 +4490,11 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
while (j < numOfCols) { while (j < numOfCols) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg); code = TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
if (code != TSDB_CODE_SUCCESS) {
return;
}
i += 1; i += 1;
} }
j++; j++;
@ -4835,7 +4852,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
return rows; return rows;
} }
int32_t tsdbGetTableSchema2(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0); metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
@ -4970,4 +4987,4 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr; pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr;
} }
void tsdbReaderSetCloseFlag2(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }

View File

@ -549,23 +549,34 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm)); ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
walApplyVer(pVnode->pWal, commitIdx); walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true; pVnode->restored = true;
if (vnodeIsRoleLeader(pVnode)) { if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
vInfo("vgId:%d, sync restore finished, start to launch stream tasks", vgId); vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
return;
}
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
return;
}
if (vnodeIsRoleLeader(pVnode)) {
// start to restore all stream tasks // start to restore all stream tasks
if (tsDisableStream) { if (tsDisableStream) {
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq); tqStartStreamTasks(pVnode->pTq);
tqCheckAndRunStreamTaskAsync(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq);
} }
} else { } else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
} }
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
} }
static void vnodeBecomeFollower(const SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
@ -580,7 +591,10 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
} }
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
tqStopStreamTasks(pVnode->pTq); if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq, false);
tqStopStreamTasks(pVnode->pTq);
}
} }
static void vnodeBecomeLearner(const SSyncFSM *pFsm) { static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
@ -599,7 +613,7 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
static void vnodeBecomeLeader(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
if (pVnode->pTq) { if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq); tqUpdateNodeStage(pVnode->pTq, true);
} }
vDebug("vgId:%d, become leader", pVnode->config.vgId); vDebug("vgId:%d, become leader", pVnode->config.vgId);
} }

View File

@ -6997,8 +6997,8 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
strcpy(col->tableAlias, pTable); tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias));
strcpy(col->colName, pMeta->schema[0].name); tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName));
SNodeList* pParamterList = nodesMakeList(); SNodeList* pParamterList = nodesMakeList();
if (NULL == pParamterList) { if (NULL == pParamterList) {
nodesDestroyNode((SNode*)col); nodesDestroyNode((SNode*)col);

View File

@ -108,14 +108,12 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
} }
int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr); qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
return -1; return -1;
} }
@ -256,8 +254,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
streamSchedExec(pTask);
int8_t schedStatus = streamTaskSetSchedStatusWait(pTask);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
streamTryExec(pTask);
}
return 0; return 0;
} }

View File

@ -358,18 +358,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// 3. clear the link between fill-history task and stream task info // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
pStreamTask->historyTaskId.taskId = 0;
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet. // pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// 5. free it and remove fill-history task from disk meta-store // 4. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
// 6. save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
@ -509,7 +509,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
code = streamTransferStateToStreamTask(pTask); code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
} }
} else { } else {
qDebug("s-task:%s sink task does not transfer state", id); qDebug("s-task:%s sink task does not transfer state", id);
@ -615,25 +615,28 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
int32_t streamTryExec(SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required. // this function may be executed by multi-threads, so status check is required.
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask); while (1) {
if (code < 0) { // todo this status shoudl be removed int32_t code = streamExecForAll(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); if (code < 0) { // todo this status shoudl be removed
return -1; atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
} return -1;
}
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexLock(&pTask->lock);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), if (taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) ||
pTask->status.schedStatus); streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);
if (!(taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
streamTaskShouldPause(&pTask->status))) { streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
streamSchedExec(pTask); return 0;
}
taosThreadMutexUnlock(&pTask->lock);
} }
} else { } else {
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,

View File

@ -139,6 +139,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->pUpdateTaskList = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->pUpdateTaskList == NULL) {
goto _err;
}
// task list // task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) { if (pMeta->pTaskList == NULL) {
@ -201,16 +206,13 @@ _err:
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
// taosThreadMutexDestroy(&pMeta->backendMutex);
// taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
qError("failed to open stream meta"); qError("failed to open stream meta");
return NULL; return NULL;
} }
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { int32_t streamMetaReopen(SStreamMeta* pMeta) {
streamMetaClear(pMeta); streamMetaClear(pMeta);
pMeta->streamBackendRid = -1; pMeta->streamBackendRid = -1;
@ -315,6 +317,7 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->pUpdateTaskList);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex); taosThreadMutexDestroy(&pMeta->backendMutex);
@ -688,7 +691,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
streamTaskResetUpstreamStageInfo(pTask);
if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
doClear(pKey, pVal, pCur, pRecycleList); doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
@ -701,6 +703,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
} }
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum); qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
tdbFree(pKey); tdbFree(pKey);
@ -757,9 +760,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
return 0; return 0;
} }
static bool readyToSendHb(SMetaHbInfo* pInfo) { static bool enoughTimeDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
// reset the counter
pInfo->tickCounter = 0; pInfo->tickCounter = 0;
return true; return true;
} }
@ -783,12 +785,21 @@ void metaHbToMnode(void* param, void* tmrId) {
return; return;
} }
if (!readyToSendHb(&pMeta->hbInfo)) { // not leader not send msg
if (!pMeta->leader) {
qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
}
if (!enoughTimeDuration(&pMeta->hbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
qInfo("vgId:%d start hb", pMeta->vgId);
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -810,7 +821,7 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry); taosArrayPush(hbMsg.pTaskStatus, &entry);
if (i == 0) { if (!hasValEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset); epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true; hasValEpset = true;
} }
@ -856,6 +867,8 @@ void metaHbToMnode(void* param, void* tmrId) {
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
} else {
qError("vgId:%d no mnd epset", pMeta->vgId);
} }
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
@ -906,10 +919,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// wait for the stream meta hb function stopping // wait for the stream meta hb function stopping
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; if (pMeta->leader) {
while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
taosMsleep(100); while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); taosMsleep(100);
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
} }
qDebug("vgId:%d start to check all tasks", vgId); qDebug("vgId:%d start to check all tasks", vgId);
@ -923,3 +938,10 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
} }
void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
metaRefMgtAdd(pMeta->vgId, pRid);
*pRid = pMeta->rid;
metaHbToMnode(pRid, NULL);
}

View File

@ -395,7 +395,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
} }
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) {
if (cap < 100 || rate < 50 || pBucket == NULL) { if (cap < 50 || rate < 50 || pBucket == NULL) {
qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate); qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -480,7 +480,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
// execute in the scan history complete call back msg, ready to process data from inputQ // execute in the scan history complete call back msg, ready to process data from inputQ
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamTaskSetSchedStatusInActive(pTask);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);

View File

@ -332,7 +332,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
taosThreadMutexDestroy(&pTask->lock);
if (pTask->msgInfo.pData != NULL) { if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData); destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL; pTask->msgInfo.pData = NULL;
@ -385,8 +384,22 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->dataRange.range.minVer = ver; pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb; pTask->pMsgCb = pMsgCb;
streamTaskInitTokenBucket(&pTask->tokenBucket, 150, 100); streamTaskInitTokenBucket(&pTask->tokenBucket, 50, 50);
taosThreadMutexInit(&pTask->lock, NULL);
TdThreadMutexAttr attr = {0};
int ret = taosThreadMutexAttrInit(&attr);
if (ret != 0) {
qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
}
ret = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
if (ret != 0) {
qError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(ret));
return ret;
}
taosThreadMutexInit(&pTask->lock, &attr);
streamTaskOpenAllUpstreamInput(pTask); streamTaskOpenAllUpstreamInput(pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -553,3 +566,36 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
} }
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
if (status == TASK_SCHED_STATUS__INACTIVE) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
}
taosThreadMutexUnlock(&pTask->lock);
return status;
}
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
if (status == TASK_SCHED_STATUS__WAITING) {
pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
}
taosThreadMutexUnlock(&pTask->lock);
return status;
}
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE ||
status == TASK_SCHED_STATUS__INACTIVE);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
taosThreadMutexUnlock(&pTask->lock);
return status;
}