diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a4f1e2ef94..c9d0eebe2b 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -309,6 +309,8 @@ typedef struct STimeWindowAggSupp { int64_t waterMark; TSKEY maxTs; TSKEY minTs; + TSKEY checkPointTs; + TSKEY checkPointInterval; SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; @@ -360,9 +362,10 @@ typedef struct SStreamScanInfo { int32_t blockRecoverTotCnt; SSDataBlock* pRecoverRes; - SSDataBlock* pCreateTbRes; - int8_t igCheckUpdate; - int8_t igExpired; + SSDataBlock* pCreateTbRes; + int8_t igCheckUpdate; + int8_t igExpired; + SStreamState* pState; } SStreamScanInfo; typedef struct { @@ -436,6 +439,7 @@ typedef struct SStreamIntervalOperatorInfo { SSDataBlock* pPullDataRes; bool isFinal; SArray* pChildren; + int32_t numOfChild; SStreamState* pState; SWinKey delKey; uint64_t numOfDatapack; @@ -567,7 +571,6 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); -void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey, void* pState); @@ -658,6 +661,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSu SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); +void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index d1bde3dba4..70200b067e 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1033,6 +1033,31 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* return TSDB_CODE_SUCCESS; } +int32_t resultRowEncode(void* k, int32_t* size, char* buf) { + // SResultRow* key = k; + // int len = 0; + // int struLen = *size; + // len += taosEncodeFixedI32((void**)&buf, key->pageId); + + // uint32_t offset = key->offset; + // len += taosEncodeFixedU32((void**)&buf, offset); + + // len += taosEncodeFixedI8((void**)&buf, key->startInterp); + // len += taosEncodeFixedI8((void**)&buf, key->endInterp); + // len += taosEncodeFixedI8((void**)&buf, key->closed); + // len += taosEncodeFixedU32((void**)&buf, key->numOfRows); + + // len += taosEncodeFixedI64((void**)&buf, key->win.skey); + // len += taosEncodeFixedI64((void**)&buf, key->win.ekey); + + // int32_t numOfEntryInfo = (struLen - sizeof(SResultRow)) / sizeof(struct SResultRowEntryInfo); + // len += taosEncodeFixedI32((void**)&buf, numOfEntryInfo); + // for (int i = 0; i < numOfEntryInfo; i++) { + // SResultRowEntryInfo* p = &key->pEntryInfo[i]; + + // uint8_t value = p->initialized ? 1 : 0; + // len += taosEncodeFixedU8((void**)&buf, value); + // value = p->complete ? 1 : 0; // len += taosEncodeFixedU8((void**)&buf, value);