Merge pull request #27843 from taosdata/fix/syntax
refactor: remove void.
This commit is contained in:
commit
ffd841e8f9
|
@ -1238,6 +1238,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
pMeta->vgId, req.taskId);
|
||||
// ignore this code to avoid error code over write
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
if (ret) {
|
||||
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -234,8 +234,7 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
|
|||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
|
||||
int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||
} else {
|
||||
|
|
|
@ -322,11 +322,8 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
|
|||
pInfo->pList = NULL;
|
||||
|
||||
if (pInfo->checkRspTmr != NULL) {
|
||||
bool succ = taosTmrStop(pInfo->checkRspTmr);
|
||||
streamTmrStop(pInfo->checkRspTmr);
|
||||
pInfo->checkRspTmr = NULL;
|
||||
if (!succ) {
|
||||
stError("failed to stop checkrsp tmr"); // todo: add id
|
||||
}
|
||||
}
|
||||
|
||||
streamMutexDestroy(&pInfo->checkInfoLock);
|
||||
|
|
|
@ -110,17 +110,22 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
|||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
(void)tEncodeStreamRetrieveReq(&encoder, req);
|
||||
code = tEncodeStreamRetrieveReq(&encoder, req);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (code < 0) {
|
||||
stError("s-task:%s failed encode stream retrieve req, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead));
|
||||
|
||||
|
@ -639,8 +644,11 @@ void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
|
|||
|
||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
|
||||
int64_t now) {
|
||||
bool found = false;
|
||||
uint32_t hashValue = 0;
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
int32_t numOfVgroups = 0;
|
||||
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
if (pTask->pNameMap == NULL) {
|
||||
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
}
|
||||
|
@ -665,8 +673,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
}
|
||||
}
|
||||
} else {
|
||||
(void)buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId,
|
||||
pDataBlock->info.parTbName);
|
||||
int32_t code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId,
|
||||
pDataBlock->info.parTbName);
|
||||
stError("s-task:%s failed to build child table name, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
||||
|
@ -685,8 +694,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
}
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||
numOfVgroups = taosArrayGetSize(vgInfo);
|
||||
|
||||
// TODO: optimize search
|
||||
streamMutexLock(&pTask->msgInfo.lock);
|
||||
|
@ -730,6 +738,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
int32_t code = 0;
|
||||
SStreamDataBlock* pBlock = NULL;
|
||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
int32_t old = 0;
|
||||
|
||||
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
|
||||
if (numOfElems > 0) {
|
||||
|
@ -740,8 +749,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// to make sure only one dispatch is running
|
||||
int8_t old =
|
||||
atomic_val_compare_exchange_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||
old = atomic_val_compare_exchange_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||
stDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", id, old);
|
||||
return 0;
|
||||
|
@ -1247,14 +1255,20 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
|
|||
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
||||
|
||||
tEncoderInit(&encoder, (uint8_t*)abuf, len);
|
||||
(void)tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
|
||||
code = tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (code < 0) {
|
||||
rpcFreeCont(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = TMIN(code, 0);
|
||||
initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len);
|
||||
|
||||
pMsg->code = setCode;
|
||||
pMsg->info = *pRpcInfo;
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
|
||||
|
|
|
@ -58,7 +58,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
|
|||
|
||||
// not handle error, if dispatch failed, try next time.
|
||||
// checkpoint trigger will be checked
|
||||
(void)streamDispatchStreamBlock(pTask);
|
||||
code = streamDispatchStreamBlock(pTask);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -110,7 +110,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
|
|||
pRes = taosArrayInit(4, sizeof(SSDataBlock));
|
||||
}
|
||||
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return;
|
||||
}
|
||||
|
@ -137,7 +137,12 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
|
|||
continue;
|
||||
}
|
||||
|
||||
(void)assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
|
||||
code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
|
||||
if (code) {
|
||||
stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
|
||||
block.info.type = STREAM_PULL_OVER;
|
||||
block.info.childId = pTask->info.selfChildId;
|
||||
|
||||
|
@ -258,9 +263,12 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
|
|||
}
|
||||
|
||||
SSDataBlock block = {0};
|
||||
(void)assignOneDataBlock(&block, output);
|
||||
block.info.childId = pTask->info.selfChildId;
|
||||
code = assignOneDataBlock(&block, output);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr);
|
||||
}
|
||||
|
||||
block.info.childId = pTask->info.selfChildId;
|
||||
void* p = taosArrayPush(pRes, &block);
|
||||
if (p == NULL) {
|
||||
stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr);
|
||||
|
@ -284,17 +292,17 @@ static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32
|
|||
}
|
||||
|
||||
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
bool finished = false;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
||||
}
|
||||
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
bool finished = false;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if (!pTask->hTaskInfo.operatorOpen) {
|
||||
(void)qSetStreamOpOpen(exec);
|
||||
int32_t code = qSetStreamOpOpen(exec);
|
||||
pTask->hTaskInfo.operatorOpen = true;
|
||||
}
|
||||
|
||||
|
@ -332,7 +340,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
}
|
||||
|
||||
// dispatch the generated results, todo fix error
|
||||
(void)handleScanhistoryResultBlocks(pTask, pRes, size);
|
||||
int32_t code = handleScanhistoryResultBlocks(pTask, pRes, size);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to handle scan result block, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
if (finished) {
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
|
||||
|
@ -355,13 +366,13 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
|
|||
int32_t code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
|
||||
if (pStreamTask == NULL || code != TSDB_CODE_SUCCESS) {
|
||||
stError(
|
||||
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
|
||||
"s-task:%s failed to find related stream task:0x%x, may have been destroyed or closed, destroy related "
|
||||
"fill-history task",
|
||||
id, (int32_t)pTask->streamTaskId.taskId);
|
||||
|
||||
// 1. free it and remove fill-history task from disk meta-store
|
||||
// todo: this function should never be failed.
|
||||
(void)streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||
code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||
|
||||
// 2. save to disk
|
||||
streamMetaWLock(pMeta);
|
||||
|
@ -425,14 +436,14 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
|
|||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||
pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
|
||||
|
||||
(void)streamTaskResetTimewindowFilter(pStreamTask);
|
||||
code = streamTaskResetTimewindowFilter(pStreamTask);
|
||||
} else {
|
||||
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
// NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
|
||||
// 2. send msg to mnode to launch a checkpoint to keep the state for current stream
|
||||
(void)streamTaskSendCheckpointReq(pStreamTask);
|
||||
code = streamTaskSendCheckpointReq(pStreamTask);
|
||||
|
||||
// 3. assign the status to the value that will be kept in disk
|
||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state;
|
||||
|
@ -441,13 +452,12 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
|
|||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t haltCallback(SStreamTask* pTask, void* param) {
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
(void)streamTaskSendCheckpointReq(pTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return streamTaskSendCheckpointReq(pTask);
|
||||
}
|
||||
|
||||
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
|
||||
|
@ -549,10 +559,11 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
|
|||
const char* id = pTask->id.idStr;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
// dispatch the tran-state block to downstream task immediately
|
||||
int32_t type = pTask->outputInfo.type;
|
||||
|
||||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
|
||||
int32_t remain = streamAlignTransferState(pTask);
|
||||
|
||||
if (remain > 0) {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
|
||||
|
@ -560,9 +571,6 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
|
|||
}
|
||||
}
|
||||
|
||||
// dispatch the tran-state block to downstream task immediately
|
||||
int32_t type = pTask->outputInfo.type;
|
||||
|
||||
// transfer the ownership of executor state
|
||||
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
|
@ -576,7 +584,10 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
|
|||
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||
code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
|
||||
if (code == 0) {
|
||||
(void)streamDispatchStreamBlock(pTask);
|
||||
code = streamDispatchStreamBlock(pTask);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to dispatch stream block, code:%s", id, tstrerror(code));
|
||||
}
|
||||
} else { // todo put into queue failed, retry
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
}
|
||||
|
@ -589,7 +600,8 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
|
|||
|
||||
code = streamTransferStatePrepare(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
(void)streamTaskSetSchedStatusInactive(pTask);
|
||||
stError("s-task:%s failed to prepare transfer state, code:%s", id, tstrerror(code));
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask); // let's ignore this return status
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -660,8 +672,18 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
|
|||
SStreamTask* pHTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
|
||||
if (code == TSDB_CODE_SUCCESS) { // ignore the error code.
|
||||
(void)streamTaskReleaseState(pHTask);
|
||||
(void)streamTaskReloadState(pTask);
|
||||
code = streamTaskReleaseState(pHTask);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to release query state, code:%s", pHTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = streamTaskReloadState(pTask);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to reload query state, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
|
||||
streamTaskGetStatus(pHTask).name);
|
||||
// todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query.
|
||||
|
@ -738,7 +760,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
// dispatch checkpoint msg to all downstream tasks
|
||||
int32_t type = pInput->type;
|
||||
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||
(void)streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
|
||||
int32_t code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
|
||||
if (code != 0) {
|
||||
stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -784,7 +809,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state == TASK_STATUS__CK) {
|
||||
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
|
||||
(void)streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||
int32_t code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
|
||||
} else { // todo refactor
|
||||
int32_t code = 0;
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
|
@ -834,15 +859,19 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
|
|||
}
|
||||
|
||||
void streamResumeTask(SStreamTask* pTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
|
||||
stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
|
||||
stError("s-task:%s invalid sched status:%d, not resume task", id, pTask->status.schedStatus);
|
||||
return;
|
||||
}
|
||||
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
while (1) {
|
||||
(void)doStreamExecTask(pTask);
|
||||
code = doStreamExecTask(pTask);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code));
|
||||
}
|
||||
|
||||
// check if continue
|
||||
streamMutexLock(&pTask->lock);
|
||||
|
|
|
@ -331,7 +331,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
|
|||
tCleanupStreamHbMsg(&pInfo->hbMsg);
|
||||
|
||||
if (pInfo->hbTmr != NULL) {
|
||||
(void) taosTmrStop(pInfo->hbTmr);
|
||||
streamTmrStop(pInfo->hbTmr);
|
||||
pInfo->hbTmr = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -62,7 +62,12 @@ static void streamMetaEnvInit() {
|
|||
}
|
||||
}
|
||||
|
||||
void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
|
||||
void streamMetaInit() {
|
||||
int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
|
||||
if (code) {
|
||||
stError("failed to init stream Meta model, code:%s", tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaCleanup() {
|
||||
taosCloseRef(streamBackendId);
|
||||
|
@ -114,13 +119,17 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
|||
|
||||
p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
|
||||
if (p == NULL) {
|
||||
SArray* list = taosArrayInit(8, sizeof(void*));
|
||||
p = taosArrayPush(list, &rid);
|
||||
SArray* pList = taosArrayInit(8, POINTER_BYTES);
|
||||
if (pList == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
p = taosArrayPush(pList, &rid);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
|
||||
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &pList, sizeof(void*));
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid);
|
||||
return code;
|
||||
|
@ -180,8 +189,13 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
|||
|
||||
code = tdbTbcMoveToFirst(pCur);
|
||||
if (code) {
|
||||
(void)tdbTbcClose(pCur);
|
||||
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check", pMeta->vgId);
|
||||
stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
|
||||
tstrerror(code));
|
||||
code = tdbTbcClose(pCur);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -209,7 +223,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
|||
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
(void)tdbTbcClose(pCur);
|
||||
code = tdbTbcClose(pCur);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -351,8 +368,8 @@ void streamMetaRemoveDB(void* arg, char* key) {
|
|||
|
||||
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
|
||||
int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
|
||||
*p = NULL;
|
||||
int32_t code = 0;
|
||||
QRY_PARAM_CHECK(p);
|
||||
|
||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||
if (pMeta == NULL) {
|
||||
|
@ -484,9 +501,26 @@ _err:
|
|||
taosMemoryFree(pMeta->path);
|
||||
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
|
||||
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
|
||||
if (pMeta->pTaskDb) (void)tdbTbClose(pMeta->pTaskDb);
|
||||
if (pMeta->pCheckpointDb) (void)tdbTbClose(pMeta->pCheckpointDb);
|
||||
if (pMeta->db) (void)tdbClose(pMeta->db);
|
||||
if (pMeta->pTaskDb) {
|
||||
int32_t ret = tdbTbClose(pMeta->pTaskDb);
|
||||
if (ret) {
|
||||
stError("vgId:%d tdb failed close task db, code:%s", pMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
pMeta->pTaskDb = NULL;
|
||||
}
|
||||
if (pMeta->pCheckpointDb) {
|
||||
int32_t ret = tdbTbClose(pMeta->pCheckpointDb);
|
||||
if (ret) {
|
||||
stError("vgId:%d tdb failed close task checkpointDb, code:%s", pMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
}
|
||||
if (pMeta->db) {
|
||||
int32_t ret = tdbClose(pMeta->db);
|
||||
if (ret) {
|
||||
stError("vgId:%d tdb failed close meta db, code:%s", pMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
|
||||
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||
|
@ -532,7 +566,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
// release the ref by timer
|
||||
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
|
||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
|
||||
(void)taosTmrStop(p->schedInfo.pDelayTimer);
|
||||
streamTmrStop(p->schedInfo.pDelayTimer);
|
||||
p->info.delaySchedParam = 0;
|
||||
streamMetaReleaseTask(pMeta, p);
|
||||
}
|
||||
|
@ -567,7 +601,10 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
if (pMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
(void)taosRemoveRef(streamMetaId, pMeta->rid);
|
||||
int32_t code = taosRemoveRef(streamMetaId, pMeta->rid);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to remove ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaCloseImpl(void* arg) {
|
||||
|
@ -576,6 +613,7 @@ void streamMetaCloseImpl(void* arg) {
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
stDebug("vgId:%d start to do-close stream meta", vgId);
|
||||
|
||||
|
@ -584,10 +622,22 @@ void streamMetaCloseImpl(void* arg) {
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// already log the error, ignore here
|
||||
(void)tdbAbort(pMeta->db, pMeta->txn);
|
||||
(void)tdbTbClose(pMeta->pTaskDb);
|
||||
(void)tdbTbClose(pMeta->pCheckpointDb);
|
||||
(void)tdbClose(pMeta->db);
|
||||
code = tdbAbort(pMeta->db, pMeta->txn);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to jump of trans for tdb, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
code = tdbTbClose(pMeta->pTaskDb);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
code = tdbTbClose(pMeta->pCheckpointDb);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to close checkpointDb, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
code = tdbClose(pMeta->db);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to close db, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
taosArrayDestroy(pMeta->pTaskList);
|
||||
taosArrayDestroy(pMeta->chkpSaved);
|
||||
|
@ -611,7 +661,10 @@ void streamMetaCloseImpl(void* arg) {
|
|||
bkdMgtDestroy(pMeta->bkdChkptMgt);
|
||||
|
||||
pMeta->role = NODE_ROLE_UNINIT;
|
||||
(void)taosThreadRwlockDestroy(&pMeta->lock);
|
||||
code = taosThreadRwlockDestroy(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
|
||||
taosMemoryFree(pMeta);
|
||||
stDebug("vgId:%d end to close stream meta", vgId);
|
||||
|
@ -711,7 +764,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
}
|
||||
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
(void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
||||
*pAdded = true;
|
||||
|
@ -786,20 +839,26 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
|
|||
}
|
||||
|
||||
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||
int32_t code = 0;
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
(void)streamTaskSendCheckpointSourceRsp(pTask);
|
||||
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||
if (code) {
|
||||
stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
|
||||
tstrerror(code));
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
|
||||
// pre-delete operation
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask) {
|
||||
pTask = *ppTask;
|
||||
|
@ -811,12 +870,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
}
|
||||
|
||||
// handle the dropping event
|
||||
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||
code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);
|
||||
|
@ -850,12 +913,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
pTask = *ppTask;
|
||||
// it is an fill-history task, remove the related stream task's id that points to it
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
(void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
||||
int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||
(void)streamMetaRemoveTask(pMeta, &id);
|
||||
code = streamMetaRemoveTask(pMeta, &id);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
|
||||
}
|
||||
|
||||
int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap);
|
||||
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
|
||||
|
@ -871,7 +937,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
|
||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
|
||||
(void)taosTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
streamTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
pTask->info.delaySchedParam = 0;
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
@ -936,8 +1002,11 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|||
|
||||
code = tdbTbcMoveToFirst(pCur);
|
||||
if (code) {
|
||||
(void)tdbTbcClose(pCur);
|
||||
stError("failed to open stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
|
||||
stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
|
||||
int32_t ret = tdbTbcClose(pCur);
|
||||
if (ret != 0) {
|
||||
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
return checkpointId;
|
||||
}
|
||||
|
||||
|
@ -960,7 +1029,11 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
|
||||
(void)tdbTbcClose(pCur);
|
||||
int32_t ret = tdbTbcClose(pCur);
|
||||
if (ret != 0) {
|
||||
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
|
||||
return checkpointId;
|
||||
}
|
||||
|
||||
|
@ -981,6 +1054,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||
if (pRecycleList == NULL) {
|
||||
stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
|
||||
return;
|
||||
}
|
||||
|
||||
vgId = pMeta->vgId;
|
||||
stInfo("vgId:%d load stream tasks from meta files", vgId);
|
||||
|
@ -996,7 +1073,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (code) {
|
||||
stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
||||
taosArrayDestroy(pRecycleList);
|
||||
(void)tdbTbcClose(pCur);
|
||||
int32_t ret = tdbTbcClose(pCur);
|
||||
if (ret != 0) {
|
||||
stError("vgId:%d failed to close meta file cursor, code:%s", pMeta->vgId, tstrerror(ret));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1072,11 +1152,11 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
(void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
||||
if (streamTaskShouldPause(pTask)) {
|
||||
(void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1090,7 +1170,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (taosArrayGetSize(pRecycleList) > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
|
||||
STaskId* pId = taosArrayGet(pRecycleList, i);
|
||||
(void)streamMetaRemoveTask(pMeta, pId);
|
||||
code = streamMetaRemoveTask(pMeta, pId);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1099,8 +1182,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
||||
|
||||
taosArrayDestroy(pRecycleList);
|
||||
|
||||
(void)streamMetaCommit(pMeta);
|
||||
code = streamMetaCommit(pMeta);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
||||
|
@ -1117,7 +1202,10 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
|||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->status.timerActive >= 1) {
|
||||
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
|
||||
(void)streamTaskStop(pTask);
|
||||
int32_t code = streamTaskStop(pTask);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to stop task, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
inTimer = true;
|
||||
}
|
||||
}
|
||||
|
@ -1150,7 +1238,10 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
|
||||
(void)streamTaskStop(pTask);
|
||||
int32_t code = streamTaskStop(pTask);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to stop task:0x%x, code:%s", vgId, pTask->id.taskId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
@ -1168,7 +1259,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|||
SArray* pTaskList = NULL;
|
||||
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
}
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
@ -1199,14 +1289,17 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
|
|||
|
||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||
(void)taosThreadRwlockRdlock(&pMeta->lock);
|
||||
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
|
||||
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
} else {
|
||||
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
|
||||
}
|
||||
|
@ -1214,13 +1307,18 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
|
|||
|
||||
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||
(void)taosThreadRwlockWrlock(&pMeta->lock);
|
||||
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
||||
(void)taosThreadRwlockUnlock(&pMeta->lock);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
||||
|
@ -1258,7 +1356,7 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
(void)streamMetaSendHbHelper(pMeta);
|
||||
code = streamMetaSendHbHelper(pMeta);
|
||||
pMeta->sendMsgBeforeClosing = false;
|
||||
return TSDB_CODE_SUCCESS; // always return true
|
||||
}
|
||||
|
@ -1348,9 +1446,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|||
streamMetaRUnLock(pMeta);
|
||||
|
||||
// add the failed task info, along with the related fill-history task info into tasks list.
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||
if (hasFillhistoryTask) {
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||
}
|
||||
} else {
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
@ -1365,12 +1463,18 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
|||
|
||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
|
||||
int32_t startTs = pTask->execInfo.checkTs;
|
||||
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -234,11 +234,17 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
code = streamMetaAcquireTask(pMeta, hStreamId, hTaskId, &pHisTask);
|
||||
if (pHisTask == NULL) {
|
||||
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
|
||||
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code));
|
||||
}
|
||||
} else { // exist, but not ready, continue check downstream task status
|
||||
if (pHisTask->pBackend == NULL) {
|
||||
code = pMeta->expandTaskFn(pHisTask);
|
||||
|
@ -256,7 +262,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
streamMetaReleaseTask(pMeta, pHisTask);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
} else {
|
||||
return launchNotBuiltFillHistoryTask(pTask);
|
||||
}
|
||||
|
@ -297,10 +303,14 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo,
|
|||
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
||||
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
|
||||
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
|
||||
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
} else {
|
||||
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
|
||||
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
|
||||
}
|
||||
|
||||
pHTaskInfo->id.taskId = 0;
|
||||
pHTaskInfo->id.streamId = 0;
|
||||
|
@ -315,7 +325,10 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
|
|||
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64 ", ref:%d", pInfo->id.taskId,
|
||||
pInfo->hTaskId.taskId, ref);
|
||||
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pInfo);
|
||||
} else {
|
||||
char* p = streamTaskGetStatus(pTask).name;
|
||||
|
@ -357,7 +370,11 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// record the related fill-history task failed
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRId64 " failed to record the start task status, code:%s", pInfo->hTaskId.taskId,
|
||||
tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pInfo);
|
||||
return;
|
||||
}
|
||||
|
@ -418,7 +435,10 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
} else {
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
|
||||
stError("s-task:0x%x rel fill-history task:0x%" PRIx64 " may have been destroyed, not launch, ref:%d",
|
||||
|
@ -459,7 +479,10 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
|||
int32_t code = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId, &pInfo);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||
int32_t ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||
if (ret) {
|
||||
stError("s-task:%s add task check downstream result failed, code:%s", idStr, tstrerror(ret));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -476,7 +499,10 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
|||
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
|
||||
|
||||
taosMemoryFree(pInfo);
|
||||
(void) streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||
if (code) {
|
||||
stError("s-task:0x%x failed to record the start task status, code:%s", hTaskId, tstrerror(code));
|
||||
}
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
SArray* pTaskList = NULL;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
|
||||
|
@ -47,7 +48,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pTaskList = NULL;
|
||||
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_SUCCESS; // ignore the error and return directly
|
||||
|
@ -62,9 +62,12 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if ((pTask == NULL) || (code != 0)) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (ret) {
|
||||
stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -85,9 +88,13 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
SStreamTask* pTask = NULL;
|
||||
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
if ((pTask == NULL )|| (code != 0)) {
|
||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||
if (ret) {
|
||||
stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -105,11 +112,14 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
||||
pTask->id.idStr);
|
||||
(void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
|
||||
code = streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
|
||||
if (code) {
|
||||
stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
(void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
|
||||
true);
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
|
||||
true);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
@ -216,7 +226,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
if (code) {
|
||||
if (code == TSDB_CODE_DUP_KEY) {
|
||||
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
|
||||
" already exist start results in meta start task result hashmap",
|
||||
" already exist start results in meta start task result hashmap",
|
||||
vgId, id.taskId);
|
||||
} else {
|
||||
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
|
||||
|
@ -333,9 +343,13 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
|||
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
|
||||
|
||||
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
|
||||
if (pTask == NULL) {
|
||||
if ((pTask == NULL) || (code != 0)) {
|
||||
stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
|
||||
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId);
|
||||
if (ret) {
|
||||
stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret));
|
||||
}
|
||||
|
||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
|
||||
|
@ -431,7 +445,10 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
|||
continue;
|
||||
}
|
||||
|
||||
(void)streamTaskStop(pTask);
|
||||
int32_t ret = streamTaskStop(pTask);
|
||||
if (ret) {
|
||||
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
|
||||
}
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
|
@ -441,7 +458,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
|||
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
|
||||
|
|
|
@ -687,13 +687,14 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
|||
|
||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
|
||||
if (code) {
|
||||
stError("failed to handle STOP event, s-task:%s", id);
|
||||
stError("failed to handle STOP event, s-task:%s, code:%s", id, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to kill task related query handle", id);
|
||||
stError("s-task:%s failed to kill task related query handle, code:%s", id, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -865,7 +866,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
|
|||
pReq->head.vgId = vgId;
|
||||
pReq->taskId = pTaskId->taskId;
|
||||
pReq->streamId = pTaskId->streamId;
|
||||
pReq->resetRelHalt = resetRelHalt; // todo: remove this attribute
|
||||
pReq->resetRelHalt = resetRelHalt;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
|
||||
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
|
||||
|
@ -1052,14 +1053,13 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
|||
tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
|
||||
if (code < 0) {
|
||||
stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId,
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return -1;
|
||||
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:Out of memory", id, vgId);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
|
@ -1068,8 +1068,9 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
|||
rpcFreeCont(buf);
|
||||
tEncoderClear(&encoder);
|
||||
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg msg = {0};
|
||||
|
|
|
@ -217,10 +217,9 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|||
|
||||
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
|
||||
SStreamTaskSM* pSM = pTask->status.pSM;
|
||||
bool removed = false;
|
||||
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
|
||||
|
||||
bool removed = false;
|
||||
|
||||
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
|
||||
if (pInfo == NULL) {
|
||||
|
@ -266,7 +265,11 @@ int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
||||
}
|
||||
} else {
|
||||
(void)removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); // ignore the return value,
|
||||
code = removeEventInWaitingList(pTask, TASK_EVENT_PAUSE); // ignore the return value,
|
||||
if (code) {
|
||||
stError("s-task:%s failed to remove event in waiting list, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
code = TSDB_CODE_FAILED; // failed to restore the status, since it is not in pause status
|
||||
}
|
||||
|
||||
|
|
|
@ -16,11 +16,22 @@
|
|||
#include "streamInt.h"
|
||||
|
||||
void streamMutexLock(TdThreadMutex *pMutex) {
|
||||
(void) taosThreadMutexLock(pMutex);
|
||||
int32_t code = taosThreadMutexLock(pMutex);
|
||||
if (code) {
|
||||
stError("%p mutex lock failed, code:%s", pMutex, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMutexUnlock(TdThreadMutex *pMutex) {
|
||||
(void) taosThreadMutexUnlock(pMutex);
|
||||
int32_t code = taosThreadMutexUnlock(pMutex);
|
||||
if (code) {
|
||||
stError("%p mutex unlock failed, code:%s", pMutex, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMutexDestroy(TdThreadMutex *pMutex) { (void) taosThreadMutexDestroy(pMutex); }
|
||||
void streamMutexDestroy(TdThreadMutex *pMutex) {
|
||||
int32_t code = taosThreadMutexDestroy(pMutex);
|
||||
if (code) {
|
||||
stError("%p mutex destroy, code:%s", pMutex, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue