Merge pull request #24468 from taosdata/fix/3_liaohj
fix(stream): fix deadlock in pause.
This commit is contained in:
commit
dfe959eb28
|
@ -801,7 +801,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
|||
int64_t* oldStage);
|
||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
|
||||
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
||||
|
@ -826,8 +826,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
|||
|
||||
// common
|
||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||
int32_t streamResetParamForScanHistory(SStreamTask* pTask);
|
||||
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
void streamTaskResume(SStreamTask* pTask);
|
||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||
|
@ -838,6 +837,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask);
|
|||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
||||
|
||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||
|
@ -885,6 +885,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
|||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||
|
||||
// checkpoint
|
||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||
|
|
|
@ -117,6 +117,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName);
|
|||
|
||||
int32_t taosSetFileHandlesLimit();
|
||||
|
||||
int32_t taosLinkFile(char *src, char *dst);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -742,8 +742,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
|||
return code;
|
||||
}
|
||||
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
STaskId taskId = {0};
|
||||
if (pTask->info.fillHistory) {
|
||||
|
@ -1126,6 +1124,19 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
pRsp->info.handle = NULL;
|
||||
|
||||
SStreamCheckpointSourceReq req = {0};
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||
SRpcMsg rsp = {0};
|
||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return code;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
||||
SRpcMsg rsp = {0};
|
||||
|
@ -1142,19 +1153,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||
SRpcMsg rsp = {0};
|
||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||
tmsgSendRsp(&rsp); // error occurs
|
||||
return code;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
|
||||
|
|
|
@ -76,33 +76,6 @@ int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqUpdateNodeEpsetAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
if (numOfTasks == 0) {
|
||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||
if (pRunReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d update s-task:0x%x nodeEpset async", vgId, taskId);
|
||||
pRunReq->head.vgId = vgId;
|
||||
pRunReq->streamId = streamId;
|
||||
pRunReq->taskId = taskId;
|
||||
pRunReq->reqType = STREAM_EXEC_T_UPDATE_TASK_EPSET;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
|
@ -728,10 +701,6 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) {
|
|||
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
streamTaskResetStatus(*pTask);
|
||||
|
||||
// if ((*pTask)->info.fillHistory == 1) {
|
||||
// streamResetParamForScanHistory(*pTask);
|
||||
// }
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -844,8 +813,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
||||
char* p = NULL;
|
||||
if (streamTaskReadyToRun(pTask, &p)) {
|
||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
||||
pTask->chkInfo.nextProcessVer);
|
||||
tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
||||
p, pTask->chkInfo.nextProcessVer);
|
||||
streamExecTask(pTask);
|
||||
} else {
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||
|
@ -864,23 +833,31 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
|
||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
if (pStartInfo->taskStarting == 1) {
|
||||
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
|
||||
pMeta->startInfo.restartCount);
|
||||
} else { // not in starting procedure
|
||||
if (pStartInfo->restartCount > 0) {
|
||||
bool allReady = streamMetaAllTasksReady(pMeta);
|
||||
|
||||
if ((pStartInfo->restartCount > 0) && (!allReady)) {
|
||||
// if all tasks are ready now, do NOT restart again, and reset the value of pStartInfo->restartCount
|
||||
pStartInfo->restartCount -= 1;
|
||||
tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", vgId, pMeta->role,
|
||||
pStartInfo->restartCount);
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
tqDebug("vgId:%d start all tasks completed in callbackFn", pMeta->vgId);
|
||||
if (pStartInfo->restartCount == 0) {
|
||||
tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId);
|
||||
} else if (allReady) {
|
||||
pStartInfo->restartCount = 0;
|
||||
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -922,7 +899,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
|||
}
|
||||
|
||||
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
||||
streamTaskPause(pTask, pMeta);
|
||||
streamTaskPause(pMeta, pTask);
|
||||
|
||||
SStreamTask* pHistoryTask = NULL;
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
|
@ -939,7 +916,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
|||
|
||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||
|
||||
streamTaskPause(pHistoryTask, pMeta);
|
||||
streamTaskPause(pMeta, pHistoryTask);
|
||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||
}
|
||||
|
||||
|
|
|
@ -120,7 +120,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
|||
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
|
||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||
|
|
|
@ -300,28 +300,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
|||
|
||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
|
||||
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
if (num == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
pInfo->dataAllowed = true;
|
||||
}
|
||||
|
||||
pTask->upstreamInfo.numOfClosed = 0;
|
||||
stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr);
|
||||
}
|
||||
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
|
||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
|
||||
if (pInfo != NULL) {
|
||||
pInfo->dataAllowed = false;
|
||||
}
|
||||
}
|
||||
|
||||
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
|
||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
|
|
@ -41,6 +41,8 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst);
|
|||
int32_t getCfIdx(const char* cfName);
|
||||
STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath);
|
||||
|
||||
int32_t backendCopyFiles(char* src, char* dst);
|
||||
|
||||
void destroyCompactFilteFactory(void* arg);
|
||||
void destroyCompactFilte(void* arg);
|
||||
const char* compactFilteFactoryName(void* arg);
|
||||
|
@ -218,7 +220,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
|||
taosRemoveDir(state);
|
||||
}
|
||||
taosMkDir(state);
|
||||
code = copyFiles(chkp, state);
|
||||
code = backendCopyFiles(chkp, state);
|
||||
stInfo("copy snap file from %s to %s", chkp, state);
|
||||
if (code != 0) {
|
||||
stError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
|
@ -334,7 +336,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
code = copyFiles(chkpPath, defaultPath);
|
||||
code = backendCopyFiles(chkpPath, defaultPath);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -359,7 +361,7 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
|
|||
|
||||
if (code == 0) {
|
||||
taosMkDir(defaultPath);
|
||||
code = copyFiles(chkpPath, defaultPath);
|
||||
code = backendCopyFiles(chkpPath, defaultPath);
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -382,6 +384,121 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t copyFiles_create(char* src, char* dst, int8_t type) {
|
||||
// create and copy file
|
||||
int32_t err = taosCopyFile(src, dst);
|
||||
|
||||
if (errno == EXDEV || errno == ENOTSUP) {
|
||||
errno = 0;
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
|
||||
// same fs and hard link
|
||||
return taosLinkFile(src, dst);
|
||||
}
|
||||
|
||||
int32_t backendFileCopyFilesImpl(char* src, char* dst) {
|
||||
const char* current = "CURRENT";
|
||||
size_t currLen = strlen(current);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t sLen = strlen(src);
|
||||
int32_t dLen = strlen(dst);
|
||||
char* srcName = taosMemoryCalloc(1, sLen + 64);
|
||||
char* dstName = taosMemoryCalloc(1, dLen + 64);
|
||||
// copy file to dst
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(src);
|
||||
if (pDir == NULL) {
|
||||
taosMemoryFree(srcName);
|
||||
taosMemoryFree(dstName);
|
||||
errno = 0;
|
||||
return -1;
|
||||
}
|
||||
|
||||
TdDirEntryPtr de = NULL;
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
char* name = taosGetDirEntryName(de);
|
||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
|
||||
|
||||
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
|
||||
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
|
||||
|
||||
if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
|
||||
code = copyFiles_create(srcName, dstName, 0);
|
||||
if (code != 0) {
|
||||
stError("failed to copy file, detail: %s to %s reason: %s", srcName, dstName,
|
||||
tstrerror(TAOS_SYSTEM_ERROR(code)));
|
||||
goto _ERROR;
|
||||
}
|
||||
} else {
|
||||
code = copyFiles_hardlink(srcName, dstName, 0);
|
||||
if (code != 0) {
|
||||
stError("failed to hard line file, detail: %s to %s, reason: %s", srcName, dstName,
|
||||
tstrerror(TAOS_SYSTEM_ERROR(code)));
|
||||
goto _ERROR;
|
||||
}
|
||||
}
|
||||
memset(srcName, 0, sLen + 64);
|
||||
memset(dstName, 0, dLen + 64);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(srcName);
|
||||
taosMemoryFreeClear(dstName);
|
||||
taosCloseDir(&pDir);
|
||||
errno = 0;
|
||||
return 0;
|
||||
_ERROR:
|
||||
taosMemoryFreeClear(srcName);
|
||||
taosMemoryFreeClear(dstName);
|
||||
taosCloseDir(&pDir);
|
||||
errno = 0;
|
||||
return -1;
|
||||
}
|
||||
int32_t backendCopyFiles(char* src, char* dst) {
|
||||
return backendFileCopyFilesImpl(src, dst);
|
||||
// // opt later, just hard link
|
||||
// int32_t sLen = strlen(src);
|
||||
// int32_t dLen = strlen(dst);
|
||||
// char* srcName = taosMemoryCalloc(1, sLen + 64);
|
||||
// char* dstName = taosMemoryCalloc(1, dLen + 64);
|
||||
|
||||
// TdDirPtr pDir = taosOpenDir(src);
|
||||
// if (pDir == NULL) {
|
||||
// taosMemoryFree(srcName);
|
||||
// taosMemoryFree(dstName);
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
// TdDirEntryPtr de = NULL;
|
||||
// while ((de = taosReadDir(pDir)) != NULL) {
|
||||
// char* name = taosGetDirEntryName(de);
|
||||
// if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
|
||||
|
||||
// sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
|
||||
// sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
|
||||
// // if (!taosDirEntryIsDir(de)) {
|
||||
// // // code = taosCopyFile(srcName, dstName);
|
||||
// // if (code == -1) {
|
||||
// // goto _err;
|
||||
// // }
|
||||
// // }
|
||||
// return backendFileCopyFilesImpl(src, dst);
|
||||
|
||||
// memset(srcName, 0, sLen + 64);
|
||||
// memset(dstName, 0, dLen + 64);
|
||||
// }
|
||||
|
||||
// _err:
|
||||
// taosMemoryFreeClear(srcName);
|
||||
// taosMemoryFreeClear(dstName);
|
||||
// taosCloseDir(&pDir);
|
||||
// return code >= 0 ? 0 : -1;
|
||||
|
||||
// return 0;
|
||||
}
|
||||
int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||
int32_t code = -1;
|
||||
int32_t len = strlen(defaultPath) + 32;
|
||||
|
@ -396,7 +513,7 @@ int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* de
|
|||
taosRemoveDir(tmp);
|
||||
}
|
||||
taosMkDir(defaultPath);
|
||||
code = copyFiles(chkpPath, defaultPath);
|
||||
code = backendCopyFiles(chkpPath, defaultPath);
|
||||
if (code != 0) {
|
||||
stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
} else {
|
||||
|
|
|
@ -185,6 +185,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|||
int64_t checkpointId = pDataBlock->info.version;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64
|
||||
", current checkpointingId:%" PRId64,
|
||||
id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, checkpointId);
|
||||
|
||||
// set task status
|
||||
if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) {
|
||||
|
@ -330,7 +335,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
|
|||
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name);
|
||||
|
||||
// save the task if not sink task
|
||||
if (p->info.taskLevel < TASK_LEVEL__SINK) {
|
||||
if (p->info.taskLevel <= TASK_LEVEL__SINK) {
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
code = streamMetaSaveTask(pMeta, p);
|
||||
|
@ -455,7 +460,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
|
||||
// sink task do not need to save the status, and generated the checkpoint
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||
stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel);
|
||||
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
|
||||
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
|
||||
|
|
|
@ -713,7 +713,13 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
|
|||
*pStatus = pState->name;
|
||||
}
|
||||
|
||||
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
|
||||
// pause & halt will still run for sink tasks.
|
||||
if (streamTaskIsSinkTask(pTask)) {
|
||||
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
|
||||
st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
|
||||
} else {
|
||||
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
|
||||
}
|
||||
}
|
||||
|
||||
static void doStreamExecTaskHelper(void* param, void* tmrId) {
|
||||
|
|
|
@ -47,6 +47,12 @@ struct SMetaHbInfo {
|
|||
int64_t hbStart;
|
||||
};
|
||||
|
||||
typedef struct STaskInitTs {
|
||||
int64_t start;
|
||||
int64_t end;
|
||||
bool success;
|
||||
} STaskInitTs;
|
||||
|
||||
SMetaRefMgt gMetaRefMgt;
|
||||
|
||||
void metaRefMgtInit();
|
||||
|
@ -581,7 +587,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
|
|||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
|
||||
*pAdded = false;
|
||||
|
||||
STaskId id = streamTaskExtractKey(pTask);
|
||||
STaskId id = streamTaskGetTaskId(pTask);
|
||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (p != NULL) {
|
||||
return 0;
|
||||
|
@ -871,7 +877,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
int32_t taskId = pTask->id.taskId;
|
||||
tFreeStreamTask(pTask);
|
||||
|
||||
STaskId id = streamTaskExtractKey(pTask);
|
||||
STaskId id = streamTaskGetTaskId(pTask);
|
||||
taosArrayPush(pRecycleList, &id);
|
||||
|
||||
int32_t total = taosArrayGetSize(pRecycleList);
|
||||
|
@ -1302,28 +1308,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
|
|||
}
|
||||
|
||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||
stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||
taosThreadRwlockRdlock(&pMeta->lock);
|
||||
}
|
||||
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||
stTrace("vgId:%d meta-runlock", pMeta->vgId);
|
||||
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
|
||||
} else {
|
||||
stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
|
||||
// stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||
stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||
taosThreadRwlockWrlock(&pMeta->lock);
|
||||
stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
|
||||
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
|
||||
}
|
||||
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
||||
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
||||
taosThreadRwlockUnlock(&pMeta->lock);
|
||||
}
|
||||
|
||||
|
@ -1407,37 +1413,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||
streamMetaRLock(pMeta);
|
||||
|
||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
|
||||
if (num == 0) {
|
||||
streamMetaRUnLock(pMeta);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// send hb msg to mnode before closing all tasks.
|
||||
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
|
||||
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (pTask == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
streamTaskStop(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTaskList);
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
@ -1512,6 +1487,54 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||
streamMetaRLock(pMeta);
|
||||
|
||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
|
||||
if (num == 0) {
|
||||
streamMetaRUnLock(pMeta);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// send hb msg to mnode before closing all tasks.
|
||||
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
|
||||
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (pTask == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
streamTaskStop(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTaskList);
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId));
|
||||
if (ppTask == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((*ppTask)->status.downstreamReady == 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId);
|
||||
|
@ -1547,4 +1570,74 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
|||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
void* pIter = NULL;
|
||||
size_t keyLen = 0;
|
||||
|
||||
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
|
||||
succ ? "success" : "failed");
|
||||
|
||||
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
|
||||
STaskInitTs* pInfo = pIter;
|
||||
void* key = taosHashGetKey(pIter, &keyLen);
|
||||
|
||||
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
|
||||
if (pTask1 == NULL) {
|
||||
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
|
||||
} else {
|
||||
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
|
||||
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||
int64_t endTs, bool ready) {
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
if (pStartInfo->taskStarting != 1) {
|
||||
int64_t el = endTs - startTs;
|
||||
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||
pMeta->vgId, taskId, ready, el);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
|
||||
|
||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||
|
||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
|
||||
|
||||
if (numOfRecv == numOfTotal) {
|
||||
pStartInfo->readyTs = taosGetTimestampMs();
|
||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
|
||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pStartInfo->elapsedTime / 1000.0);
|
||||
|
||||
// print the initialization elapsed time and info
|
||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||
streamMetaResetStartInfo(pStartInfo);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
pStartInfo->completeFn(pMeta);
|
||||
} else {
|
||||
streamMetaWUnLock(pMeta);
|
||||
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
|
||||
ready, numOfRecv, numOfTotal);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
|
@ -154,7 +154,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
*numOfBlocks = 0;
|
||||
*blockSize = 0;
|
||||
|
||||
// todo remove it
|
||||
// no available token in bucket for sink task, let's wait for a little bit
|
||||
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
|
||||
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
|
||||
|
|
|
@ -35,12 +35,6 @@ typedef struct STaskRecheckInfo {
|
|||
void* checkTimer;
|
||||
} STaskRecheckInfo;
|
||||
|
||||
typedef struct STaskInitTs {
|
||||
int64_t start;
|
||||
int64_t end;
|
||||
bool success;
|
||||
} STaskInitTs;
|
||||
|
||||
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
||||
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
||||
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
||||
|
@ -546,15 +540,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
|
|||
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
||||
}
|
||||
|
||||
int32_t streamResetParamForScanHistory(SStreamTask* pTask) {
|
||||
stDebug("s-task:%s reset operator option for scan-history data", pTask->id.idStr);
|
||||
if (pTask->exec.pExecutor != NULL) {
|
||||
return qResetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamRestoreParam(SStreamTask* pTask) {
|
||||
stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
|
||||
return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
|
||||
|
@ -1134,97 +1119,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
|
||||
|
||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||
|
||||
// in case of fill-history task, stop the tsdb file scan operation.
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
||||
}
|
||||
|
||||
void streamTaskResume(SStreamTask* pTask) {
|
||||
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
|
||||
streamTaskRestoreStatus(pTask);
|
||||
|
||||
char* pNew = streamTaskGetStatus(pTask)->name;
|
||||
if (prevState.state == TASK_STATUS__PAUSE) {
|
||||
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
|
||||
} else {
|
||||
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
|
||||
}
|
||||
}
|
||||
|
||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
void* pIter = NULL;
|
||||
size_t keyLen = 0;
|
||||
|
||||
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
|
||||
succ ? "success" : "failed");
|
||||
|
||||
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
|
||||
STaskInitTs* pInfo = pIter;
|
||||
void* key = taosHashGetKey(pIter, &keyLen);
|
||||
|
||||
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
|
||||
if (pTask1 == NULL) {
|
||||
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
|
||||
} else {
|
||||
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
|
||||
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||
int64_t endTs, bool ready) {
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
|
||||
|
||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||
|
||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
|
||||
|
||||
if (numOfRecv == numOfTotal) {
|
||||
pStartInfo->readyTs = taosGetTimestampMs();
|
||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
|
||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pStartInfo->elapsedTime / 1000.0);
|
||||
|
||||
// print the initialization elapsed time and info
|
||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||
streamMetaResetStartInfo(pStartInfo);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
pStartInfo->completeFn(pMeta);
|
||||
} else {
|
||||
streamMetaWUnLock(pMeta);
|
||||
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
|
||||
ready, numOfRecv, numOfTotal);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -435,7 +435,6 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
|
|||
|
||||
int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used) {
|
||||
// todo refactor
|
||||
stDebug("streamStateReleaseBuf");
|
||||
if (!pVal) {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -30,6 +30,55 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
|
||||
char buf[512] = {0};
|
||||
|
||||
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
||||
epsetAssign(&pTask->info.epSet, pEpSet);
|
||||
EPSET_TO_STR(pEpSet, buf)
|
||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
||||
}
|
||||
|
||||
// check for the dispath info and the upstream task info
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||
} else if (level == TASK_LEVEL__AGG) {
|
||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||
} else { // TASK_LEVEL__SINK
|
||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void freeItem(void* p) {
|
||||
SStreamContinueExecInfo* pInfo = p;
|
||||
rpcFreeCont(pInfo->msg.pCont);
|
||||
}
|
||||
|
||||
static void freeUpstreamItem(void* p) {
|
||||
SStreamChildEpInfo** pInfo = p;
|
||||
taosMemoryFree(*pInfo);
|
||||
}
|
||||
|
||||
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
|
||||
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
|
||||
if (pEpInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pEpInfo->childId = pTask->info.selfChildId;
|
||||
pEpInfo->epSet = pTask->info.epSet;
|
||||
pEpInfo->nodeId = pTask->info.nodeId;
|
||||
pEpInfo->taskId = pTask->id.taskId;
|
||||
pEpInfo->stage = -1;
|
||||
|
||||
return pEpInfo;
|
||||
}
|
||||
|
||||
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam,
|
||||
SArray* pTaskList, bool hasFillhistory) {
|
||||
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
|
@ -291,16 +340,6 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void freeItem(void* p) {
|
||||
SStreamContinueExecInfo* pInfo = p;
|
||||
rpcFreeCont(pInfo->msg.pCont);
|
||||
}
|
||||
|
||||
static void freeUpstreamItem(void* p) {
|
||||
SStreamChildEpInfo** pInfo = p;
|
||||
taosMemoryFree(*pInfo);
|
||||
}
|
||||
|
||||
void tFreeStreamTask(SStreamTask* pTask) {
|
||||
char* p = NULL;
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
|
@ -475,14 +514,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
}
|
||||
|
||||
taosThreadMutexInit(&pTask->lock, &attr);
|
||||
// if (pTask->info.fillHistory == 1) {
|
||||
// //
|
||||
// } else {
|
||||
|
||||
// }
|
||||
// if (streamTaskSetDb(pMeta, pTask) != 0) {
|
||||
// return -1;
|
||||
// }
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
|
||||
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
|
||||
|
@ -509,22 +540,6 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
|
||||
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
|
||||
if (pEpInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pEpInfo->childId = pTask->info.selfChildId;
|
||||
pEpInfo->epSet = pTask->info.epSet;
|
||||
pEpInfo->nodeId = pTask->info.nodeId;
|
||||
pEpInfo->taskId = pTask->id.taskId;
|
||||
pEpInfo->stage = -1;
|
||||
|
||||
return pEpInfo;
|
||||
}
|
||||
|
||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
|
||||
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
|
||||
if (pEpInfo == NULL) {
|
||||
|
@ -622,29 +637,6 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
|
||||
char buf[512] = {0};
|
||||
|
||||
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
||||
epsetAssign(&pTask->info.epSet, pEpSet);
|
||||
EPSET_TO_STR(pEpSet, buf)
|
||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
||||
}
|
||||
|
||||
// check for the dispath info and the upstream task info
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||
} else if (level == TASK_LEVEL__AGG) {
|
||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||
} else { // TASK_LEVEL__SINK
|
||||
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
||||
STaskExecStatisInfo* p = &pTask->execInfo;
|
||||
|
||||
|
@ -677,7 +669,29 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
|
||||
}
|
||||
|
||||
bool streamTaskAllUpstreamClosed(SStreamTask* pTask) {
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
if (num == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
pInfo->dataAllowed = true;
|
||||
}
|
||||
|
||||
pTask->upstreamInfo.numOfClosed = 0;
|
||||
stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
|
||||
}
|
||||
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
|
||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
|
||||
if (pInfo != NULL) {
|
||||
pInfo->dataAllowed = false;
|
||||
}
|
||||
}
|
||||
|
||||
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
|
||||
return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
}
|
||||
|
||||
|
@ -760,7 +774,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
|
|||
return code;
|
||||
}
|
||||
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask) {
|
||||
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
return id;
|
||||
}
|
||||
|
@ -801,3 +815,40 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
|
|||
pDst->chkpointTransId = pSrc->chkpointTransId;
|
||||
}
|
||||
|
||||
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
|
||||
|
||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
||||
|
||||
// in case of fill-history task, stop the tsdb file scan operation.
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
|
||||
}
|
||||
|
||||
void streamTaskResume(SStreamTask* pTask) {
|
||||
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
|
||||
streamTaskRestoreStatus(pTask);
|
||||
|
||||
char* pNew = streamTaskGetStatus(pTask)->name;
|
||||
if (prevState.state == TASK_STATUS__PAUSE) {
|
||||
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
|
||||
} else {
|
||||
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
|
||||
}
|
||||
}
|
||||
|
||||
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
|
||||
return pTask->info.taskLevel == TASK_LEVEL__SINK;
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@
|
|||
#include "zlib.h"
|
||||
|
||||
#ifdef WINDOWS
|
||||
#include <io.h>
|
||||
#include <WinBase.h>
|
||||
#include <io.h>
|
||||
#include <ktmw32.h>
|
||||
#include <windows.h>
|
||||
#define F_OK 0
|
||||
|
@ -50,7 +50,7 @@ typedef struct TdFile {
|
|||
TdThreadRwlock rwlock;
|
||||
int refId;
|
||||
HANDLE hFile;
|
||||
FILE* fp;
|
||||
FILE *fp;
|
||||
int32_t tdFileOptions;
|
||||
} TdFile;
|
||||
#else
|
||||
|
@ -230,7 +230,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime, int32_t *a
|
|||
int32_t code = _stati64(path, &fileStat);
|
||||
#else
|
||||
struct stat fileStat;
|
||||
int32_t code = stat(path, &fileStat);
|
||||
int32_t code = stat(path, &fileStat);
|
||||
#endif
|
||||
if (code < 0) {
|
||||
return code;
|
||||
|
@ -274,7 +274,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
|
|||
return -1;
|
||||
}
|
||||
struct stat fileStat;
|
||||
int32_t code = fstat(pFile->fd, &fileStat);
|
||||
int32_t code = fstat(pFile->fd, &fileStat);
|
||||
if (code < 0) {
|
||||
printf("taosFStatFile run fstat fail.");
|
||||
return code;
|
||||
|
@ -374,7 +374,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
|
|||
DWORD bytesRead;
|
||||
if (!ReadFile(pFile->hFile, buf, count, &bytesRead, NULL)) {
|
||||
bytesRead = -1;
|
||||
}
|
||||
}
|
||||
#if FILE_WITH_LOCK
|
||||
taosThreadRwlockUnlock(&(pFile->rwlock));
|
||||
#endif
|
||||
|
@ -389,7 +389,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
|
|||
taosThreadRwlockWrlock(&(pFile->rwlock));
|
||||
#endif
|
||||
|
||||
DWORD bytesWritten;
|
||||
DWORD bytesWritten;
|
||||
if (!WriteFile(pFile->hFile, buf, count, &bytesWritten, NULL)) {
|
||||
bytesWritten = -1;
|
||||
}
|
||||
|
@ -666,7 +666,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
|
|||
}
|
||||
int64_t leftbytes = count;
|
||||
int64_t readbytes;
|
||||
char * tbuf = (char *)buf;
|
||||
char *tbuf = (char *)buf;
|
||||
|
||||
while (leftbytes > 0) {
|
||||
#ifdef WINDOWS
|
||||
|
@ -716,7 +716,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
|
|||
|
||||
int64_t nleft = count;
|
||||
int64_t nwritten = 0;
|
||||
char * tbuf = (char *)buf;
|
||||
char *tbuf = (char *)buf;
|
||||
|
||||
while (nleft > 0) {
|
||||
nwritten = write(pFile->fd, (void *)tbuf, (uint32_t)nleft);
|
||||
|
@ -1028,7 +1028,7 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
|
|||
#endif
|
||||
}
|
||||
|
||||
#endif // WINDOWS
|
||||
#endif // WINDOWS
|
||||
|
||||
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
||||
FILE *fp = NULL;
|
||||
|
@ -1056,7 +1056,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
if (hFile != NULL) CloseHandle(hFile);
|
||||
#else
|
||||
if (fd >= 0) close(fd);
|
||||
#endif
|
||||
#endif
|
||||
if (fp != NULL) fclose(fp);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1067,7 +1067,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
pFile->fp = fp;
|
||||
pFile->refId = 0;
|
||||
|
||||
#ifdef WINDOWS
|
||||
#ifdef WINDOWS
|
||||
pFile->hFile = hFile;
|
||||
pFile->tdFileOptions = tdFileOptions;
|
||||
// do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle
|
||||
|
@ -1137,7 +1137,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
|
|||
#endif
|
||||
return -1;
|
||||
}
|
||||
DWORD ret = 0;
|
||||
DWORD ret = 0;
|
||||
OVERLAPPED ol = {0};
|
||||
ol.OffsetHigh = (uint32_t)((offset & 0xFFFFFFFF00000000LL) >> 0x20);
|
||||
ol.Offset = (uint32_t)(offset & 0xFFFFFFFFLL);
|
||||
|
@ -1179,7 +1179,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
|
|||
if (pFile->hFile != NULL) {
|
||||
if (pFile->tdFileOptions & TD_FILE_WRITE_THROUGH) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return !FlushFileBuffers(pFile->hFile);
|
||||
#else
|
||||
if (pFile->fd >= 0) {
|
||||
|
@ -1204,7 +1204,7 @@ bool taosValidFile(TdFilePtr pFile) {
|
|||
return pFile != NULL && pFile->hFile != NULL;
|
||||
#else
|
||||
return pFile != NULL && pFile->fd > 0;
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t taosUmaskFile(int32_t maskVal) {
|
||||
|
@ -1249,7 +1249,7 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
|
|||
}
|
||||
|
||||
bufferSize += 512;
|
||||
void* newBuf = taosMemoryRealloc(*ptrBuf, bufferSize);
|
||||
void *newBuf = taosMemoryRealloc(*ptrBuf, bufferSize);
|
||||
if (newBuf == NULL) {
|
||||
taosMemoryFreeClear(*ptrBuf);
|
||||
return -1;
|
||||
|
@ -1363,7 +1363,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
|||
|
||||
cmp_end:
|
||||
if (pFile) {
|
||||
taosCloseFile(&pFile);
|
||||
taosCloseFile(&pFile);
|
||||
}
|
||||
if (pSrcFile) {
|
||||
taosCloseFile(&pSrcFile);
|
||||
|
@ -1386,3 +1386,15 @@ int32_t taosSetFileHandlesLimit() {
|
|||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t taosLinkFile(char *src, char *dst) {
|
||||
#ifndef WINDOWS
|
||||
if (link(src, dst) != 0) {
|
||||
if (errno == EXDEV || errno == ENOTSUP) {
|
||||
return -1;
|
||||
}
|
||||
return errno;
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue