Merge pull request #22970 from taosdata/fix/liaohj

fix(stream): keep the status entry in hash table, instead entry index.
This commit is contained in:
Haojun Liao 2023-09-20 10:33:05 +08:00 committed by GitHub
commit 779c0c5f19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 138 additions and 70 deletions

View File

@ -247,6 +247,7 @@ typedef struct SStreamTaskId {
} SStreamTaskId;
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it
@ -661,6 +662,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
int32_t streamSetStatusUnint(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);

View File

@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 300;
int32_t tsStreamCheckpointTickInterval = 10;
int32_t tsStreamNodeCheckInterval = 10;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;

View File

@ -83,6 +83,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STREAM,
@ -122,7 +125,7 @@ int32_t mndInitStream(SMnode *pMnode) {
taosThreadMutexInit(&execNodeList.lock, NULL);
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry));
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId));
return sdbSetTable(pMnode->pSdb, table);
}
@ -1180,10 +1183,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosThreadMutexLock(&execNodeList.lock);
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) {
STaskId *p = taosArrayGet(execNodeList.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status));
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false;
break;
}
@ -1280,7 +1288,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
return -1;
}
// mndTransSetSerial(pTrans);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
@ -1304,13 +1311,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
removeStreamTasksInBuf(pStream, &execNodeList);
char detail[100] = {0};
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail);
sdbRelease(pMnode->pSdb, pStream);
@ -1555,13 +1562,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
if (index == NULL) {
STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status);
const char* pStatus = streamGetTaskStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -2238,7 +2244,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2252,15 +2258,42 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
if (p == NULL) {
STaskStatusEntry entry = {
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
taosArrayPush(pExecNode->pTaskList, &entry);
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal));
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
}
}
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
break;
}
}
}
}
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
@ -2277,7 +2310,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
// int64_t now = taosGetTimestampSec();
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock);
@ -2288,13 +2320,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (index == NULL) {
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
if (pEntry == NULL) {
mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId);
continue;
}
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index);
pStatusEntry->status = p->status;
pEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
}

View File

@ -886,6 +886,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTaskCheckReq req;
SDecoder decoder;
@ -906,10 +907,17 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
// only the leader node handle the check request
if (!pMeta->leader) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
return -1;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
@ -921,7 +929,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {

View File

@ -16,6 +16,8 @@
#include "tq.h"
#include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
@ -95,7 +97,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
}
pTask->taskExecInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
streamSetStatusNormal(pTask);
streamTaskCheckDownstream(pTask);
@ -111,12 +113,9 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
// taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
// taosWUnLockLatch(&pMeta->lock);
return 0;
}
@ -124,7 +123,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
// taosWUnLockLatch(&pMeta->lock);
return -1;
}
@ -135,8 +133,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
// taosWUnLockLatch(&pMeta->lock);
return 0;
}
@ -159,6 +155,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
}
pMeta->walScanCounter += 1;
if (pMeta->walScanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
pMeta->walScanCounter = MAX_REPEAT_SCAN_THRESHOLD;
}
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);

View File

@ -141,6 +141,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
// inputQ, to make sure all blocks with less version have been handled by this task already.
@ -201,6 +202,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
pTask->chkInfo.startTs = taosGetTimestampMs();
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
@ -312,15 +315,19 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId,
el, pTask->checkpointingId);
} else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId,
pTask->id.idStr, remain, pMeta->numOfStreamTasks);
qDebug(
"vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not "
"ready:%d/%d",
pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks);
}
// send check point response to upstream task

View File

@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem == NULL) {
return pStreamBlocks;
}
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->sourceVer = pSubmit->ver;

View File

@ -16,9 +16,10 @@
#include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
destroyStreamDataBlock(pStreamBlocks);
return code;
}
@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));
// current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) {
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
int32_t streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
if ((++numOfBlocks) >= outputBatchSize) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks,
outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
break;
}
}
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = doOutputResultBlockImpl(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock(pStreamBlocks);
return code;
}
size = 0;
} else {
taosArrayDestroy(pRes);
}
@ -525,6 +521,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
int32_t streamExecForAll(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
while (1) {
int32_t numOfBlocks = 0;
SStreamQueueItem* pInput = NULL;
@ -533,9 +532,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks);
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);

View File

@ -294,6 +294,8 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;
}
void streamMetaClose(SStreamMeta* pMeta) {
@ -644,10 +646,12 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);
int32_t vgId = pMeta->vgId;
qInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
qError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
return -1;
}
@ -662,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
doClear(pKey, pVal, pCur, pRecycleList);
return -1;
}
@ -672,9 +678,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually", vgId, tsDataDir);
return -1;
}
tDecoderClear(&decoder);
@ -731,6 +736,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
qError("vgId:%d failed to close meta-file cursor", vgId);
taosArrayDestroy(pRecycleList);
return -1;
}
@ -743,6 +749,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);

View File

@ -387,7 +387,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total + 1, size, tstrerror(code));
} else {
qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
qDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size);
}
return TSDB_CODE_SUCCESS;

View File

@ -205,21 +205,22 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
const char* id = pTask->id.idStr;
if (stage == -1) {
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr,
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, stage);
return 0;
}
if (pInfo->stage == -1) {
pInfo->stage = stage;
qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr,
qDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id,
upstreamTaskId, stage);
}
if (pInfo->stage < stage) {
qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage);
id, upstreamTaskId, vgId, stage, pInfo->stage);
}
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0;
@ -355,6 +356,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
}
int32_t streamSetStatusUnint(SStreamTask* pTask) {
int32_t status = atomic_load_8(&pTask->status.taskStatus);
if (status == TASK_STATUS__DROPPING) {
qError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr);
return -1;
} else {
qDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT);
return 0;
}
}
// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);