Merge pull request #21364 from taosdata/enh/rocksdbSstate

state merge into 3.0
This commit is contained in:
Haojun Liao 2023-05-18 17:25:44 +08:00 committed by GitHub
commit dbd52ecffa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 299 additions and 196 deletions

View File

@ -82,7 +82,7 @@ typedef struct STuplePos {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
SWinKey streamTupleKey;
};
} STuplePos;

View File

@ -86,9 +86,8 @@ typedef struct {
int64_t number;
} SStreamStateCur;
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);

View File

@ -110,14 +110,14 @@ typedef struct {
int64_t ver;
int32_t* dataRef;
SPackedData submit;
} SStreamDataSubmit2;
} SStreamDataSubmit;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* submits; // SArray<SPackedSubmit>
} SStreamMergedSubmit2;
} SStreamMergedSubmit;
typedef struct {
int8_t type;
@ -205,10 +205,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
typedef struct {
char* qmsg;

View File

@ -38,8 +38,8 @@ typedef SList SStreamSnapshot;
typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
TSKEY delMark);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
@ -56,6 +56,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
#ifdef __cplusplus
}

View File

@ -147,8 +147,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
int32_t vgId);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta

View File

@ -325,7 +325,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT);
*pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", id);
@ -445,8 +445,8 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen,
pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr);
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
pReader->nextBlk, numOfBlocks, idstr);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {
@ -703,7 +703,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, const char* id) {
SColVal colVal;
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in
// schema:%d",
// sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
sourceIdx++;
continue;

View File

@ -132,7 +132,8 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.currentVer);
}
}
@ -161,7 +162,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
}
streamMetaReleaseTask(pStreamMeta, pTask);
}

View File

@ -17,12 +17,12 @@
#include "tmsg.h"
#include "tq.h"
#define MAX_CATCH_NUM 10240
#define MAX_CACHE_TABLE_INFO_NUM 10240
typedef struct STblInfo {
typedef struct STableSinkInfo {
uint64_t uid;
char tbName[TSDB_TABLE_NAME_LEN];
} STblInfo;
} STableSinkInfo;
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) {
@ -97,19 +97,21 @@ end:
return ret;
}
int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) {
void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t));
static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
if (pVal) {
*pTbl = *(STblInfo**)pVal;
*pInfo = *(STableSinkInfo**)pVal;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) {
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) {
return TSDB_CODE_SUCCESS;
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) {
if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) {
return TSDB_CODE_FAILED;
}
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
}
@ -274,7 +276,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
crTblArray = NULL;
} else {
SSubmitTbData tbData = {0};
tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows);
tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end;
@ -283,35 +285,35 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version;
STblInfo* pTblMeta = NULL;
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta);
STableSinkInfo* pTableSinkInfo = NULL;
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
if (res != TSDB_CODE_SUCCESS) {
pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo));
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo));
}
char* ctbName = pDataBlock->info.parTbName;
if (!ctbName[0]) {
if (res == TSDB_CODE_SUCCESS) {
memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName));
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
} else {
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
memcpy(ctbName, tmp, strlen(tmp));
memcpy(pTblMeta->tbName, tmp, strlen(tmp));
memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
taosMemoryFree(tmp);
tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode),
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId);
}
}
if (res == TSDB_CODE_SUCCESS) {
tbData.uid = pTblMeta->uid;
tbData.uid = pTableSinkInfo->uid;
} else {
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
taosMemoryFree(pTableSinkInfo);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq* pCreateTbReq = NULL;
@ -371,7 +373,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
mr.me.type);
metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
taosMemoryFree(pTableSinkInfo);
continue;
}
@ -380,13 +382,16 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
taosMemoryFree(pTableSinkInfo);
continue;
}
tbData.uid = mr.me.uid;
pTblMeta->uid = mr.me.uid;
tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta);
pTableSinkInfo->uid = mr.me.uid;
int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);
}
metaReaderClear(&mr);
}
}

View File

@ -18,7 +18,8 @@
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
#define NO_POLL_CNT 5
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId);
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
@ -103,7 +104,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64,
tqDebug("tmq poll: consumer:0x%" PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
return 0;
} else {
@ -152,7 +154,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, vgId, pRequest->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
@ -196,11 +199,11 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end:
{
end : {
char buf[80] = {0};
tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
" code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp);
}
@ -208,7 +211,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
return code;
}
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL;
@ -272,8 +276,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId,
pRequest->epoch, vgId, fetchVer, pHead->msgType);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
@ -301,7 +305,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
if (code < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey);
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
goto end;
}
@ -363,7 +368,8 @@ static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int
pMsgHead->walever = ever;
}
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId) {
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
int32_t vgId) {
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);

View File

@ -299,7 +299,6 @@ typedef struct SPartitionBySupporter {
typedef struct SPartitionDataInfo {
uint64_t groupId;
char* tbname;
SArray* tags;
SArray* rowIds;
} SPartitionDataInfo;

View File

@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
return pBlock;
}
void freePartItem(void* ptr) {
SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr;
taosArrayDestroy(pPart->rowIds);
}
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
pInfo->tsColIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);

View File

@ -2674,6 +2674,29 @@ TSKEY compareTs(void* pKey) {
return pWinKey->ts;
}
int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) {
if (pCtx->subsidiaries.rowLen == 0) {
int32_t rowLen = 0;
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
rowLen += pc->pExpr->base.resSchema.bytes;
}
return rowLen + pCtx->subsidiaries.num * sizeof(bool);
} else {
return pCtx->subsidiaries.rowLen;
}
}
int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
int32_t size = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
size = TMAX(size, resSize);
}
return size;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
@ -2720,8 +2743,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
pInfo->pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -2730,10 +2756,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->numOfChild = numOfChild;
@ -2766,7 +2788,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize,
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
pInfo->dataVersion = 0;
@ -4889,9 +4912,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
pInfo->pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -4912,10 +4939,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo
@ -4928,7 +4951,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize,
int32_t funResSize= getMaxFunResSize(pSup, numOfCols);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,

View File

@ -883,10 +883,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
}
pStart += pDstCol->info.bytes;
}
if (pCtx->saveHandle.pState) {
streamFreeVal((void*)p);
}
}
return TSDB_CODE_SUCCESS;
@ -3123,7 +3119,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf;
}
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key,
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key,
STuplePos* pPos) {
STuplePos p = {0};
if (pHandle->pBuf != NULL) {
@ -3171,7 +3167,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
prepareBuf(pCtx);
STupleKey key;
SWinKey key;
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
@ -3179,7 +3175,6 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey;
key.exprIdx = pCtx->exprIdx;
}
}

View File

@ -96,8 +96,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
}
int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
@ -294,23 +294,26 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem;
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total, size);
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
total, size);
streamDataSubmitDestroy(pSubmitBlock);
taosFreeQitem(pSubmitBlock);
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
px->submit.msgLen, px->submit.ver, numOfBlocks, size);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1;
}
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
taosWriteQitem(pTask->inputQueue->queue, pItem);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
@ -331,10 +334,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}
@ -356,6 +355,4 @@ void* streamQueueNextItem(SStreamQueue* queue) {
}
}
void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
}
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }

View File

@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
return 0;
}
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
if (pDataSubmit == NULL) {
return NULL;
}
@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) {
return NULL;
}
pDataSubmit->submit = submit;
pDataSubmit->submit = *pData;
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = type;
return pDataSubmit;
}
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) {
}
}
SStreamMergedSubmit2* streamMergedSubmitNew() {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0);
SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
if (pMerged == NULL) {
return NULL;
}
@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() {
return pMerged;
}
int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) {
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver;
return 0;
}
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) {
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) {
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
int32_t len = 0;
if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
len = pSubmit->submit.msgLen;
}
SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len);
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len);
if (pSubmitClone == NULL) {
return NULL;
}
streamDataSubmitRefInc(pSubmit);
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2));
memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
return pSubmitClone;
}
@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem(pElem);
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst;
SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem;
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(pElem);
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
// todo handle error
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst);
taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged;
@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitDestroy((SStreamDataSubmit2*)data);
streamDataSubmitDestroy((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data;
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) {
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);

View File

@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0;
while (1) {
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
@ -370,11 +370,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qRes->blocks = pRes;
if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput;
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput;
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
}
@ -411,7 +411,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
(!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
}

View File

@ -105,3 +105,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
return (SStreamQueueRes){0};
}
#endif
#define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16
// todo refactor:
// read data from input queue
typedef struct SQueueReader {
SStreamQueue* pQueue;
int32_t taskLevel;
int32_t maxBlocks; // maximum block in one batch
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
int32_t numOfBlocks = 0;
int32_t tryCount = 0;
SStreamQueueItem* pRet = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue);
if (qItem == NULL) {
if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) {
tryCount++;
taosMsleep(1);
qDebug("===stream===try again batchSize:%d", numOfBlocks);
continue;
}
qDebug("===stream===break batchSize:%d", numOfBlocks);
break;
}
if (pRet == NULL) {
pRet = qItem;
streamQueueProcessSuccess(pReader->pQueue);
if (pReader->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) {
streamQueueProcessFail(pReader->pQueue);
break;
} else {
numOfBlocks++;
pRet = newRet;
streamQueueProcessSuccess(pReader->pQueue);
if (numOfBlocks > pReader->maxBlocks) {
qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr);
break;
}
}
}
}
return pRet;
}

View File

@ -272,26 +272,30 @@ int32_t streamStateCommit(SStreamState* pState) {
#endif
}
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateFuncPut_rocksdb(pState, key, value, vLen);
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
#else
return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
return code;
#else
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
#endif
}
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
return streamStateFuncDel_rocksdb(pState, key);
#else
return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
#endif
}

View File

@ -32,6 +32,7 @@ struct SStreamFileState {
SSHashObj* rowBuffMap;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
@ -45,8 +46,8 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
TSKEY delMark) {
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
}
@ -58,6 +59,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState) {
goto _error;
}
rowSize += selectRowSize;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
@ -69,11 +71,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
}
pFileState->keyLen = keySize;
pFileState->rowSize = rowSize;
pFileState->selectivityRowSize = selectRowSize;
pFileState->preCheckPointVersion = 0;
pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile;
pFileState->getTs = fp;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = INT64_MIN;
@ -441,9 +443,12 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark);
}
void* pStVal = NULL;
int32_t len = 0;
@ -479,3 +484,5 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
return TSDB_CODE_SUCCESS;
}
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }