This commit is contained in:
chenhaoran 2024-05-08 10:33:17 +08:00
commit f37212c74c
15 changed files with 267 additions and 250 deletions

View File

@ -887,4 +887,4 @@ The `pycumsum` function finds the cumulative sum for all data in the input colum
</details>
## Manage and Use UDF
You need to add UDF to TDengine before using it in SQL queries. For more information about how to manage UDF and how to invoke UDF, please see [Manage and Use UDF](../taos-sql/udf/).
You need to add UDF to TDengine before using it in SQL queries. For more information about how to manage UDF and how to invoke UDF, please see [Manage and Use UDF](../../taos-sql/udf/).

View File

@ -819,9 +819,6 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage);
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
@ -838,8 +835,10 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
void streamTaskSendCheckMsg(SStreamTask* pTask);
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo,
int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
@ -924,10 +923,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed);
int32_t setCode);
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
void* streamDestroyStateMachine(SStreamTaskSM* pSM);

View File

@ -37,7 +37,6 @@ typedef struct {
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcHandleInfo {
@ -63,7 +62,6 @@ typedef struct SRpcHandleInfo {
SRpcConnInfo conn;
int8_t forbiddenIp;
int8_t notFreeAhandle;
} SRpcHandleInfo;
typedef struct SRpcMsg {

View File

@ -1064,6 +1064,7 @@ _OVER:
return code;
}
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1081,8 +1082,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return code;
}
@ -1091,7 +1093,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1101,7 +1103,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
", transId:%d s-task:0x%x ignore it",
vgId, req.checkpointId, req.transId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1112,7 +1114,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
" transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1128,7 +1130,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
@ -1146,7 +1148,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
@ -1171,11 +1173,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
} else { // checkpoint already finished, and not in checkpoint status
if (req.checkpointId <= pTask->chkInfo.checkpointId) {
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId);
" transId:%d already handled, return success", pTask->id.idStr, req.checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
}
@ -1193,10 +1199,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
}
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
tmsgSendRsp(&rsp); // error occurs
return code;
}

View File

@ -406,50 +406,16 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
SStreamTaskCheckRsp rsp = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
streamTaskProcessCheckMsg(pMeta, &req, &rsp);
return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId);
}
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {

View File

@ -158,7 +158,6 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -1580,12 +1580,11 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
key.len = compressedSize;
value = dst;
}
stDebug("vlen: raw size: %d, compressed size: %d", vlen, compressedSize);
}
if (*dest == NULL) {
char* p = taosMemoryCalloc(
1, sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len);
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
char* p = taosMemoryCalloc(1, size);
char* buf = p;
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key.len);
@ -1601,8 +1600,8 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
len += taosEncodeFixedI8((void**)&buf, key.compress);
len += taosEncodeBinary((void**)&buf, (char*)value, key.len);
}
taosMemoryFree(dst);
taosMemoryFree(dst);
return len;
}

View File

@ -39,7 +39,7 @@ static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId
static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);
// check status
void streamTaskCheckDownstream(SStreamTask* pTask) {
void streamTaskSendCheckMsg(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
const char* idstr = pTask->id.idStr;
@ -97,6 +97,46 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
}
}
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
int32_t taskId = pReq->downstreamTaskId;
*pRsp = (SStreamTaskCheckRsp){
.reqId = pReq->reqId,
.streamId = pReq->streamId,
.childId = pReq->childId,
.downstreamNodeId = pReq->downstreamNodeId,
.downstreamTaskId = pReq->downstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
.upstreamTaskId = pReq->upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError(
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId);
pRsp->status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) {
pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
streamMetaReleaseTask(pMeta, pTask);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status);
} else {
pRsp->status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
}
}
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
@ -152,7 +192,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
} else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms
ASSERT(left > 0);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
@ -162,6 +202,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
@ -394,6 +460,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
.stage = pTask->pMeta->stage,
};
// update the reqId for the new check msg
p->reqId = tGenIdPI64();
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;

View File

@ -833,7 +833,7 @@ FAIL:
}
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed) {
int32_t setCode) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
@ -845,7 +845,7 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
.streamId = pReq->streamId,
.expireTime = pReq->expireTime,
.mnodeId = pReq->mnodeId,
.success = isSucceed,
.success = (setCode == TSDB_CODE_SUCCESS) ? 1 : 0,
};
tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
@ -866,22 +866,24 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
tEncoderClear(&encoder);
initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len);
pMsg->code = setCode;
pMsg->info = *pRpcInfo;
return 0;
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed) {
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
SStreamChkptReadyInfo info = {0};
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
}
taosArrayPush(pTask->pReadyMsgList, &info);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr,
(int32_t)taosArrayGetSize(pTask->pReadyMsgList));
int32_t size = taosArrayGetSize(pTask->pReadyMsgList);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size);
return TSDB_CODE_SUCCESS;
}

