commit
51632d2b37
|
@ -19,7 +19,6 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "streamMsg.h"
|
#include "streamMsg.h"
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
#include "streamMsg.h"
|
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdbInt.h"
|
#include "tdbInt.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
@ -455,6 +454,7 @@ struct SStreamTask {
|
||||||
void* pBackend;
|
void* pBackend;
|
||||||
int8_t subtableWithoutMd5;
|
int8_t subtableWithoutMd5;
|
||||||
char reserve[256];
|
char reserve[256];
|
||||||
|
char* backendPath;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
|
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
|
||||||
|
|
|
@ -14,7 +14,10 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
#include "osDef.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "tqCommon.h"
|
#include "tqCommon.h"
|
||||||
|
#include "tstream.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
// 0: not init
|
// 0: not init
|
||||||
|
@ -180,8 +183,8 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
|
||||||
tDeleteMqDataRsp(&dataRsp);
|
tDeleteMqDataRsp(&dataRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp,
|
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type,
|
||||||
int32_t type, int32_t vgId) {
|
int32_t vgId) {
|
||||||
int64_t sver = 0, ever = 0;
|
int64_t sver = 0, ever = 0;
|
||||||
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
||||||
|
|
||||||
|
@ -238,7 +241,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) {
|
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
|
||||||
|
msgLen - sizeof(vgOffset.consumerId)) < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -505,7 +509,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
consumerId, vgId, req.subKey, pHandle->consumerId);
|
consumerId, vgId, req.subKey, pHandle->consumerId);
|
||||||
taosRUnLockLatch(&pTq->lock);
|
taosRUnLockLatch(&pTq->lock);
|
||||||
return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t sver = 0, ever = 0;
|
int64_t sver = 0, ever = 0;
|
||||||
|
@ -766,11 +769,10 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
|
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
|
||||||
} else {
|
} else {
|
||||||
tqInfo(
|
tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||||
"vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
|
||||||
" nextProcessVer:%" PRId64
|
" nextProcessVer:%" PRId64
|
||||||
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64
|
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
|
||||||
" ms, inputVer:%" PRId64,
|
"delaySched:%" PRId64 " ms, inputVer:%" PRId64,
|
||||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
|
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
|
||||||
|
@ -781,8 +783,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
|
||||||
return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
|
|
||||||
|
|
||||||
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||||
|
@ -803,8 +804,8 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer,
|
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
|
||||||
pStep2Range->maxVer, 0.0);
|
pStep2Range->minVer, pStep2Range->maxVer, 0.0);
|
||||||
int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost.
|
int32_t code = streamTaskPutTranstateIntoInputQ(pTask); // todo: msg lost.
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
|
qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
|
||||||
|
@ -1026,9 +1027,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
|
return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
char* msgStr = pMsg->pCont;
|
char* msgStr = pMsg->pCont;
|
||||||
|
@ -1181,7 +1180,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
} else { // checkpoint already finished, and not in checkpoint status
|
} else { // checkpoint already finished, and not in checkpoint status
|
||||||
if (req.checkpointId <= pTask->chkInfo.checkpointId) {
|
if (req.checkpointId <= pTask->chkInfo.checkpointId) {
|
||||||
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
|
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
|
||||||
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId);
|
" transId:%d already handled, return success",
|
||||||
|
pTask->id.idStr, req.checkpointId, req.transId);
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1198,7 +1198,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1264,9 +1265,7 @@ int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function is needed, do not try to remove it.
|
// this function is needed, do not try to remove it.
|
||||||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
|
||||||
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
|
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
|
#include "osDir.h"
|
||||||
|
#include "osMemory.h"
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
#include "streamsm.h"
|
#include "streamsm.h"
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
|
@ -296,15 +298,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||||
pTask->outputInfo.pNodeEpsetUpdateList = NULL;
|
pTask->outputInfo.pNodeEpsetUpdateList = NULL;
|
||||||
|
|
||||||
// if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
|
|
||||||
// char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
|
|
||||||
// sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr);
|
|
||||||
// taosRemoveDir(path);
|
|
||||||
|
|
||||||
// stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path);
|
|
||||||
// taosMemoryFree(path);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (pTask->id.idStr != NULL) {
|
if (pTask->id.idStr != NULL) {
|
||||||
taosMemoryFree((void*)pTask->id.idStr);
|
taosMemoryFree((void*)pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
@ -325,6 +318,17 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
|
||||||
taskDbRemoveRef(pTask->pBackend);
|
taskDbRemoveRef(pTask->pBackend);
|
||||||
pTask->pBackend = NULL;
|
pTask->pBackend = NULL;
|
||||||
pTask->pState = NULL;
|
pTask->pState = NULL;
|
||||||
|
} else {
|
||||||
|
if (remove) {
|
||||||
|
if (pTask->backendPath != NULL) {
|
||||||
|
taosRemoveDir(pTask->backendPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->backendPath != NULL) {
|
||||||
|
taosMemoryFree(pTask->backendPath);
|
||||||
|
pTask->backendPath = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -364,6 +368,34 @@ static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
|
||||||
|
int64_t streamId = 0;
|
||||||
|
int32_t taskId = 0;
|
||||||
|
|
||||||
|
if (pTask->info.fillHistory) {
|
||||||
|
streamId = pTask->hTaskInfo.id.taskId;
|
||||||
|
taskId = pTask->hTaskInfo.id.taskId;
|
||||||
|
} else {
|
||||||
|
streamId = pTask->streamTaskId.taskId;
|
||||||
|
taskId = pTask->streamTaskId.taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
char id[128] = {0};
|
||||||
|
int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||||
|
if (nBytes < 0 || nBytes >= sizeof(id)) {
|
||||||
|
return TSDB_CODE_OUT_OF_BUFFER;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = strlen(pTask->pMeta->path);
|
||||||
|
pTask->backendPath = (char*)taosMemoryMalloc(len + nBytes + 2);
|
||||||
|
if (pTask->backendPath == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
||||||
(void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
(void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
||||||
pTask->refCnt = 1;
|
pTask->refCnt = 1;
|
||||||
|
@ -460,9 +492,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
|
|
||||||
if (pTask->chkInfo.pActiveInfo == NULL) {
|
if (pTask->chkInfo.pActiveInfo == NULL) {
|
||||||
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
|
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
|
||||||
|
if (code) {
|
||||||
|
stError("s-task:%s failed to create active checkpoint info, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return streamTaskSetBackendPath(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
|
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
|
||||||
|
|
Loading…
Reference in New Issue