Merge pull request #26446 from taosdata/fix/cov
refactor(stream): delay checkpointInterval to generate the checkpoint
This commit is contained in:
commit
75d2e96b9a
|
@ -1134,6 +1134,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
||||||
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
||||||
ready = false;
|
ready = false;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry->hTaskId != 0) {
|
if (pEntry->hTaskId != 0) {
|
||||||
|
@ -1153,6 +1154,27 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
return ready ? 0 : -1;
|
return ready ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
|
||||||
|
int64_t ts = -1;
|
||||||
|
int32_t taskId = -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
|
||||||
|
STaskId *p = taosArrayGet(pTaskList, i);
|
||||||
|
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||||
|
if (pEntry == NULL || pEntry->id.streamId != streamId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
|
||||||
|
ts = pEntry->startTime;
|
||||||
|
taskId = pEntry->id.taskId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
|
||||||
|
return ts;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t duration;
|
int64_t duration;
|
||||||
|
@ -1191,6 +1213,15 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
||||||
|
if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
||||||
taosArrayPush(pList, &in);
|
taosArrayPush(pList, &in);
|
||||||
|
|
||||||
|
|
|
@ -2187,6 +2187,17 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t colIdComparFn(const void* param1, const void * param2) {
|
||||||
|
int32_t p1 = *(int32_t*) param1;
|
||||||
|
int32_t p2 = *(int32_t*) param2;
|
||||||
|
|
||||||
|
if (p1 == p2) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return (p1 < p2)? -1:1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
|
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||||
|
@ -2203,6 +2214,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||||
|
|
||||||
|
SArray* pColList = taosArrayInit(4, sizeof(int32_t));
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||||
SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||||
|
@ -2217,6 +2230,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
||||||
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
|
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
|
||||||
colExists = true;
|
colExists = true;
|
||||||
|
taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2225,6 +2239,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
if (!colExists) {
|
if (!colExists) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
||||||
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||||
|
taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2240,7 +2255,34 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
|
|
||||||
// reset the error code.
|
// reset the error code.
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < pInfo->numOfPseudoExpr; ++i) {
|
||||||
|
taosArrayPush(pColList, &pInfo->pPseudoExpr[i].base.resSchema.slotId);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySort(pColList, colIdComparFn);
|
||||||
|
|
||||||
|
int32_t i = 0, j = 0;
|
||||||
|
while(i < taosArrayGetSize(pColList)) {
|
||||||
|
int32_t slot1 = *(int32_t*)taosArrayGet(pColList, i);
|
||||||
|
if (slot1 > j) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j);
|
||||||
|
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||||
|
j += 1;
|
||||||
|
} else {
|
||||||
|
i += 1;
|
||||||
|
j += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while(j < taosArrayGetSize(pInfo->pRes->pDataBlock)) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j);
|
||||||
|
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||||
|
j += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pColList);
|
||||||
|
|
||||||
if (filter) {
|
if (filter) {
|
||||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
|
|
Loading…
Reference in New Issue