Merge pull request #25653 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2024-05-07 23:12:30 +08:00 committed by GitHub
commit 55a630911d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 263 additions and 248 deletions

View File

@ -819,9 +819,6 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
@ -838,8 +835,10 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
void streamTaskSendCheckMsg(SStreamTask* pTask);
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo,
int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
@ -924,10 +923,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed);
int32_t setCode);
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);

View File

@ -37,7 +37,6 @@ typedef struct {
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcHandleInfo {
@ -63,7 +62,6 @@ typedef struct SRpcHandleInfo {
SRpcConnInfo conn;
int8_t forbiddenIp;
int8_t notFreeAhandle;
} SRpcHandleInfo;
typedef struct SRpcMsg {

View File

@ -1064,6 +1064,7 @@ _OVER:
return code;
}
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1081,8 +1082,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return code;
}
@ -1091,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1101,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1112,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1128,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1146,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
@ -1171,11 +1173,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
} else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId);
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
}
@ -1193,10 +1199,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
}
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return code;
}

View File

@ -406,50 +406,16 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
SStreamTaskCheckRsp rsp = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
streamTaskProcessCheckMsg(pMeta, &req, &rsp);
return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {

View File

@ -158,7 +158,6 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -1580,12 +1580,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
key.len = compressedSize;
value = dst;
}
stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize);
}
if (*dest == NULL) {
char* p = taosMemoryCalloc(
1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len);
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
char* p = taosMemoryCalloc(1, size);
char* buf = p;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len);
@ -1601,8 +1600,8 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
len += taosEncodeFixedI8((void**)&buf, key.compress);
len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
}
taosMemoryFree(dst);
taosMemoryFree(dst);
return len;
}

View File

@ -39,7 +39,7 @@ static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
// check status
void streamTaskCheckDownstream(SStreamTask* pTask) {
void streamTaskSendCheckMsg(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
const char* idstr = pTask->id.idStr;
@ -97,6 +97,46 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
}
}
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
int32_t taskId = pReq->downstreamTaskId;
*pRsp = (SStreamTaskCheckRsp){
.reqId = pReq->reqId,
.streamId = pReq->streamId,
.childId = pReq->childId,
.downstreamNodeId = pReq->downstreamNodeId,
.downstreamTaskId = pReq->downstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
.upstreamTaskId = pReq->upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) {
pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status);
} else {
pRsp->status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
}
}
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
@ -152,7 +192,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
ASSERT(left > 0);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
@ -162,6 +202,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
@ -394,6 +460,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
.stage = pTask->pMeta->stage,
};
// update the reqId for the new check msg
p->reqId = tGenIdPI64();
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;

View File

@ -833,7 +833,7 @@ FAIL:
}
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed) {
int32_t setCode) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
@ -845,7 +845,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
.streamId = pReq->streamId,
.expireTime = pReq->expireTime,
.mnodeId = pReq->mnodeId,
.success = isSucceed,
.success = (setCode == TSDB_CODE_SUCCESS) ? 1 : 0,
};
tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
@ -866,22 +866,24 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
tEncoderClear(&encoder);
initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len);
pMsg->code = setCode;
pMsg->info = *pRpcInfo;
return 0;
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed) {
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
SStreamChkptReadyInfo info = {0};
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
}
taosArrayPush(pTask->pReadyMsgList, &info);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr,
(int32_t)taosArrayGetSize(pTask->pReadyMsgList));
int32_t size = taosArrayGetSize(pTask->pReadyMsgList);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size);
return TSDB_CODE_SUCCESS;
}

View File

@ -330,6 +330,36 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTranstate->type = STREAM_INPUT__TRANS_STATE;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.appendTranstateBlock = true;
return TSDB_CODE_SUCCESS;
}
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputq.queue->pQueue;

View File

