remove backend data file

This commit is contained in:
yihaoDeng 2024-08-16 18:25:10 +08:00
parent 0e9bfa01c2
commit 3e928fc8fc
3 changed files with 150 additions and 111 deletions

View File

@ -19,7 +19,6 @@
#include "os.h" #include "os.h"
#include "streamMsg.h" #include "streamMsg.h"
#include "streamState.h" #include "streamState.h"
#include "streamMsg.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdbInt.h" #include "tdbInt.h"
#include "tmsg.h" #include "tmsg.h"
@ -266,14 +265,14 @@ typedef struct SStreamTaskId {
} SStreamTaskId; } SStreamTaskId;
typedef struct SCheckpointInfo { typedef struct SCheckpointInfo {
int64_t startTs; int64_t startTs;
int64_t checkpointId; // latest checkpoint id int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time int64_t checkpointTime; // latest checkpoint time
int64_t processedVer; int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t msgVer; int64_t msgVer;
int32_t consensusTransId;// consensus checkpoint id int32_t consensusTransId; // consensus checkpoint id
SActiveCheckpointInfo* pActiveInfo; SActiveCheckpointInfo* pActiveInfo;
} SCheckpointInfo; } SCheckpointInfo;
@ -454,7 +453,8 @@ struct SStreamTask {
SSHashObj* pNameMap; SSHashObj* pNameMap;
void* pBackend; void* pBackend;
int8_t subtableWithoutMd5; int8_t subtableWithoutMd5;
char reserve[256]; char* backendPath;
char reserve[256 - sizeof(char*)];
}; };
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
@ -591,9 +591,9 @@ typedef struct STaskStatusEntry {
int32_t statusLastDuration; // to record the last duration of current status int32_t statusLastDuration; // to record the last duration of current status
int64_t stage; int64_t stage;
int32_t nodeId; int32_t nodeId;
SVersionRange verRange; // start/end version in WAL, only valid for source task SVersionRange verRange; // start/end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task int64_t processedVer; // only valid for source task
double inputQUsed; // in MiB double inputQUsed; // in MiB
double inputRate; double inputRate;
double procsThroughput; // duration between one element put into input queue and being processed. double procsThroughput; // duration between one element put into input queue and being processed.
double procsTotal; // duration between one element put into input queue and being processed. double procsTotal; // duration between one element put into input queue and being processed.
@ -678,9 +678,9 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
// check downstream status // check downstream status
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask); void streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);
// fill-history task // fill-history task
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
@ -717,8 +717,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); void streamTaskSetRemoveBackendFiles(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask); STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask);
// source level // source level
@ -812,9 +812,9 @@ void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask);
void streamMutexLock(TdThreadMutex *pMutex); void streamMutexLock(TdThreadMutex* pMutex);
void streamMutexUnlock(TdThreadMutex *pMutex); void streamMutexUnlock(TdThreadMutex* pMutex);
void streamMutexDestroy(TdThreadMutex *pMutex); void streamMutexDestroy(TdThreadMutex* pMutex);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -14,7 +14,10 @@
*/ */
#include "tq.h" #include "tq.h"
#include "osDef.h"
#include "taoserror.h"
#include "tqCommon.h" #include "tqCommon.h"
#include "tstream.h"
#include "vnd.h" #include "vnd.h"
// 0: not init // 0: not init
@ -153,7 +156,7 @@ void tqNotifyClose(STQ* pTq) {
} }
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
int32_t code = 0; int32_t code = 0;
SMqPollReq req = {0}; SMqPollReq req = {0};
code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req); code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
if (code < 0) { if (code < 0) {
@ -169,7 +172,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
} }
dataRsp.common.blockNum = 0; dataRsp.common.blockNum = 0;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset);
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf,
req.reqId); req.reqId);
@ -180,15 +183,15 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
} }
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type,
int32_t type, int32_t vgId) { int32_t vgId) {
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
char buf1[TSDB_OFFSET_LEN] = {0}; char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0};
(void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset);
(void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset);
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId);
@ -200,7 +203,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
SMqVgOffset vgOffset = {0}; SMqVgOffset vgOffset = {0};
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0; int32_t code = 0;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
@ -233,12 +236,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
// save the new offset value // save the new offset value
if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))){ if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) { if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
msgLen - sizeof(vgOffset.consumerId)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
@ -416,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
(void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
@ -447,7 +451,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
STqOffset* pSavedOffset = NULL; STqOffset* pSavedOffset = NULL;
int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset); int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
if (code != 0) { if (code != 0) {
return TSDB_CODE_TMQ_NO_COMMITTED; return TSDB_CODE_TMQ_NO_COMMITTED;
} }
@ -479,7 +483,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = 0; int32_t code = 0;
SMqPollReq req = {0}; SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
@ -505,7 +509,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
consumerId, vgId, req.subKey, pHandle->consumerId); consumerId, vgId, req.subKey, pHandle->consumerId);
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
return TSDB_CODE_TMQ_CONSUMER_MISMATCH; return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
} }
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
@ -612,8 +615,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqCheckInfo info = {0}; STqCheckInfo info = {0};
int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen); int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen);
if(code != 0){ if (code != 0) {
return code; return code;
} }
@ -650,7 +653,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosRLockLatch(&pTq->lock); taosRLockLatch(&pTq->lock);
STqHandle* pHandle = NULL; STqHandle* pHandle = NULL;
(void)tqMetaGetHandle(pTq, req.subKey, &pHandle); //ignore return code (void)tqMetaGetHandle(pTq, req.subKey, &pHandle); // ignore return code
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
@ -697,7 +700,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
} }
end: end:
tDecoderClear(&dc); tDecoderClear(&dc);
return ret; return ret;
} }
@ -705,7 +708,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*) pTqObj; STQ* pTq = (STQ*)pTqObj;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to build task", pTask->id.taskId); tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
@ -749,12 +752,12 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
} }
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);
(void) streamSetupScheduleTrigger(pTask); (void)streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask).name; char* p = streamTaskGetStatus(pTask).name;
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -766,14 +769,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer); (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
} else { } else {
tqInfo( tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
"vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64 "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
" ms, inputVer:%" PRId64, vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer);
} }
@ -781,8 +783,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
return 0; return 0;
} }
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
@ -803,13 +804,13 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
pTask->execInfo.step2Start = taosGetTimestampMs(); pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) { if (done) {
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
pStep2Range->maxVer, 0.0); pStep2Range->minVer, pStep2Range->maxVer, 0.0);
int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost. int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost.
if (code) { if (code) {
qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code)); qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
} }
(void) streamExecTask(pTask); // exec directly (void)streamExecTask(pTask); // exec directly
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
@ -830,12 +831,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
(void) streamTaskSetSchedStatusInactive(pTask); (void)streamTaskSetSchedStatusInactive(pTask);
// now the fill-history task starts to scan data from wal files. // now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
(void) tqScanWalAsync(pTq, false); (void)tqScanWalAsync(pTq, false);
} }
} }
} }
@ -846,9 +847,9 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
SStreamMeta* pMeta = pStreamTask->pMeta; SStreamMeta* pMeta = pStreamTask->pMeta;
STaskId hId = pStreamTask->hTaskInfo.id; STaskId hId = pStreamTask->hTaskInfo.id;
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask); int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId); tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -930,8 +931,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (retInfo.ret == TASK_SCANHISTORY_REXEC) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamExecScanHistoryInFuture(pTask, retInfo.idleTime); streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
} else { } else {
SStreamTaskState p = streamTaskGetStatus(pTask); SStreamTaskState p = streamTaskGetStatus(pTask);
ETaskStatus s = p.state; ETaskStatus s = p.state;
if (s == TASK_STATUS__PAUSE) { if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
@ -963,7 +964,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; // todo: handle failure return code; // todo: handle failure
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
@ -988,7 +989,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// let's continue scan data in the wal files // let's continue scan data in the wal files
if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) {
(void) tqScanWalAsync(pTq, false); // it's ok to failed (void)tqScanWalAsync(pTq, false); // it's ok to failed
} }
return code; return code;
@ -1026,11 +1027,9 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
} }
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }
return 0;
}
int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -1092,18 +1091,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode
} }
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
@ -1111,9 +1110,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it", ", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId); vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode
} }
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
@ -1123,7 +1122,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed", " transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId); vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1136,9 +1135,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; // todo retry handle error return TSDB_CODE_SUCCESS; // todo retry handle error
} }
// todo save the checkpoint failed info // todo save the checkpoint failed info
@ -1154,14 +1153,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { } else {
if (status != TASK_STATUS__HALT) { if (status != TASK_STATUS__HALT) {
tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
} }
} }
@ -1178,16 +1177,17 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // checkpoint already finished, and not in checkpoint status } else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) { if (req.checkpointId <= pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); " transId:%d already handled, return success",
pTask->id.idStr, req.checkpointId, req.transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1198,7 +1198,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (code) { if (code) {
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
tstrerror(code));
return code; return code;
} }
@ -1215,7 +1216,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0}; SRpcMsg rsp = {0};
(void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1228,7 +1229,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId); (int32_t)pReq->downstreamTaskId);
@ -1249,7 +1250,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId); (int32_t)pReq->downstreamTaskId);
@ -1264,9 +1265,7 @@ int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
} }
// this function is needed, do not try to remove it. // this function is needed, do not try to remove it.
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);

