diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 55c1b89c30..474ac5f98d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3052,7 +3052,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); pInfo->dataVersion = 0; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; @@ -5704,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index cc185d2c2c..0b7d6d7693 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -883,7 +883,7 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { SStreamValue key = {0}; char* p = value; if (streamStateValueIsStale(p)) { - *dest = NULL; + if (dest != NULL) *dest = NULL; return -1; } p = taosDecodeFixedI64(p, &key.unixTimestamp); @@ -1490,9 +1490,13 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons if (pKtmp->opNum != pCur->number) { return -1; } - size_t vlen = 0; - if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); - if (pVLen != NULL) *pVLen = vlen; + + if (pVLen != NULL) { + size_t vlen = 0; + const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); + *pVLen = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); + } + *pKey = pKtmp->key; return 0; } @@ -1555,19 +1559,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); - char buf[128] = {0}; - int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - pCur->db = wrapper->rocksdb; + pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); - rocksdb_iter_prev(pCur->iter); while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { rocksdb_iter_prev(pCur->iter); @@ -1600,18 +1603,14 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* rocksdb_iter_seek(pCur->iter, buf, len); if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { - size_t vlen; - char* val = (char*)rocksdb_iter_value(pCur->iter, &vlen); - if (!streamStateValueIsStale(val)) { - SStateKey curKey; - size_t kLen = 0; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); + SStateKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); - if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { - pCur->number = pState->number; - return pCur; - } + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { + pCur->number = pState->number; + return pCur; } } streamStateFreeCur(pCur); @@ -1900,8 +1899,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, winKeyDecode(&winKey, keyStr); const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - // char* dst = NULL; - int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); if (len < 0) { return -1; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index f8fa7b024a..f246a781c6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -443,9 +443,9 @@ static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpoi } tEncoderClear(&encoder); - initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); + initRpcMsg(&msg, TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); + qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", + pTask->id.idStr, pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); return 0; @@ -518,16 +518,16 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pRpcMsgList); ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num); - qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel, - num); + qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, + pTask->info.taskLevel, num); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i); tmsgSendRsp(pMsg); } taosArrayClear(pTask->pRpcMsgList); - qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr); + qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr, pTask->info.taskLevel); return TSDB_CODE_SUCCESS; } @@ -540,7 +540,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { tmsgSendRsp(pMsg); taosArrayClear(pTask->pRpcMsgList); - qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr); + qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); return TSDB_CODE_SUCCESS; } @@ -631,7 +631,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist tEncoderClear(&encoder); msg.info.noResp = 1; - initRpcMsg(&msg,TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); + initRpcMsg(&msg, TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); tmsgSendReq(pEpSet, &msg); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); @@ -797,7 +797,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; taosArrayPush(pTask->pRpcMsgList, &rspMsg); - qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t) taosArrayGetSize(pTask->pRpcMsgList)); + qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pRpcMsgList)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 063c15e4f3..6d61f65394 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -386,7 +386,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, 0, buf); // todo handle failure memset(buf, 0, len); -// qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); + // qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } taosMemoryFree(buf); @@ -397,8 +397,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStateClearBatch(batch); int64_t elapsed = taosGetTimestampMs() - st; - qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%"PRId64"ms", pFileState->id, numOfElems, - BATCH_LIMIT, elapsed); + qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms", + pFileState->id, numOfElems, BATCH_LIMIT, elapsed); if (flushState) { const char* taskKey = "streamFileState"; @@ -488,8 +488,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { : pFileState->maxTs - pFileState->deleteMark; deleteExpiredCheckPoint(pFileState, mark); } - void* pStVal = NULL; - int32_t len = 0; SWinKey key = {.groupId = 0, .ts = 0}; SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); @@ -511,6 +509,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { taosMemoryFreeClear(pNode); break; } + ASSERT(pVLen == pFileState->rowSize); memcpy(pNewPos->pRowBuff, pVal, pVLen); code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpoint0.sim index 3f2457347a..325667ee2b 100644 --- a/tests/script/tsim/stream/checkpoint0.sim +++ b/tests/script/tsim/stream/checkpoint0.sim @@ -49,6 +49,12 @@ if $data02 != 3 then goto loop0 endi +print waiting for checkpoint generation ...... + +sleep 25000 + +print restart taosd + system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start