View File

@ -330,6 +330,36 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTranstate->type = STREAM_INPUT__TRANS_STATE;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.appendTranstateBlock = true;
return TSDB_CODE_SUCCESS;
}
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputq.queue->pQueue;

View File

@ -17,7 +17,7 @@
#include "ttimer.h"
static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamSchedByTimer(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
@ -26,7 +26,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
@ -112,7 +112,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
streamMetaReleaseTask(pTask->pMeta, pTask);
}
void streamSchedByTimer(void* param, void* tmrId) {
void streamTaskSchedHelper(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
const char* id = pTask->id.idStr;
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
@ -133,7 +133,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
if (pTrigger == NULL) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -144,7 +144,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -153,7 +153,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -161,5 +161,5 @@ void streamSchedByTimer(void* param, void* tmrId) {
}
}
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
}

View File

@ -32,6 +32,7 @@ typedef struct SBackendFileItem {
int64_t size;
int8_t ref;
} SBackendFileItem;
typedef struct SBackendFile {
char* pCurrent;
char* pMainfest;
@ -102,6 +103,7 @@ struct SStreamSnapWriter {
int64_t ever;
SStreamSnapHandle handle;
};
const char* ROCKSDB_OPTIONS = "OPTIONS";
const char* ROCKSDB_MAINFEST = "MANIFEST";
const char* ROCKSDB_SST = "sst";
@ -120,7 +122,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle);
} while (0)
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int ret = 0;
int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
@ -149,7 +151,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
if (pSnapFile->pSst) {
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
@ -157,7 +159,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
sprintf(buf + strlen(buf) - 1, "]");
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
pSnapFile->snapInfo.taskId, buf);
taosMemoryFree(buf);
}
}
@ -183,7 +185,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* sst = taosArrayGetP(pSnapFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
@ -270,12 +272,12 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pMainfest);
taosMemoryFree(pSnap->pOptions);
taosMemoryFree(pSnap->path);
for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst);
}
// unite read/write snap file
for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
if (pItem->ref == 0) {
taosMemoryFree(pItem->name);
@ -297,7 +299,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
SBackendSnapFile2 snapFile = {0};
@ -305,7 +307,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
}
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
@ -324,7 +326,7 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->pDbSnapSet) {
for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i);
snapFileDebugInfo(pSnapFile);
snapFileDestroy(pSnapFile);
@ -396,9 +398,9 @@ _NEXT:
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
@ -489,14 +491,10 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) {
}
int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) {
int code = -1;
int32_t code = -1;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo;
SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo;
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
if (pSnapFile->fd == 0) {
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
@ -540,6 +538,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
pSnapFile->offset += pHdr->size;
}
code = 0;
_EXIT:
return code;
}
@ -590,8 +589,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
return streamSnapWrite(pWriter, pData, nData);
}
}
return code;
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
if (pWriter == NULL) return 0;
streamSnapHandleDestroy(&pWriter->handle);

View File

@ -38,8 +38,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask);
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
int32_t streamTaskSetReady(SStreamTask* pTask) {
static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask);
@ -79,33 +83,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
}
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
@ -136,17 +113,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
return TSDB_CODE_SUCCESS;
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask)->state;
@ -267,32 +233,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
// common
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
stDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
@ -308,6 +248,55 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
} else {
return launchNotBuiltFillHistoryTask(pTask);
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
@ -316,37 +305,7 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
return 0;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTranstate->type = STREAM_INPUT__TRANS_STATE;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.appendTranstateBlock = true;
return TSDB_CODE_SUCCESS;
}
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
SDataRange* pRange = &pHTask->dataRange;
// the query version range should be limited to the already processed data
@ -365,7 +324,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
}
static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -379,7 +338,7 @@ static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p
pHTaskInfo->id.streamId = 0;
}
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -401,7 +360,7 @@ static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p
}
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
int64_t now = taosGetTimestampMs();
@ -449,7 +408,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
noRetryLaunchFillHistoryTask(pTask, pInfo, now);
notRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
@ -500,7 +459,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, in
return pInfo;
}
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
@ -547,54 +506,6 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
} else {
return launchNotBuiltFillHistoryTask(pTask);
}
}
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamInfoResetTimewindowFilter(exec);
@ -651,3 +562,41 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
}

View File

@ -84,7 +84,7 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
pTask->execInfo.checkTs);
streamTaskCheckDownstream(pTask);
streamTaskSendCheckMsg(pTask);
return 0;
}

View File

@ -542,7 +542,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
}
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
int64_t mark = (pFileState->deleteMark == INT64_MAX) ? INT64_MIN : pFileState->maxTs - pFileState->deleteMark;
int64_t mark = (pFileState->deleteMark == INT64_MAX || pFileState->maxTs == INT64_MIN)
? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
clearExpiredRowBuff(pFileState, mark, false);
return pFileState->usedBuffs;
}