diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e7f2bf0a6..f916e05d52 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -19,7 +19,6 @@ #include "os.h" #include "streamMsg.h" #include "streamState.h" -#include "streamMsg.h" #include "tdatablock.h" #include "tdbInt.h" #include "tmsg.h" @@ -266,14 +265,14 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { - int64_t startTs; - int64_t checkpointId; // latest checkpoint id - int64_t checkpointVer; // latest checkpoint offset in wal - int64_t checkpointTime; // latest checkpoint time - int64_t processedVer; - int64_t nextProcessVer; // current offset in WAL, not serialize it - int64_t msgVer; - int32_t consensusTransId;// consensus checkpoint id + int64_t startTs; + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time + int64_t processedVer; + int64_t nextProcessVer; // current offset in WAL, not serialize it + int64_t msgVer; + int32_t consensusTransId; // consensus checkpoint id SActiveCheckpointInfo* pActiveInfo; } SCheckpointInfo; @@ -454,7 +453,8 @@ struct SStreamTask { SSHashObj* pNameMap; void* pBackend; int8_t subtableWithoutMd5; - char reserve[256]; + char* backendPath; + char reserve[256 - sizeof(char*)]; }; typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*); @@ -591,9 +591,9 @@ typedef struct STaskStatusEntry { int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - SVersionRange verRange; // start/end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - double inputQUsed; // in MiB + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + double inputQUsed; // in MiB double inputRate; double procsThroughput; // duration between one element put into input queue and being processed. double procsTotal; // duration between one element put into input queue and being processed. @@ -678,9 +678,9 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); // check downstream status -void streamTaskStartMonitorCheckRsp(SStreamTask* pTask); -void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); -void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); +void streamTaskStartMonitorCheckRsp(SStreamTask* pTask); +void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); // fill-history task int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); @@ -717,8 +717,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) bool streamTaskIsSinkTask(const SStreamTask* pTask); void streamTaskSetRemoveBackendFiles(SStreamTask* pTask); -void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); -void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); +void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); +void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask); // source level @@ -812,9 +812,9 @@ void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp); int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask); -void streamMutexLock(TdThreadMutex *pMutex); -void streamMutexUnlock(TdThreadMutex *pMutex); -void streamMutexDestroy(TdThreadMutex *pMutex); +void streamMutexLock(TdThreadMutex* pMutex); +void streamMutexUnlock(TdThreadMutex* pMutex); +void streamMutexDestroy(TdThreadMutex* pMutex); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 314a6abdf5..a70a04f23d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,7 +14,10 @@ */ #include "tq.h" +#include "osDef.h" +#include "taoserror.h" #include "tqCommon.h" +#include "tstream.h" #include "vnd.h" // 0: not init @@ -153,7 +156,7 @@ void tqNotifyClose(STQ* pTq) { } void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { - int32_t code = 0; + int32_t code = 0; SMqPollReq req = {0}; code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req); if (code < 0) { @@ -169,7 +172,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { } dataRsp.common.blockNum = 0; char buf[TSDB_OFFSET_LEN] = {0}; - (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); @@ -180,15 +183,15 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { tDeleteMqDataRsp(&dataRsp); } -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, - int32_t type, int32_t vgId) { +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type, + int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); char buf1[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0}; - (void) tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); - (void) tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); + (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); + (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); @@ -200,7 +203,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t SMqVgOffset vgOffset = {0}; int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = 0; + int32_t code = 0; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { @@ -233,12 +236,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } // save the new offset value - if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))){ + if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) { + if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, + msgLen - sizeof(vgOffset.consumerId)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -416,7 +420,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } char buf[TSDB_OFFSET_LEN] = {0}; - (void) tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); + (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); @@ -447,7 +451,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); STqOffset* pSavedOffset = NULL; - int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset); + int32_t code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset); if (code != 0) { return TSDB_CODE_TMQ_NO_COMMITTED; } @@ -479,7 +483,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { - int32_t code = 0; + int32_t code = 0; SMqPollReq req = {0}; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); @@ -505,7 +509,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { consumerId, vgId, req.subKey, pHandle->consumerId); taosRUnLockLatch(&pTq->lock); return TSDB_CODE_TMQ_CONSUMER_MISMATCH; - } int64_t sver = 0, ever = 0; @@ -612,8 +615,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqCheckInfo info = {0}; - int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen); - if(code != 0){ + int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen); + if (code != 0) { return code; } @@ -650,7 +653,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosRLockLatch(&pTq->lock); STqHandle* pHandle = NULL; - (void)tqMetaGetHandle(pTq, req.subKey, &pHandle); //ignore return code + (void)tqMetaGetHandle(pTq, req.subKey, &pHandle); // ignore return code taosRUnLockLatch(&pTq->lock); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -697,7 +700,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } - end: +end: tDecoderClear(&dc); return ret; } @@ -705,7 +708,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { - STQ* pTq = (STQ*) pTqObj; + STQ* pTq = (STQ*)pTqObj; int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); @@ -749,12 +752,12 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - (void) streamSetupScheduleTrigger(pTask); + (void)streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); - char* p = streamTaskGetStatus(pTask).name; + char* p = streamTaskGetStatus(pTask).name; const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); if (pTask->info.fillHistory) { @@ -766,14 +769,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer); } else { - tqInfo( - "vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " nextProcessVer:%" PRId64 - " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64 - " ms, inputVer:%" PRId64, - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, - (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); + tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x " + "delaySched:%" PRId64 " ms, inputVer:%" PRId64, + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); } @@ -781,8 +783,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV return 0; } -int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); } +int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); @@ -803,13 +804,13 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, - pStep2Range->maxVer, 0.0); + qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, + pStep2Range->minVer, pStep2Range->maxVer, 0.0); int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost. if (code) { qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code)); } - (void) streamExecTask(pTask); // exec directly + (void)streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 @@ -830,12 +831,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); - (void) streamTaskSetSchedStatusInactive(pTask); + (void)streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); if (code == TSDB_CODE_SUCCESS) { - (void) tqScanWalAsync(pTq, false); + (void)tqScanWalAsync(pTq, false); } } } @@ -846,9 +847,9 @@ int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) { SStreamMeta* pMeta = pStreamTask->pMeta; STaskId hId = pStreamTask->hTaskInfo.id; SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask); + int32_t code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask); if (pTask == NULL) { - tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t) hId.taskId); + tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId); return TSDB_CODE_SUCCESS; } @@ -930,8 +931,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) { streamExecScanHistoryInFuture(pTask, retInfo.idleTime); } else { - SStreamTaskState p = streamTaskGetStatus(pTask); - ETaskStatus s = p.state; + SStreamTaskState p = streamTaskGetStatus(pTask); + ETaskStatus s = p.state; if (s == TASK_STATUS__PAUSE) { tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, @@ -963,7 +964,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); - return code; // todo: handle failure + return code; // todo: handle failure } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -988,7 +989,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // let's continue scan data in the wal files if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { - (void) tqScanWalAsync(pTq, false); // it's ok to failed + (void)tqScanWalAsync(pTq, false); // it's ok to failed } return code; @@ -1026,11 +1027,9 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); } -int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { - return 0; -} +int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { +int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1092,18 +1091,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } tDecoderClear(&decoder); if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } if (!pTq->pVnode->restored) { @@ -1111,9 +1110,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode } SStreamTask* pTask = NULL; @@ -1123,7 +1122,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1136,9 +1135,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // todo retry handle error + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; // todo retry handle error } // todo save the checkpoint failed info @@ -1154,14 +1153,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } } else { if (status != TASK_STATUS__HALT) { tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); -// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); + // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); } } @@ -1178,16 +1177,17 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; - } else { // checkpoint already finished, and not in checkpoint status + } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) { tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 - " transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId); + " transId:%d already handled, return success", + pTask->id.idStr, req.checkpointId, req.transId); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1198,7 +1198,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); if (code) { - qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, + tstrerror(code)); return code; } @@ -1215,7 +1216,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - (void) streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1228,7 +1229,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1249,7 +1250,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont; + SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont; if (!vnodeIsRoleLeader(pTq->pVnode)) { tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId, (int32_t)pReq->downstreamTaskId); @@ -1264,9 +1265,7 @@ int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) { } // this function is needed, do not try to remove it. -int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); -} +int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); } int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f190673430..c531260682 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -14,6 +14,8 @@ */ #include "executor.h" +#include "osDir.h" +#include "osMemory.h" #include "streamInt.h" #include "streamsm.h" #include "tmisce.h" @@ -30,7 +32,7 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); pTask->info.selfChildId = childId; void* p = taosArrayPush(pArray, &pTask); - return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; + return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; } static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { @@ -42,7 +44,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp if (!isEqual) { (*pUpdated) = true; char tmp[512] = {0}; - (void) epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors + (void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors epsetAssign(&pTask->info.epSet, pEpSet); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); @@ -92,7 +94,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { } int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, - SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { + SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { *p = NULL; SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); @@ -224,17 +226,17 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->schedInfo.pDelayTimer != NULL) { - (void) taosTmrStop(pTask->schedInfo.pDelayTimer); + (void)taosTmrStop(pTask->schedInfo.pDelayTimer); pTask->schedInfo.pDelayTimer = NULL; } if (pTask->hTaskInfo.pTimer != NULL) { - (void) taosTmrStop(pTask->hTaskInfo.pTimer); + (void)taosTmrStop(pTask->hTaskInfo.pTimer); pTask->hTaskInfo.pTimer = NULL; } if (pTask->msgInfo.pRetryTmr != NULL) { - (void) taosTmrStop(pTask->msgInfo.pRetryTmr); + (void)taosTmrStop(pTask->msgInfo.pRetryTmr); pTask->msgInfo.pRetryTmr = NULL; } @@ -321,10 +323,19 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); streamStateClose(pTask->pState, remove); - if (remove)taskDbSetClearFileFlag(pTask->pBackend); + if (remove) taskDbSetClearFileFlag(pTask->pBackend); + taskDbRemoveRef(pTask->pBackend); pTask->pBackend = NULL; pTask->pState = NULL; + } else { + if (remove) { + if (pTask->backendPath != NULL) { + taosRemoveDir(pTask->backendPath); + taosMemoryFree(pTask->backendPath); + pTask->backendPath = NULL; + } + } } } @@ -364,8 +375,36 @@ static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { } } +int32_t streamTaskSetBackendPath(SStreamTask* pTask) { + int64_t streamId = 0; + int32_t taskId = 0; + + if (pTask->info.fillHistory) { + streamId = pTask->hTaskInfo.id.taskId; + taskId = pTask->hTaskInfo.id.taskId; + } else { + streamId = pTask->streamTaskId.taskId; + taskId = pTask->streamTaskId.taskId; + } + + char id[128] = {0}; + int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId); + if (nBytes < 0 || nBytes >= sizeof(id)) { + return TSDB_CODE_OUT_OF_BUFFER; + } + + int32_t len = strlen(pTask->pMeta->path); + pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2); + if (pTask->backendPath == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); + + return 0; +} int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { - (void) createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); + (void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); pTask->refCnt = 1; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; @@ -459,8 +498,9 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } if (pTask->chkInfo.pActiveInfo == NULL) { - code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); } + code = streamTaskSetBackendPath(pTask); return code; } @@ -494,12 +534,12 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre } void* p = taosArrayPush(pTask->upstreamInfo.pList, &pEpInfo); - return (p == NULL)? TSDB_CODE_OUT_OF_MEMORY:TSDB_CODE_SUCCESS; + return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; } void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; - (void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. + (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { @@ -510,7 +550,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS *pUpdated = true; char tmp[512] = {0}; - (void) epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); + (void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); epsetAssign(&pInfo->epSet, pEpSet); stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, @@ -545,7 +585,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; - (void) epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. + (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; @@ -564,7 +604,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (!isEqual) { *pUpdated = true; char tmp[512] = {0}; - (void) epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); + (void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); epsetAssign(&pVgInfo->epSet, pEpSet); stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, @@ -584,7 +624,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE *pUpdated = true; char tmp[512] = {0}; - (void) epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); + (void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); epsetAssign(&pDispatcher->epSet, pEpSet); stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, @@ -919,7 +959,7 @@ STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) { static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { SStreamMeta* pMeta = pTask->pMeta; - int32_t code = 0; + int32_t code = 0; int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num); @@ -935,7 +975,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { } void streamTaskPause(SStreamTask* pTask) { - (void) streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); + (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); } void streamTaskResume(SStreamTask* pTask) { @@ -1142,13 +1182,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; if (pTriggerTmr->tmrHandle != NULL) { - (void) taosTmrStop(pTriggerTmr->tmrHandle); + (void)taosTmrStop(pTriggerTmr->tmrHandle); pTriggerTmr->tmrHandle = NULL; } SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; if (pReadyTmr->tmrHandle != NULL) { - (void) taosTmrStop(pReadyTmr->tmrHandle); + (void)taosTmrStop(pReadyTmr->tmrHandle); pReadyTmr->tmrHandle = NULL; } @@ -1185,4 +1225,4 @@ const char* streamTaskGetExecType(int32_t type) { default: return "invalid-exec-type"; } -} \ No newline at end of file +}