View File

@ -14,6 +14,8 @@
*/ */
#include "executor.h" #include "executor.h"
#include "osDir.h"
#include "osMemory.h"
#include "streamInt.h" #include "streamInt.h"
#include "streamsm.h" #include "streamsm.h"
#include "tmisce.h" #include "tmisce.h"
@ -30,7 +32,7 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray); int32_t childId = taosArrayGetSize(pArray);
pTask->info.selfChildId = childId; pTask->info.selfChildId = childId;
void* p = taosArrayPush(pArray, &pTask); void* p = taosArrayPush(pArray, &pTask);
return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
} }
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) {
@ -42,7 +44,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
if (!isEqual) { if (!isEqual) {
(*pUpdated) = true; (*pUpdated) = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors (void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors
epsetAssign(&pTask->info.epSet, pEpSet); epsetAssign(&pTask->info.epSet, pEpSet);
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp);
@ -92,7 +94,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
} }
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) {
*p = NULL; *p = NULL;
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
@ -224,17 +226,17 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
if (pTask->schedInfo.pDelayTimer != NULL) { if (pTask->schedInfo.pDelayTimer != NULL) {
(void) taosTmrStop(pTask->schedInfo.pDelayTimer); (void)taosTmrStop(pTask->schedInfo.pDelayTimer);
pTask->schedInfo.pDelayTimer = NULL; pTask->schedInfo.pDelayTimer = NULL;
} }
if (pTask->hTaskInfo.pTimer != NULL) { if (pTask->hTaskInfo.pTimer != NULL) {
(void) taosTmrStop(pTask->hTaskInfo.pTimer); (void)taosTmrStop(pTask->hTaskInfo.pTimer);
pTask->hTaskInfo.pTimer = NULL; pTask->hTaskInfo.pTimer = NULL;
} }
if (pTask->msgInfo.pRetryTmr != NULL) { if (pTask->msgInfo.pRetryTmr != NULL) {
(void) taosTmrStop(pTask->msgInfo.pRetryTmr); (void)taosTmrStop(pTask->msgInfo.pRetryTmr);
pTask->msgInfo.pRetryTmr = NULL; pTask->msgInfo.pRetryTmr = NULL;
} }
@ -321,10 +323,19 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
stDebug("s-task:0x%x start to free task state", pTask->id.taskId); stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
streamStateClose(pTask->pState, remove); streamStateClose(pTask->pState, remove);
if (remove)taskDbSetClearFileFlag(pTask->pBackend); if (remove) taskDbSetClearFileFlag(pTask->pBackend);
taskDbRemoveRef(pTask->pBackend); taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL; pTask->pBackend = NULL;
pTask->pState = NULL; pTask->pState = NULL;
} else {
if (remove) {
if (pTask->backendPath != NULL) {
taosRemoveDir(pTask->backendPath);
taosMemoryFree(pTask->backendPath);
pTask->backendPath = NULL;
}
}
} }
} }
@ -364,8 +375,36 @@ static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
} }
} }
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
int64_t streamId = 0;
int32_t taskId = 0;
if (pTask->info.fillHistory) {
streamId = pTask->hTaskInfo.id.taskId;
taskId = pTask->hTaskInfo.id.taskId;
} else {
streamId = pTask->streamTaskId.taskId;
taskId = pTask->streamTaskId.taskId;
}
char id[128] = {0};
int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
if (nBytes < 0 || nBytes >= sizeof(id)) {
return TSDB_CODE_OUT_OF_BUFFER;
}
int32_t len = strlen(pTask->pMeta->path);
pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
if (pTask->backendPath == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
return 0;
}
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
(void) createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); (void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
@ -459,8 +498,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
} }
if (pTask->chkInfo.pActiveInfo == NULL) { if (pTask->chkInfo.pActiveInfo == NULL) {
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
} }
code = streamTaskSetBackendPath(pTask);
return code; return code;
} }
@ -494,12 +534,12 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
} }
void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo); void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo);
return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
} }
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
(void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file.
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) { for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -510,7 +550,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
*pUpdated = true; *pUpdated = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); (void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pInfo->epSet, pEpSet); epsetAssign(&pInfo->epSet, pEpSet);
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId,
@ -545,7 +585,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) {
char buf[512] = {0}; char buf[512] = {0};
(void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files.
int32_t id = pTask->id.taskId; int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type; int8_t type = pTask->outputInfo.type;
@ -564,7 +604,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
if (!isEqual) { if (!isEqual) {
*pUpdated = true; *pUpdated = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); (void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp));
epsetAssign(&pVgInfo->epSet, pEpSet); epsetAssign(&pVgInfo->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId,
@ -584,7 +624,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
*pUpdated = true; *pUpdated = true;
char tmp[512] = {0}; char tmp[512] = {0};
(void) epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); (void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp));
epsetAssign(&pDispatcher->epSet, pEpSet); epsetAssign(&pDispatcher->epSet, pEpSet);
stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId,
@ -919,7 +959,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t code = 0; int32_t code = 0;
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num); stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);
@ -935,7 +975,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
} }
void streamTaskPause(SStreamTask* pTask) { void streamTaskPause(SStreamTask* pTask) {
(void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
} }
void streamTaskResume(SStreamTask* pTask) { void streamTaskResume(SStreamTask* pTask) {
@ -1142,13 +1182,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
if (pTriggerTmr->tmrHandle != NULL) { if (pTriggerTmr->tmrHandle != NULL) {
(void) taosTmrStop(pTriggerTmr->tmrHandle); (void)taosTmrStop(pTriggerTmr->tmrHandle);
pTriggerTmr->tmrHandle = NULL; pTriggerTmr->tmrHandle = NULL;
} }
SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
if (pReadyTmr->tmrHandle != NULL) { if (pReadyTmr->tmrHandle != NULL) {
(void) taosTmrStop(pReadyTmr->tmrHandle); (void)taosTmrStop(pReadyTmr->tmrHandle);
pReadyTmr->tmrHandle = NULL; pReadyTmr->tmrHandle = NULL;
} }