Merge pull request #26065 from taosdata/fix/3_liaohj

fix(stream): always load the data for fill-history scan.
This commit is contained in:
Haojun Liao 2024-06-07 14:01:50 +08:00 committed by GitHub
commit cbd9f86a34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 275 additions and 202 deletions

View File

@ -106,8 +106,8 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq); int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode); int32_t extractStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
@ -121,6 +121,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo(); void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot); void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -56,13 +56,13 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot); static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -792,7 +792,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name); mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo); saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
// execute creation // execute creation
@ -815,7 +815,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen); auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
} else { } else {
char detail[1000] = {0}; char detail[1000] = {0};
sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname); snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail)); auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
} }
@ -827,6 +827,7 @@ _OVER:
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createReq); tFreeSCMCreateStreamReq(&createReq);
tFreeStreamObj(&streamObj); tFreeStreamObj(&streamObj);
if (sql != NULL) { if (sql != NULL) {
taosMemoryFreeClear(sql); taosMemoryFreeClear(sql);
} }
@ -839,6 +840,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
void *pIter = NULL; void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int64_t maxChkptId = 0; int64_t maxChkptId = 0;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break; if (pIter == NULL) break;
@ -1031,10 +1033,9 @@ _ERR:
return code; return code;
} }
int32_t initStreamNodeList(SMnode *pMnode) { int32_t extractStreamNodeList(SMnode *pMnode) {
if (taosArrayGetSize(execInfo.pNodeList) == 0) { if (taosArrayGetSize(execInfo.pNodeList) == 0) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); extractNodeListFromStream(pMnode, execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
} }
return taosArrayGetSize(execInfo.pNodeList); return taosArrayGetSize(execInfo.pNodeList);
@ -1044,7 +1045,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
// check if the node update happens or not // check if the node update happens or not
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) { if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec(); execInfo.ts = taosGetTimestampSec();
@ -1071,22 +1072,23 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
} }
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList); taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap); taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) { if (nodeUpdated) {
mDebug("stream task not ready due to node update"); mDebug("stream tasks not ready due to node update");
} }
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
return nodeUpdated; return nodeUpdated;
} }
static int32_t mndCheckNodeStatus(SMnode *pMnode) { static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true; bool ready = true;
int64_t ts = taosGetTimestampSec();
if (taskNodeIsUpdated(pMnode)) { if (taskNodeIsUpdated(pMnode)) {
return -1; return -1;
} }
@ -1094,7 +1096,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) { if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts; ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0);
} }
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
@ -1148,12 +1150,13 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t code = 0; int32_t code = 0;
int32_t numOfCheckpointTrans = 0; int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) { if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return code; return code;
} }
SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval));
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
int64_t duration = now - pStream->checkpointFreq; int64_t duration = now - pStream->checkpointFreq;
if (duration < tsStreamCheckpointInterval * 1000) { if (duration < tsStreamCheckpointInterval * 1000) {
@ -1572,7 +1575,9 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) { if (pe == NULL) {
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId); mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64
" no valid status/stage info",
id.taskId, pStream->name, pStream->uid, pStream->createTime);
return -1; return -1;
} }
@ -2164,7 +2169,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0; return 0;
} }
static SArray *extractNodeListFromStream(SMnode *pMnode) { static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
@ -2193,13 +2198,13 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); taosArrayClear(pNodeList);
// convert to list // convert to list
pIter = NULL; pIter = NULL;
while ((pIter = taosHashIterate(pHash, pIter)) != NULL) { while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
SNodeEntry *pEntry = (SNodeEntry *)pIter; SNodeEntry *pEntry = (SNodeEntry *)pIter;
taosArrayPush(plist, pEntry); taosArrayPush(pNodeList, pEntry);
char buf[256] = {0}; char buf[256] = {0};
epsetToStr(&pEntry->epset, buf, tListLen(buf)); epsetToStr(&pEntry->epset, buf, tListLen(buf));
@ -2207,27 +2212,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
} }
taosHashCleanup(pHash); taosHashCleanup(pHash);
return plist;
}
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
break;
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2244,7 +2228,7 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) {
return false; return false;
} }
int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
@ -2262,10 +2246,7 @@ int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) {
} }
} }
for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { removeTasksInBuf(pRemovedTasks, &execInfo);
STaskId *pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList)); (int32_t)taosArrayGetSize(execInfo.pTaskList));
@ -2292,7 +2273,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) { if (numOfNodes == 0) {
@ -2312,7 +2293,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
} }
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
removeExpiredNodeEntryAndTask(pNodeSnapshot);
removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
@ -2322,8 +2304,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
// keep the new vnode snapshot if success // keep the new vnode snapshot if success
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
taosArrayDestroy(execInfo.pNodeList); extractNodeListFromStream(pMnode, execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
execInfo.ts = ts; execInfo.ts = ts;
mDebug("create trans successfully, update cached node list, numOfNodes:%d", mDebug("create trans successfully, update cached node list, numOfNodes:%d",
(int)taosArrayGetSize(execInfo.pNodeList)); (int)taosArrayGetSize(execInfo.pNodeList));
@ -2360,7 +2341,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0; return 0;
} }
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
@ -2400,37 +2381,6 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
} }
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
}
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList); int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {

View File

@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo {
int32_t transId; int32_t transId;
} SFailedCheckpointInfo; } SFailedCheckpointInfo;
static void doExtractTasksFromStream(SMnode *pMnode) { static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
@ -33,11 +33,44 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
break; break;
} }
saveStreamTasksInfo(pStream, &execInfo); saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
} }
static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pMnode == NULL) {
return;
}
int32_t num = taosArrayGetSize(pExecInfo->pTaskList);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SArray *pIdList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < num; ++i) {
STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i);
void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t));
if (p != NULL) {
continue;
}
void* pObj = mndGetStreamObj(pMnode, pId->streamId);
if (pObj != NULL) {
mndReleaseStream(pMnode, pObj);
taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0);
} else {
taosArrayPush(pIdList, pId);
}
}
removeTasksInBuf(pIdList, &execInfo);
taosArrayDestroy(pIdList);
taosHashCleanup(pHash);
}
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) { for (int32_t j = 0; j < numOfNodes; ++j) {
@ -230,7 +263,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
SArray *pFailedTasks = NULL; SArray *pFailedChkpt = NULL;
SArray *pOrphanTasks = NULL; SArray *pOrphanTasks = NULL;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
@ -252,17 +285,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
// extract stream task list // extract stream task list
if (taosHashGetSize(execInfo.pTaskMap) == 0) { if (taosHashGetSize(execInfo.pTaskMap) == 0) {
doExtractTasksFromStream(pMnode); addAllStreamTasksIntoBuf(pMnode, &execInfo);
} else {
// the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty.
// let's drop them here.
removeDroppedStreamTasksInBuf(pMnode, &execInfo);
} }
initStreamNodeList(pMnode); extractStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) { if (numOfUpdated > 0) {
@ -290,16 +327,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
} else { } else {
// task is idle for more than 50 sec. // task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { // if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) { // if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++; // pTaskEntry->inputQUnchangeCounter++;
} else { // } else {
pTaskEntry->inputQChanging = false; // pTaskEntry->inputQChanging = false;
} // }
} else { // } else {
pTaskEntry->inputQChanging = true; // pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0; // pTaskEntry->inputQUnchangeCounter = 0;
} // }
streamTaskStatusCopy(pTaskEntry, p); streamTaskStatusCopy(pTaskEntry, p);
@ -310,7 +347,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SFailedCheckpointInfo info = { SFailedCheckpointInfo info = {
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedTasks, &info); addIntoCheckpointList(pFailedChkpt, &info);
} }
} }
@ -328,7 +365,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans // current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal // kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedTasks) > 0) { if (taosArrayGetSize(pFailedChkpt) > 0) {
bool allReady = true; bool allReady = true;
if (pMnode != NULL) { if (pMnode != NULL) {
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
@ -339,8 +376,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (allReady || snodeChanged) { if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId); pInfo->checkpointId, pInfo->transId);
@ -359,7 +396,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req); tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedTasks); taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks); taosArrayDestroy(pOrphanTasks);
{ {

View File

@ -26,6 +26,8 @@ struct SStreamTaskIter {
SStreamTask *pTask; SStreamTask *pTask;
}; };
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) { SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) { if (pIter == NULL) {
@ -598,3 +600,49 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList)); mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList));
} }
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num);
break;
}
}
return TSDB_CODE_SUCCESS;
}
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
STaskId *pId = taosArrayGet(pTaskIds, i);
doRemoveTasks(pExecInfo, pId);
}
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
doRemoveTasks(pExecNode, &id);
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}

View File

@ -138,7 +138,6 @@ void initStreamExecInfo() {
} }
void initNodeInfo() { void initNodeInfo() {
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
SNodeEntry entry = {0}; SNodeEntry entry = {0};
entry.nodeId = 2; entry.nodeId = 2;
entry.stageUpdated = true; entry.stageUpdated = true;
@ -207,27 +206,32 @@ TEST_F(StreamTest, kill_checkpoint_trans) {
killAllCheckpointTrans(pMnode, &info); killAllCheckpointTrans(pMnode, &info);
SStreamObj stream; void* p = alloca(sizeof(SStreamObj) + sizeof(SSdbRow));
memset(&stream, 0, sizeof(SStreamObj)); SSdbRow* pRow = static_cast<SSdbRow*>(p);
pRow->type = SDB_MAX;
stream.uid = defStreamId; SStreamObj* pStream = (SStreamObj*)((char*)p + sizeof(SSdbRow));
stream.lock = 0;
stream.tasks = taosArrayInit(1, POINTER_BYTES); memset(pStream, 0, sizeof(SStreamObj));
stream.pHTasksList = taosArrayInit(1, POINTER_BYTES);
pStream->uid = defStreamId;
pStream->lock = 0;
pStream->tasks = taosArrayInit(1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(1, POINTER_BYTES);
SArray* pLevel = taosArrayInit(1, POINTER_BYTES); SArray* pLevel = taosArrayInit(1, POINTER_BYTES);
SStreamTask* pTask = static_cast<SStreamTask*>(taosMemoryCalloc(1, sizeof(SStreamTask))); SStreamTask* pTask = static_cast<SStreamTask*>(taosMemoryCalloc(1, sizeof(SStreamTask)));
pTask->id.streamId = defStreamId; pTask->id.streamId = defStreamId;
pTask->id.taskId = 1; pTask->id.taskId = 1;
pTask->exec.qmsg = (char*)taosMemoryMalloc(1); pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1);
taosThreadMutexInit(&pTask->lock, NULL); taosThreadMutexInit(&pTask->lock, NULL);
taosArrayPush(pLevel, &pTask); taosArrayPush(pLevel, &pTask);
taosArrayPush(stream.tasks, &pLevel); taosArrayPush(pStream->tasks, &pLevel);
mndCreateStreamResetStatusTrans(pMnode, &stream); mndCreateStreamResetStatusTrans(pMnode, pStream);
tFreeStreamObj(&stream); tFreeStreamObj(pStream);
sdbCleanup(pMnode->pSdb); sdbCleanup(pMnode->pSdb);
taosMemoryFree(pMnode); taosMemoryFree(pMnode);

View File

@ -162,7 +162,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) { } else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SPackedData tmp = {.pDataBlock = input}; SPackedData tmp = {.pDataBlock = input};
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
pInfo->blockType = STREAM_INPUT__CHECKPOINT; pInfo->blockType = STREAM_INPUT__CHECKPOINT;

View File

@ -989,13 +989,15 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa
colDataSetNULL(pColInfo, pBlock->info.rows); colDataSetNULL(pColInfo, pBlock->info.rows);
} }
} }
if (bFreeRow) {
taosMemoryFree(buf);
}
if (*(int32_t*)pStart != pStart - buf) { if (*(int32_t*)pStart != pStart - buf) {
qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart, qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart,
(int32_t)(pStart - buf)); (int32_t)(pStart - buf));
}; }
if (bFreeRow) {
taosMemoryFree(buf);
}
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;

View File

@ -454,7 +454,12 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
scanPathOptSetGroupOrderScan(info.pScan); scanPathOptSetGroupOrderScan(info.pScan);
} }
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
if (pCxt->pPlanCxt->streamQuery) {
info.pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; // always load all data for stream query
} else {
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
}
info.pScan->pDynamicScanFuncs = info.pDsoFuncs; info.pScan->pDynamicScanFuncs = info.pDsoFuncs;
} }
if (TSDB_CODE_SUCCESS == code && info.pScan) { if (TSDB_CODE_SUCCESS == code && info.pScan) {

View File

@ -224,6 +224,8 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
typedef int32_t (*__stream_async_exec_fn_t)(void* param); typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code); int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -195,12 +195,6 @@ int32_t getCfIdx(const char* cfName) {
bool isValidCheckpoint(const char* dir) { bool isValidCheckpoint(const char* dir) {
return true; return true;
STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir);
if (pDb == NULL) {
return false;
}
taskDbDestroy(pDb, false);
return true;
} }
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {

View File

@ -276,6 +276,12 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
int8_t type = pTask->outputInfo.type; int8_t type = pTask->outputInfo.type;
pActiveInfo->allUpstreamTriggerRecv = 1; pActiveInfo->allUpstreamTriggerRecv = 1;
// We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
// The transfer of state may generate new data that need to dispatch to downstream tasks,
// Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
// before the next checkpoint.
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask); continueDispatchCheckpointTriggerBlock(pBlock, pTask);
@ -306,8 +312,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamTaskBuildCheckpoint(pTask); streamTaskBuildCheckpoint(pTask);
} else { // source & agg tasks need to forward the checkpoint msg downwards } else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
// Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
// this task already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} }
} }
@ -507,7 +516,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks);
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
@ -526,6 +535,8 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
} }
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
char buf[128] = {0};
char* file = taosMemoryCalloc(1, strlen(path) + 32); char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
@ -537,12 +548,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
} }
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
char buf[128] = {0}; if (pFile == NULL) {
stError("%s failed to open meta file:%s for checkpoint", id, file);
code = -1;
return code;
}
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
stError("chkp failed to read meta file:%s", file); stError("%s failed to read meta file:%s for checkpoint", id, file);
code = -1; code = -1;
} else { } else {
int32_t len = strlen(buf); int32_t len = strnlen(buf, tListLen(buf));
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
if (buf[i] == '\n') { if (buf[i] == '\n') {
char* item = taosMemoryCalloc(1, i + 1); char* item = taosMemoryCalloc(1, i + 1);

View File

@ -246,6 +246,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
return code; return code;
} }
@ -730,8 +731,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
} else { } else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear " "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"checkpoint-ready msg, ref:%d", "and quit from timer, ref:%d",
id, vgId, ref); id, vgId, ref);
streamClearChkptReadyMsg(pTask); streamClearChkptReadyMsg(pTask);

View File

@ -24,6 +24,7 @@
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) { bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
@ -87,8 +88,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code; return code;
} }
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
@ -541,6 +541,75 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) {
const char* id = pTask->id.idStr;
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
SCheckpointInfo* pInfo = &pTask->chkInfo;
int64_t ver = pInfo->processedVer;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger");
doSetStreamInputBlock(pTask, pBlock, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
}
}
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
const char* id = pTask->id.idStr;
// 1. transfer the ownership of executor state
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
} else {
stDebug("s-task:%s no transfer-state needed", id);
}
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
doStreamTaskExecImpl(pTask, pCheckpointBlock);
}
/** /**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec. * appropriate batch of blocks should be handled in 5 to 10 sec.
@ -628,63 +697,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
} }
} }
if (type == STREAM_INPUT__CHECKPOINT) { if (type != STREAM_INPUT__CHECKPOINT) {
// transfer the state from fill-history to related stream task before generating the checkpoint. doStreamTaskExecImpl(pTask, pInput);
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
}
}
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type));
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
SCheckpointInfo* pInfo = &pTask->chkInfo;
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
} }
streamFreeQitem(pInput); streamFreeQitem(pInput);
@ -692,7 +706,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// todo other thread may change the status // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) { if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock // todo add lock
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState->state == TASK_STATUS__CK) {
@ -717,8 +730,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
return 0; return 0;
} }
} }
return 0;
} }
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not // the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not

View File

@ -298,6 +298,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) { if (code != 0) {
taosArrayDestroy(pSnapInfoSet);
return -1; return -1;
} }

View File

@ -624,18 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) { if ((pInfo != NULL) && pInfo->dataAllowed) {
pInfo->dataAllowed = false; pInfo->dataAllowed = false;
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t <= streamTaskGetNumOfUpstream(pTask));
} }
} }
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) { if ((pInfo != NULL) && (!pInfo->dataAllowed)) {
pInfo->dataAllowed = true;
int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t >= 0); ASSERT(t >= 0);
pInfo->dataAllowed = true;
} }
} }