refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-07 09:46:12 +08:00
parent 18c66a5491
commit bbd423f666
6 changed files with 179 additions and 176 deletions

View File

@ -158,7 +158,6 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -162,6 +162,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);

View File

@ -330,6 +330,36 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTranstate->type = STREAM_INPUT__TRANS_STATE;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.appendTranstateBlock = true;
return TSDB_CODE_SUCCESS;
}
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputq.queue->pQueue;

View File

@ -17,7 +17,7 @@
#include "ttimer.h"
static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamSchedByTimer(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
@ -26,7 +26,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
@ -112,7 +112,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
streamMetaReleaseTask(pTask->pMeta, pTask);
}
void streamSchedByTimer(void* param, void* tmrId) {
void streamTaskSchedHelper(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
const char* id = pTask->id.idStr;
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
@ -133,7 +133,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
if (pTrigger == NULL) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -144,7 +144,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -153,7 +153,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
return;
}
@ -161,5 +161,5 @@ void streamSchedByTimer(void* param, void* tmrId) {
}
}
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
taosTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer);
}

View File

@ -32,6 +32,7 @@ typedef struct SBackendFileItem {
int64_t size;
int8_t ref;
} SBackendFileItem;
typedef struct SBackendFile {
char* pCurrent;
char* pMainfest;
@ -102,6 +103,7 @@ struct SStreamSnapWriter {
int64_t ever;
SStreamSnapHandle handle;
};
const char* ROCKSDB_OPTIONS = "OPTIONS";
const char* ROCKSDB_MAINFEST = "MANIFEST";
const char* ROCKSDB_SST = "sst";
@ -120,7 +122,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle);
} while (0)
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int ret = 0;
int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
@ -149,7 +151,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
if (pSnapFile->pSst) {
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
@ -157,7 +159,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
sprintf(buf + strlen(buf) - 1, "]");
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
pSnapFile->snapInfo.taskId, buf);
taosMemoryFree(buf);
}
}
@ -183,7 +185,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* sst = taosArrayGetP(pSnapFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
@ -270,12 +272,12 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pMainfest);
taosMemoryFree(pSnap->pOptions);
taosMemoryFree(pSnap->path);
for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst);
}
// unite read/write snap file
for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
if (pItem->ref == 0) {
taosMemoryFree(pItem->name);
@ -297,7 +299,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
SBackendSnapFile2 snapFile = {0};
@ -305,7 +307,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile);
}
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
@ -324,7 +326,7 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->pDbSnapSet) {
for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i);
snapFileDebugInfo(pSnapFile);
snapFileDestroy(pSnapFile);
@ -396,9 +398,9 @@ _NEXT:
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
@ -489,14 +491,10 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) {
}
int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) {
int code = -1;
int32_t code = -1;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo;
SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo;
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
if (pSnapFile->fd == 0) {
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
@ -540,6 +538,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
pSnapFile->offset += pHdr->size;
}
code = 0;
_EXIT:
return code;
}
@ -590,8 +589,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
return streamSnapWrite(pWriter, pData, nData);
}
}
return code;
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
if (pWriter == NULL) return 0;
streamSnapHandleDestroy(&pWriter->handle);

View File

@ -38,8 +38,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId);
static void doExecScanhistoryInFuture(void* param, void* tmrId);
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask);
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
static void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
int32_t streamTaskSetReady(SStreamTask* pTask) {
static int32_t streamTaskSetReady(SStreamTask* pTask) {
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask);
@ -79,33 +83,6 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0;
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
}
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
@ -136,17 +113,6 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
return TSDB_CODE_SUCCESS;
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask)->state;
@ -267,32 +233,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
// common
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
stDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
@ -308,6 +248,55 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
} else {
return launchNotBuiltFillHistoryTask(pTask);
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId;
@ -316,37 +305,7 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8
return 0;
}
int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pTranstate == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTranstate->type = STREAM_INPUT__TRANS_STATE;
pBlock->info.type = STREAM_TRANS_STATE;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
taosArrayPush(pTranstate->blocks, pBlock);
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.appendTranstateBlock = true;
return TSDB_CODE_SUCCESS;
}
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
SDataRange* pRange = &pHTask->dataRange;
// the query version range should be limited to the already processed data
@ -365,7 +324,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
}
static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -379,7 +338,7 @@ static void noRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p
pHTaskInfo->id.streamId = 0;
}
static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
@ -401,7 +360,7 @@ static void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* p
}
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
int64_t now = taosGetTimestampMs();
@ -449,7 +408,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
noRetryLaunchFillHistoryTask(pTask, pInfo, now);
notRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
@ -500,7 +459,7 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, in
return pInfo;
}
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
@ -547,54 +506,6 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
const char* idStr = pTask->id.idStr;
int64_t hStreamId = pTask->hTaskInfo.id.streamId;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
ASSERT(hTaskId != 0);
// check stream task status in the first place.
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code
}
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId);
// Set the execute conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask);
}
streamMetaReleaseTask(pMeta, pHisTask);
}
return TSDB_CODE_SUCCESS;
} else {
return launchNotBuiltFillHistoryTask(pTask);
}
}
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamInfoResetTimewindowFilter(exec);
@ -651,3 +562,41 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
}
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code;
}