@ -17,7 +17,7 @@
#include "ttimer.h"
static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamSchedByTimer(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
@ -26,7 +26,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
@ -112,7 +112,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
streamMetaReleaseTask(pTask->pMeta, pTask);
}
void streamSchedByTimer(void* param, void* tmrId) {
void streamTaskSchedHelper(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
const char* id = pTask->id.idStr;
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
@ -133,7 +133,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
if (pTrigger == NULL) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -144,7 +144,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -153,7 +153,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -161,5 +161,5 @@ void streamSchedByTimer(void* param, void* tmrId) {
}
}
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
}

View File

@ -32,6 +32,7 @@ typedef struct SBackendFileItem {
int64_t size;
int8_t ref;
} SBackendFileItem;
typedef struct SBackendFile {
char* pCurrent;
char* pMainfest;
@ -102,6 +103,7 @@ struct SStreamSnapWriter {
int64_t ever;
SStreamSnapHandle handle;
};
const char* ROCKSDB_OPTIONS = "OPTIONS";
const char* ROCKSDB_MAINFEST = "MANIFEST";
const char* ROCKSDB_SST = "sst";
@ -120,7 +122,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle);
} while (0)
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int ret = 0;
int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
@ -149,7 +151,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
if (pSnapFile->pSst) {
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
@ -157,7 +159,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
sprintf(buf + strlen(buf) - 1, "]");
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
pSnapFile->snapInfo.taskId, buf);
taosMemoryFree(buf);
}
}
@ -183,7 +185,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* sst = taosArrayGetP(pSnapFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
@ -270,12 +272,12 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pMainfest);
taosMemoryFree(pSnap->pOptions);
taosMemoryFree(pSnap->path);
for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst);
}
// unite read/write snap file
for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
if (pItem->ref == 0) {
taosMemoryFree(pItem->name);
@ -297,7 +299,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
SBackendSnapFile2 snapFile = {0};
@ -305,7 +307,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
}
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
@ -324,7 +326,7 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->pDbSnapSet) {
for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i);
snapFileDebugInfo(pSnapFile);
snapFileDestroy(pSnapFile);
@ -396,9 +398,9 @@ _NEXT:
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
@ -489,14 +491,10 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) {
}
int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) {
int code = -1;
int32_t code = -1;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo;
SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo;
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
if (pSnapFile->fd == 0) {
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
@ -540,6 +538,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
pSnapFile->offset += pHdr->size;
}
code = 0;
_EXIT:
return code;
}
@ -590,8 +589,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
return streamSnapWrite(pWriter, pData, nData);
}
}
return code;
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
if (pWriter == NULL) return 0;
streamSnapHandleDestroy(&pWriter->handle);

View File

@ -38,8 +38,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask);
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
int32_t streamTaskSetReady(SStreamTask* pTask) {
static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask);
@ -79,33 +83,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
}
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
@ -136,17 +113,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
return TSDB_CODE_SUCCESS;
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask)->state;
@ -267,32 +233,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
// common
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
stDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
@ -308,6 +248,55 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
} else {
return launchNotBuiltFillHistoryTask(pTask);
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
@ -316,37 +305,7 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
return 0;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTranstate->type = STREAM_INPUT__TRANS_STATE;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.appendTranstateBlock = true;
return TSDB_CODE_SUCCESS;
}
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
SDataRange* pRange = &pHTask->dataRange;
// the query version range should be limited to the already processed data
@ -365,7 +324,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
}
static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -379,7 +338,7 @@ static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p
pHTaskInfo->id.streamId = 0;
}
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -401,7 +360,7 @@ static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p
}
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
int64_t now = taosGetTimestampMs();
@ -449,7 +408,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
noRetryLaunchFillHistoryTask(pTask, pInfo, now);
notRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
@ -500,7 +459,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, in
return pInfo;
}
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
@ -547,54 +506,6 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
} else {
return launchNotBuiltFillHistoryTask(pTask);
}
}
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamInfoResetTimewindowFilter(exec);
@ -651,3 +562,41 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
}

View File

@ -84,7 +84,7 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
pTask->execInfo.checkTs);
streamTaskCheckDownstream(pTask);
streamTaskSendCheckMsg(pTask);
return 0;
}