Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/commit_txn
This commit is contained in:
commit
71a2db6cf2
|
@ -945,7 +945,7 @@ MIN(expr)
|
||||||
MODE(expr)
|
MODE(expr)
|
||||||
```
|
```
|
||||||
|
|
||||||
**Description**:The value which has the highest frequency of occurrence. NULL is returned if there are multiple values which have highest frequency of occurrence.
|
**Description**:The value which has the highest frequency of occurrence. One random value is returned if there are multiple values which have highest frequency of occurrence.
|
||||||
|
|
||||||
**Return value type**: Same as the input data
|
**Return value type**: Same as the input data
|
||||||
|
|
||||||
|
|
|
@ -946,7 +946,7 @@ MIN(expr)
|
||||||
MODE(expr)
|
MODE(expr)
|
||||||
```
|
```
|
||||||
|
|
||||||
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出NULL。
|
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,则随机输出其中某个值。
|
||||||
|
|
||||||
**返回数据类型**:与输入数据类型一致。
|
**返回数据类型**:与输入数据类型一致。
|
||||||
|
|
||||||
|
|
|
@ -44,12 +44,17 @@ enum {
|
||||||
)
|
)
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
typedef struct {
|
typedef struct SWinKey {
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
TSKEY ts;
|
TSKEY ts;
|
||||||
} SWinKey;
|
} SWinKey;
|
||||||
|
|
||||||
static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
|
typedef struct SSessionKey {
|
||||||
|
STimeWindow win;
|
||||||
|
uint64_t groupId;
|
||||||
|
} SSessionKey;
|
||||||
|
|
||||||
|
static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
|
||||||
SWinKey* pWin1 = (SWinKey*)pKey1;
|
SWinKey* pWin1 = (SWinKey*)pKey1;
|
||||||
SWinKey* pWin2 = (SWinKey*)pKey2;
|
SWinKey* pWin2 = (SWinKey*)pKey2;
|
||||||
|
|
||||||
|
@ -69,7 +74,7 @@ static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
return sWinKeyCmprImpl(pKey1, pKey2);
|
return winKeyCmprImpl(pKey1, pKey2);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -25,6 +25,8 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
|
|
||||||
|
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
|
||||||
|
|
||||||
// incremental state storage
|
// incremental state storage
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SStreamTask* pOwner;
|
SStreamTask* pOwner;
|
||||||
|
@ -32,6 +34,7 @@ typedef struct {
|
||||||
TTB* pStateDb;
|
TTB* pStateDb;
|
||||||
TTB* pFuncStateDb;
|
TTB* pFuncStateDb;
|
||||||
TTB* pFillStateDb; // todo refactor
|
TTB* pFillStateDb; // todo refactor
|
||||||
|
TTB* pSessionStateDb;
|
||||||
TXN txn;
|
TXN txn;
|
||||||
int32_t number;
|
int32_t number;
|
||||||
} SStreamState;
|
} SStreamState;
|
||||||
|
@ -57,6 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateClear(SStreamState* pState);
|
int32_t streamStateClear(SStreamState* pState);
|
||||||
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
||||||
|
|
||||||
|
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
||||||
|
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
|
||||||
|
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
||||||
|
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
|
||||||
|
int32_t streamStateSessionClear(SStreamState* pState);
|
||||||
|
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen);
|
||||||
|
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
|
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
|
||||||
|
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key);
|
||||||
|
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key);
|
||||||
|
|
||||||
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
||||||
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
|
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
|
||||||
|
|
|
@ -1892,12 +1892,13 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
||||||
|
|
||||||
for (int32_t k = 0; k < colNum; k++) {
|
for (int32_t k = 0; k < colNum; k++) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
|
||||||
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
|
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
|
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* var = colDataGetData(pColInfoData, j);
|
||||||
switch (pColInfoData->info.type) {
|
switch (pColInfoData->info.type) {
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
memset(pBuf, 0, sizeof(pBuf));
|
memset(pBuf, 0, sizeof(pBuf));
|
||||||
|
@ -1926,8 +1927,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
|
// len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
|
||||||
if (len >= size - 1) return dumpBuf;
|
// if (len >= size - 1) return dumpBuf;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
|
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
|
||||||
|
|
|
@ -745,6 +745,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
|
||||||
mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
|
mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// ASSERT(0);
|
||||||
|
|
||||||
if (cfg.myIndex == -1) {
|
if (cfg.myIndex == -1) {
|
||||||
#if 1
|
#if 1
|
||||||
|
|
|
@ -293,6 +293,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
tsem_wait(&pMgmt->syncSem);
|
tsem_wait(&pMgmt->syncSem);
|
||||||
|
} else if (code > 0) {
|
||||||
|
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
|
||||||
|
taosWLockLatch(&pMgmt->lock);
|
||||||
|
pMgmt->transId = 0;
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
|
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
|
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
||||||
|
code = 0;
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {
|
} else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {
|
||||||
|
|
|
@ -53,6 +53,11 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
|
||||||
|
|
||||||
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
|
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
|
||||||
|
|
||||||
|
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
|
||||||
|
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
|
||||||
|
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
|
||||||
|
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
// when this task starts to execute, this status will set
|
// when this task starts to execute, this status will set
|
||||||
TASK_NOT_COMPLETED = 0x1u,
|
TASK_NOT_COMPLETED = 0x1u,
|
||||||
|
@ -434,15 +439,15 @@ typedef struct SCatchSupporter {
|
||||||
} SCatchSupporter;
|
} SCatchSupporter;
|
||||||
|
|
||||||
typedef struct SStreamAggSupporter {
|
typedef struct SStreamAggSupporter {
|
||||||
SHashObj* pResultRows;
|
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||||
SArray* pCurWins;
|
SSDataBlock* pScanBlock;
|
||||||
int32_t valueSize;
|
SStreamState* pState;
|
||||||
int32_t keySize;
|
int64_t gap; // stream session window gap
|
||||||
char* pKeyBuf; // window key buffer
|
SqlFunctionCtx* pDummyCtx; // for combine
|
||||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SSHashObj* pResultRows;
|
||||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
int32_t stateKeySize;
|
||||||
int32_t currentPageId; // buffer page that is active
|
int16_t stateKeyType;
|
||||||
SSDataBlock* pScanBlock;
|
SDiskbasedBuf* pResultBuf;
|
||||||
} SStreamAggSupporter;
|
} SStreamAggSupporter;
|
||||||
|
|
||||||
typedef struct SWindowSupporter {
|
typedef struct SWindowSupporter {
|
||||||
|
@ -736,42 +741,54 @@ typedef struct SSessionAggOperatorInfo {
|
||||||
} SSessionAggOperatorInfo;
|
} SSessionAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SResultWindowInfo {
|
typedef struct SResultWindowInfo {
|
||||||
SResultRowPosition pos;
|
void* pOutputBuf;
|
||||||
STimeWindow win;
|
SSessionKey sessionWin;
|
||||||
uint64_t groupId;
|
|
||||||
bool isOutput;
|
bool isOutput;
|
||||||
bool isClosed;
|
|
||||||
} SResultWindowInfo;
|
} SResultWindowInfo;
|
||||||
|
|
||||||
typedef struct SStateWindowInfo {
|
typedef struct SStateWindowInfo {
|
||||||
SResultWindowInfo winInfo;
|
SResultWindowInfo winInfo;
|
||||||
SStateKeys stateKey;
|
SStateKeys* pStateKey;
|
||||||
} SStateWindowInfo;
|
} SStateWindowInfo;
|
||||||
|
|
||||||
typedef struct SStreamSessionAggOperatorInfo {
|
typedef struct SStreamSessionAggOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SStreamAggSupporter streamAggSup;
|
SStreamAggSupporter streamAggSup;
|
||||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
int64_t gap; // session window gap
|
|
||||||
int32_t primaryTsIndex; // primary timestamp slot id
|
int32_t primaryTsIndex; // primary timestamp slot id
|
||||||
int32_t endTsIndex; // window end timestamp slot id
|
int32_t endTsIndex; // window end timestamp slot id
|
||||||
int32_t order; // current SSDataBlock scan order
|
int32_t order; // current SSDataBlock scan order
|
||||||
STimeWindowAggSupp twAggSup;
|
STimeWindowAggSupp twAggSup;
|
||||||
SSDataBlock* pWinBlock; // window result
|
SSDataBlock* pWinBlock; // window result
|
||||||
SqlFunctionCtx* pDummyCtx; // for combine
|
SSDataBlock* pDelRes; // delete result
|
||||||
SSDataBlock* pDelRes; // delete result
|
SSDataBlock* pUpdateRes; // update window
|
||||||
SSDataBlock* pUpdateRes; // update window
|
|
||||||
bool returnUpdate;
|
bool returnUpdate;
|
||||||
SHashObj* pStDeleted;
|
SSHashObj* pStDeleted;
|
||||||
void* pDelIterator;
|
void* pDelIterator;
|
||||||
SArray* pChildren; // cache for children's result; final stream operator
|
SArray* pChildren; // cache for children's result; final stream operator
|
||||||
SPhysiNode* pPhyNode; // create new child
|
SPhysiNode* pPhyNode; // create new child
|
||||||
bool isFinal;
|
bool isFinal;
|
||||||
bool ignoreExpiredData;
|
bool ignoreExpiredData;
|
||||||
SHashObj* pGroupIdTbNameMap;
|
SHashObj* pGroupIdTbNameMap;
|
||||||
} SStreamSessionAggOperatorInfo;
|
} SStreamSessionAggOperatorInfo;
|
||||||
|
|
||||||
|
typedef struct SStreamStateAggOperatorInfo {
|
||||||
|
SOptrBasicInfo binfo;
|
||||||
|
SStreamAggSupporter streamAggSup;
|
||||||
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||||
|
SGroupResInfo groupResInfo;
|
||||||
|
int32_t primaryTsIndex; // primary timestamp slot id
|
||||||
|
STimeWindowAggSupp twAggSup;
|
||||||
|
SColumn stateCol;
|
||||||
|
SSDataBlock* pDelRes;
|
||||||
|
SSHashObj* pSeDeleted;
|
||||||
|
void* pDelIterator;
|
||||||
|
SArray* pChildren; // cache for children's result;
|
||||||
|
bool ignoreExpiredData;
|
||||||
|
SHashObj* pGroupIdTbNameMap;
|
||||||
|
} SStreamStateAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SStreamPartitionOperatorInfo {
|
typedef struct SStreamPartitionOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SPartitionBySupporter partitionSup;
|
SPartitionBySupporter partitionSup;
|
||||||
|
@ -834,24 +851,6 @@ typedef struct SStateWindowOperatorInfo {
|
||||||
const SNode* pCondition;
|
const SNode* pCondition;
|
||||||
} SStateWindowOperatorInfo;
|
} SStateWindowOperatorInfo;
|
||||||
|
|
||||||
typedef struct SStreamStateAggOperatorInfo {
|
|
||||||
SOptrBasicInfo binfo;
|
|
||||||
SStreamAggSupporter streamAggSup;
|
|
||||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
|
||||||
SGroupResInfo groupResInfo;
|
|
||||||
int32_t primaryTsIndex; // primary timestamp slot id
|
|
||||||
int32_t order; // current SSDataBlock scan order
|
|
||||||
STimeWindowAggSupp twAggSup;
|
|
||||||
SColumn stateCol;
|
|
||||||
SqlFunctionCtx* pDummyCtx; // for combine
|
|
||||||
SSDataBlock* pDelRes;
|
|
||||||
SHashObj* pSeDeleted;
|
|
||||||
void* pDelIterator;
|
|
||||||
SArray* pChildren; // cache for children's result;
|
|
||||||
bool ignoreExpiredData;
|
|
||||||
SHashObj* pGroupIdTbNameMap;
|
|
||||||
} SStreamStateAggOperatorInfo;
|
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
|
@ -1064,13 +1063,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
|
||||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||||
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
||||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
|
|
||||||
int32_t size);
|
|
||||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
|
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
|
||||||
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
|
||||||
int64_t gap, int32_t* pIndex);
|
|
||||||
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
|
||||||
int64_t gap, int32_t* pIndex);
|
|
||||||
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
||||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
||||||
|
@ -1100,6 +1094,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||||
SGroupResInfo* pGroupResInfo);
|
SGroupResInfo* pGroupResInfo);
|
||||||
|
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
|
||||||
|
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock,
|
||||||
|
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
|
||||||
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
||||||
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
||||||
|
|
|
@ -4192,42 +4192,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
|
|
||||||
int32_t size) {
|
|
||||||
pSup->currentPageId = -1;
|
|
||||||
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
|
||||||
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
|
||||||
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
|
||||||
pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
|
||||||
if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
pSup->valueSize = size;
|
|
||||||
|
|
||||||
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
|
||||||
int32_t pageSize = 4096;
|
|
||||||
while (pageSize < pSup->resultRowSize * 4) {
|
|
||||||
pageSize <<= 1u;
|
|
||||||
}
|
|
||||||
// at least four pages need to be in buffer
|
|
||||||
int32_t bufSize = 4096 * 256;
|
|
||||||
if (bufSize <= pageSize) {
|
|
||||||
bufSize = pageSize * 4;
|
|
||||||
}
|
|
||||||
if (!osTempSpaceAvailable()) {
|
|
||||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
|
||||||
qError("Init stream agg supporter failed since %s", terrstr(terrno));
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
|
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
|
||||||
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
|
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
|
||||||
SWinKey key = {
|
SWinKey key = {
|
||||||
|
@ -4237,7 +4201,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
|
||||||
char* value = NULL;
|
char* value = NULL;
|
||||||
int32_t size = pAggSup->resultRowSize;
|
int32_t size = pAggSup->resultRowSize;
|
||||||
|
|
||||||
tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
|
|
||||||
if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
|
if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -4342,3 +4305,82 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
|
||||||
|
streamStateSessionPut(pState, key, (const void*)buf, size);
|
||||||
|
releaseOutputBuf(pState, NULL, (SResultRow*)buf);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock,
|
||||||
|
SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
|
||||||
|
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||||
|
int32_t numOfExprs = pSup->numOfExprs;
|
||||||
|
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||||
|
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||||
|
|
||||||
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
|
|
||||||
|
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
||||||
|
SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
|
||||||
|
int32_t size = 0;
|
||||||
|
void* pVal = NULL;
|
||||||
|
int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
SResultRow* pRow = (SResultRow*)pVal;
|
||||||
|
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||||
|
// no results, continue to check the next one
|
||||||
|
if (pRow->numOfRows == 0) {
|
||||||
|
pGroupResInfo->index += 1;
|
||||||
|
releaseOutputBuf(pState, NULL, pRow);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.groupId == 0) {
|
||||||
|
pBlock->info.groupId = pKey->groupId;
|
||||||
|
} else {
|
||||||
|
// current value belongs to different group, it can't be packed into one datablock
|
||||||
|
if (pBlock->info.groupId != pKey->groupId) {
|
||||||
|
releaseOutputBuf(pState, NULL, pRow);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
|
ASSERT(pBlock->info.rows > 0);
|
||||||
|
releaseOutputBuf(pState, NULL, pRow);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pGroupResInfo->index += 1;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
|
||||||
|
|
||||||
|
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||||
|
if (pCtx[j].fpSet.finalize) {
|
||||||
|
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
|
if (TAOS_FAILED(code1)) {
|
||||||
|
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code1);
|
||||||
|
}
|
||||||
|
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
|
// do nothing, todo refactor
|
||||||
|
} else {
|
||||||
|
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
|
||||||
|
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||||
|
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||||
|
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
|
// saveSessionDiscBuf(pState, pKey, pVal, size);
|
||||||
|
releaseOutputBuf(pState, NULL, pRow);
|
||||||
|
}
|
||||||
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
|
@ -1190,23 +1190,22 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
||||||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
int32_t dummy = 0;
|
|
||||||
int64_t version = pSrcBlock->info.version - 1;
|
int64_t version = pSrcBlock->info.version - 1;
|
||||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||||
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
||||||
// gap must be 0.
|
// gap must be 0.
|
||||||
SResultWindowInfo* pStartWin =
|
SSessionKey startWin = {0};
|
||||||
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
|
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin);
|
||||||
if (!pStartWin) {
|
if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
|
||||||
// window has been closed.
|
// window has been closed.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SResultWindowInfo* pEndWin =
|
SSessionKey endWin = {0};
|
||||||
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
|
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
|
||||||
ASSERT(pEndWin);
|
ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));
|
||||||
TSKEY ts = INT64_MIN;
|
colDataAppend(pDestStartCol, i, (const char*)&startWin.win.skey, false);
|
||||||
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
|
colDataAppend(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
|
||||||
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
|
|
||||||
colDataAppendNULL(pDestUidCol, i);
|
colDataAppendNULL(pDestUidCol, i);
|
||||||
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
|
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
|
||||||
colDataAppendNULL(pDestCalStartTsCol, i);
|
colDataAppendNULL(pDestCalStartTsCol, i);
|
||||||
|
|
|
@ -693,7 +693,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
|
||||||
pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
|
pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
|
||||||
tSimpleHashCleanup(pFillSup->pResMap);
|
tSimpleHashCleanup(pFillSup->pResMap);
|
||||||
pFillSup->pResMap = NULL;
|
pFillSup->pResMap = NULL;
|
||||||
streamStateReleaseBuf(NULL, NULL, pFillSup->cur.pRowVal);
|
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
|
||||||
pFillSup->cur.pRowVal = NULL;
|
pFillSup->cur.pRowVal = NULL;
|
||||||
|
|
||||||
taosMemoryFree(pFillSup);
|
taosMemoryFree(pFillSup);
|
||||||
|
@ -736,7 +736,7 @@ static void resetFillWindow(SResultRowData* pRowData) {
|
||||||
|
|
||||||
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, SStreamState* pState) {
|
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, SStreamState* pState) {
|
||||||
resetFillWindow(&pFillSup->prev);
|
resetFillWindow(&pFillSup->prev);
|
||||||
streamStateReleaseBuf(NULL, NULL, pFillSup->cur.pRowVal);
|
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
|
||||||
resetFillWindow(&pFillSup->cur);
|
resetFillWindow(&pFillSup->cur);
|
||||||
resetFillWindow(&pFillSup->next);
|
resetFillWindow(&pFillSup->next);
|
||||||
resetFillWindow(&pFillSup->nextNext);
|
resetFillWindow(&pFillSup->nextNext);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2527,6 +2527,8 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
buildTDigestInfo(pInfo);
|
||||||
|
tdigestAutoFill(pInfo->pTDigest, COMPRESSION);
|
||||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -2540,12 +2542,11 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
tdigestAdd(pInfo->pTDigest, v, w);
|
tdigestAdd(pInfo->pTDigest, v, w);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s before add %d elements into histogram, total:%d, numOfEntry:%d, pHisto:%p, elems: %p", __FUNCTION__,
|
|
||||||
numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, pInfo->pHisto->elems);
|
|
||||||
|
|
||||||
// might be a race condition here that pHisto can be overwritten or setup function
|
// might be a race condition here that pHisto can be overwritten or setup function
|
||||||
// has not been called, need to relink the buffer pHisto points to.
|
// has not been called, need to relink the buffer pHisto points to.
|
||||||
buildHistogramInfo(pInfo);
|
buildHistogramInfo(pInfo);
|
||||||
|
qDebug("%s before add %d elements into histogram, total:%d, numOfEntry:%d, pHisto:%p, elems: %p", __FUNCTION__,
|
||||||
|
numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, pInfo->pHisto->elems);
|
||||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -2579,8 +2580,9 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
|
||||||
|
|
||||||
buildTDigestInfo(pOutput);
|
buildTDigestInfo(pOutput);
|
||||||
TDigest* pTDigest = pOutput->pTDigest;
|
TDigest* pTDigest = pOutput->pTDigest;
|
||||||
|
tdigestAutoFill(pTDigest, COMPRESSION);
|
||||||
|
|
||||||
if (pTDigest->num_centroids <= 0) {
|
if (pTDigest->num_centroids <= 0 && pTDigest->num_buffered_pts == 0) {
|
||||||
memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
|
memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
|
||||||
tdigestAutoFill(pTDigest, COMPRESSION);
|
tdigestAutoFill(pTDigest, COMPRESSION);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2652,6 +2654,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
buildTDigestInfo(pInfo);
|
buildTDigestInfo(pInfo);
|
||||||
|
tdigestAutoFill(pInfo->pTDigest, COMPRESSION);
|
||||||
if (pInfo->pTDigest->size > 0) {
|
if (pInfo->pTDigest->size > 0) {
|
||||||
pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100);
|
pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100);
|
||||||
} else { // no need to free
|
} else { // no need to free
|
||||||
|
@ -5356,16 +5359,14 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t maxCount = 0;
|
int32_t maxCount = 0;
|
||||||
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||||
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
|
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
|
||||||
if (pItem->count > maxCount) {
|
if (pItem->count >= maxCount) {
|
||||||
maxCount = pItem->count;
|
maxCount = pItem->count;
|
||||||
resIndex = i;
|
resIndex = i;
|
||||||
} else if (pItem->count == maxCount) {
|
|
||||||
resIndex = -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
|
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
|
||||||
colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false);
|
colDataAppend(pCol, currentRow, pResItem->data, (maxCount == 0) ? true : false);
|
||||||
|
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,40 @@ typedef struct SStateKey {
|
||||||
int64_t opNum;
|
int64_t opNum;
|
||||||
} SStateKey;
|
} SStateKey;
|
||||||
|
|
||||||
|
typedef struct SStateSessionKey {
|
||||||
|
SSessionKey key;
|
||||||
|
int64_t opNum;
|
||||||
|
} SStateSessionKey;
|
||||||
|
|
||||||
|
static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
|
||||||
|
if (pWin1->groupId > pWin2->groupId) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWin1->groupId < pWin2->groupId) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWin1->win.skey > pWin2->win.ekey) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWin1->win.ekey < pWin2->win.skey) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
|
SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
|
||||||
|
SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
|
||||||
|
|
||||||
|
if (pWin1->opNum > pWin2->opNum) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWin1->opNum < pWin2->opNum) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return sessionKeyCmpr(&pWin1->key, &pWin2->key);
|
||||||
|
}
|
||||||
|
|
||||||
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
SStateKey* pWin1 = (SStateKey*)pKey1;
|
SStateKey* pWin1 = (SStateKey*)pKey1;
|
||||||
SStateKey* pWin2 = (SStateKey*)pKey2;
|
SStateKey* pWin2 = (SStateKey*)pKey2;
|
||||||
|
@ -79,7 +113,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) {
|
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db,
|
||||||
|
&pState->pSessionStateDb) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,6 +134,7 @@ _err:
|
||||||
tdbTbClose(pState->pStateDb);
|
tdbTbClose(pState->pStateDb);
|
||||||
tdbTbClose(pState->pFuncStateDb);
|
tdbTbClose(pState->pFuncStateDb);
|
||||||
tdbTbClose(pState->pFillStateDb);
|
tdbTbClose(pState->pFillStateDb);
|
||||||
|
tdbTbClose(pState->pSessionStateDb);
|
||||||
tdbClose(pState->db);
|
tdbClose(pState->db);
|
||||||
taosMemoryFree(pState);
|
taosMemoryFree(pState);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -105,6 +145,7 @@ void streamStateClose(SStreamState* pState) {
|
||||||
tdbTbClose(pState->pStateDb);
|
tdbTbClose(pState->pStateDb);
|
||||||
tdbTbClose(pState->pFuncStateDb);
|
tdbTbClose(pState->pFuncStateDb);
|
||||||
tdbTbClose(pState->pFillStateDb);
|
tdbTbClose(pState->pFillStateDb);
|
||||||
|
tdbTbClose(pState->pSessionStateDb);
|
||||||
tdbClose(pState->db);
|
tdbClose(pState->db);
|
||||||
|
|
||||||
taosMemoryFree(pState);
|
taosMemoryFree(pState);
|
||||||
|
@ -241,11 +282,11 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
|
||||||
if (pCur == NULL) return NULL;
|
if (pCur == NULL) return NULL;
|
||||||
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
|
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
|
||||||
|
|
||||||
int32_t c;
|
int32_t c = 0;
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
|
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
|
||||||
if (c != 0) {
|
if (c != 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
|
@ -257,7 +298,7 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key)
|
||||||
if (pCur == NULL) return NULL;
|
if (pCur == NULL) return NULL;
|
||||||
tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL);
|
tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL);
|
||||||
|
|
||||||
int32_t c;
|
int32_t c = 0;
|
||||||
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
|
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
|
||||||
if (c != 0) {
|
if (c != 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
|
@ -348,21 +389,21 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
|
||||||
}
|
}
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
|
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
tdbTbcClose(pCur->pCur);
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (c > 0) return pCur;
|
if (c > 0) return pCur;
|
||||||
|
|
||||||
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,20 +416,20 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
|
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t c;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
tdbTbcClose(pCur->pCur);
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (c > 0) return pCur;
|
if (c > 0) return pCur;
|
||||||
|
|
||||||
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -401,20 +442,20 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
|
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t c;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
tdbTbcClose(pCur->pCur);
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (c < 0) return pCur;
|
if (c < 0) return pCur;
|
||||||
|
|
||||||
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
|
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
|
||||||
taosMemoryFree(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,3 +486,229 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamFreeVal(void* val) { tdbFree(val); }
|
void streamFreeVal(void* val) { tdbFree(val); }
|
||||||
|
|
||||||
|
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) return NULL;
|
||||||
|
tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL);
|
||||||
|
|
||||||
|
int32_t c = 0;
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c);
|
||||||
|
if (c != 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pCur->number = pState->number;
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
||||||
|
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
||||||
|
void* tmp = NULL;
|
||||||
|
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) {
|
||||||
|
*pVal = tdbRealloc(NULL, *pVLen);
|
||||||
|
memcpy(*pVal, tmp, *pVLen);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pCur->number = pState->number;
|
||||||
|
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
int32_t c = 0;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
|
tdbTbcClose(pCur->pCur);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (c > 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pCur->number = pState->number;
|
||||||
|
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
int32_t c = 0;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
|
tdbTbcClose(pCur->pCur);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (c > 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||||
|
if (!pCur) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
const SStateSessionKey* pKTmp = NULL;
|
||||||
|
int32_t kLen;
|
||||||
|
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pKTmp->opNum != pCur->number) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*pKey = pKTmp->key;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSessionClear(SStreamState* pState) {
|
||||||
|
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
|
||||||
|
streamStateSessionPut(pState, &key, NULL, 0);
|
||||||
|
SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pState, &key);
|
||||||
|
while (1) {
|
||||||
|
SSessionKey delKey = {0};
|
||||||
|
void* buf = NULL;
|
||||||
|
int32_t size = 0;
|
||||||
|
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, buf, &size);
|
||||||
|
if (code == 0) {
|
||||||
|
memset(buf, 0, size);
|
||||||
|
streamStateSessionPut(pState, &delKey, buf, size);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
streamStateCurNext(pState, pCur);
|
||||||
|
}
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
streamStateSessionDel(pState, &key);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
||||||
|
SSessionKey resKey = *key;
|
||||||
|
while (1) {
|
||||||
|
streamStateCurPrev(pState, pCur);
|
||||||
|
SSessionKey tmpKey = *key;
|
||||||
|
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
||||||
|
if (code == TSDB_CODE_SUCCESS && sessionKeyCmpr(key, &tmpKey) == 0) {
|
||||||
|
resKey = tmpKey;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return streamStateSessionGetRanomCur(pState, &resKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
||||||
|
// todo refactor
|
||||||
|
SStreamStateCur* pCur = streamStateSessionGetCur(pState, key);
|
||||||
|
int32_t size = *pVLen;
|
||||||
|
void* tmp = NULL;
|
||||||
|
*pVal = tdbRealloc(NULL, size);
|
||||||
|
memset(*pVal, 0, size);
|
||||||
|
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) {
|
||||||
|
memcpy(*pVal, tmp, *pVLen);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
|
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
|
||||||
|
// todo refactor
|
||||||
|
int32_t res = TSDB_CODE_SUCCESS;
|
||||||
|
SSessionKey tmpKey = *key;
|
||||||
|
int32_t valSize = *pVLen;
|
||||||
|
void* tmp = tdbRealloc(NULL, valSize);
|
||||||
|
if (!tmp) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
||||||
|
int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
*pVal = tmp;
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
|
||||||
|
streamStateSessionPut(pState, key, NULL, 0);
|
||||||
|
pCur = streamStateSessionGetRanomCur(pState, key);
|
||||||
|
streamStateCurPrev(pState, pCur);
|
||||||
|
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
|
if (fn(pKeyData, stateKey) == true) {
|
||||||
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
*key = tmpKey;
|
||||||
|
pCur = streamStateSessionSeekKeyNext(pState, key);
|
||||||
|
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
|
if (fn(pKeyData, stateKey) == true) {
|
||||||
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*key = tmpKey;
|
||||||
|
res = 1;
|
||||||
|
memset(tmp, 0, valSize);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
|
||||||
|
*pVal = tmp;
|
||||||
|
streamStateSessionDel(pState, &tmpKey);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
|
@ -3032,7 +3032,8 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
|
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
|
||||||
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
|
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType));
|
||||||
|
// return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
||||||
|
@ -3084,7 +3085,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
// user commit
|
// user commit
|
||||||
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
|
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
bool internalExecute = true;
|
bool internalExecute = true;
|
||||||
if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
|
if ((ths->replicaNum == 1) && ths->restoreFinish) {
|
||||||
|
// if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
|
||||||
internalExecute = false;
|
internalExecute = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -249,12 +249,18 @@
|
||||||
./test.sh -f tsim/stream/windowClose.sim
|
./test.sh -f tsim/stream/windowClose.sim
|
||||||
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||||
./test.sh -f tsim/stream/sliding.sim
|
./test.sh -f tsim/stream/sliding.sim
|
||||||
#./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||||
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||||
#./test.sh -f tsim/stream/partitionbyColumnState.sim
|
#./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||||
#./test.sh -f tsim/stream/deleteInterval.sim
|
#./test.sh -f tsim/stream/deleteInterval.sim
|
||||||
#./test.sh -f tsim/stream/deleteSession.sim
|
#./test.sh -f tsim/stream/deleteSession.sim
|
||||||
#./test.sh -f tsim/stream/deleteState.sim
|
#./test.sh -f tsim/stream/deleteState.sim
|
||||||
|
#./test.sh -f tsim/stream/fillIntervalDelete0.sim
|
||||||
|
#./test.sh -f tsim/stream/fillIntervalDelete1.sim
|
||||||
|
./test.sh -f tsim/stream/fillIntervalLinear.sim
|
||||||
|
#./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
|
||||||
|
./test.sh -f tsim/stream/fillIntervalPrevNext.sim
|
||||||
|
./test.sh -f tsim/stream/fillIntervalValue.sim
|
||||||
|
|
||||||
# ---- transaction ----
|
# ---- transaction ----
|
||||||
./test.sh -f tsim/trans/lossdata1.sim
|
./test.sh -f tsim/trans/lossdata1.sim
|
||||||
|
|
|
@ -17,9 +17,9 @@ sql create table $table1 (ts timestamp, b binary(20))
|
||||||
sql create table $table2 (ts timestamp, b binary(20))
|
sql create table $table2 (ts timestamp, b binary(20))
|
||||||
|
|
||||||
sql insert into $table1 values(now, "table_name")
|
sql insert into $table1 values(now, "table_name")
|
||||||
sql insert into $table1 values(now-1m, "tablexname")
|
sql insert into $table1 values(now-3m, "tablexname")
|
||||||
sql insert into $table1 values(now-2m, "tablexxx")
|
sql insert into $table1 values(now-2m, "tablexxx")
|
||||||
sql insert into $table1 values(now-2m, "table")
|
sql insert into $table1 values(now-1m, "table")
|
||||||
|
|
||||||
sql select b from $table1
|
sql select b from $table1
|
||||||
if $rows != 4 then
|
if $rows != 4 then
|
||||||
|
|
|
@ -216,12 +216,12 @@ if $data02 != 3.274823935 then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data03 != 1.800000000 then
|
if $data03 != 1.500000000 then
|
||||||
print ======$data03
|
print ======$data03
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data04 != 3.350000000 then
|
if $data04 != 3.500000000 then
|
||||||
print ======$data04
|
print ======$data04
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
|
@ -5,15 +5,15 @@ sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print =============== create database
|
print =============== create database
|
||||||
sql create database test vgroups 1
|
sql create database test vgroups 1;
|
||||||
sql select * from information_schema.ins_databases
|
sql select * from information_schema.ins_databases;
|
||||||
if $rows != 3 then
|
if $rows != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
sql use test
|
sql use test;
|
||||||
|
|
||||||
|
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
|
||||||
|
|
|
@ -349,7 +349,7 @@ endi
|
||||||
|
|
||||||
if $rows != 3 then
|
if $rows != 3 then
|
||||||
print ====loop4=rows=$rows
|
print ====loop4=rows=$rows
|
||||||
# goto loop4
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
# row 0
|
# row 0
|
||||||
|
|
|
@ -13,7 +13,7 @@ endi
|
||||||
|
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
sql use test
|
sql use test;
|
||||||
sql create table t2(ts timestamp, a int, b int , c int, d double);
|
sql create table t2(ts timestamp, a int, b int , c int, d double);
|
||||||
sql create stream streams2 trigger window_close into streamt2 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 session(ts, 10s);
|
sql create stream streams2 trigger window_close into streamt2 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 session(ts, 10s);
|
||||||
|
|
||||||
|
@ -58,16 +58,11 @@ endi
|
||||||
sql insert into t2 values(1648791233002,1,2,3,1.0);
|
sql insert into t2 values(1648791233002,1,2,3,1.0);
|
||||||
sleep 300
|
sleep 300
|
||||||
sql select * from streamt2;
|
sql select * from streamt2;
|
||||||
if $rows != 1 then
|
if $rows != 0 then
|
||||||
print ======$rows
|
print ======$rows
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data01 != 6 then
|
|
||||||
print ======$data01
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
sql insert into t2 values(1648791253003,1,2,3,1.0);
|
sql insert into t2 values(1648791253003,1,2,3,1.0);
|
||||||
sleep 300
|
sleep 300
|
||||||
sql select * from streamt2;
|
sql select * from streamt2;
|
||||||
|
|
|
@ -37,6 +37,7 @@ void simLogSql(char *sql, bool useSharp) {
|
||||||
taosFsyncFile(pFile);
|
taosFsyncFile(pFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
char *simParseArbitratorName(char *varName) {
|
char *simParseArbitratorName(char *varName) {
|
||||||
static char hostName[140];
|
static char hostName[140];
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -47,6 +48,7 @@ char *simParseArbitratorName(char *varName) {
|
||||||
#endif
|
#endif
|
||||||
return hostName;
|
return hostName;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
char *simParseHostName(char *varName) {
|
char *simParseHostName(char *varName) {
|
||||||
static char hostName[140];
|
static char hostName[140];
|
||||||
|
@ -102,9 +104,11 @@ char *simGetVariable(SScript *script, char *varName, int32_t varLen) {
|
||||||
return simParseHostName(varName);
|
return simParseHostName(varName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (strncmp(varName, "arbitrator", 10) == 0) {
|
if (strncmp(varName, "arbitrator", 10) == 0) {
|
||||||
return simParseArbitratorName(varName);
|
return simParseArbitratorName(varName);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (strncmp(varName, "error", varLen) == 0) return script->error;
|
if (strncmp(varName, "error", varLen) == 0) return script->error;
|
||||||
|
|
||||||
|
@ -883,6 +887,7 @@ bool simExecuteSqlSlowCmd(SScript *script, char *rest) {
|
||||||
return simExecuteSqlImpCmd(script, rest, isSlow);
|
return simExecuteSqlImpCmd(script, rest, isSlow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
bool simExecuteRestfulCmd(SScript *script, char *rest) {
|
bool simExecuteRestfulCmd(SScript *script, char *rest) {
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
char filename[256];
|
char filename[256];
|
||||||
|
@ -924,6 +929,7 @@ bool simExecuteRestfulCmd(SScript *script, char *rest) {
|
||||||
|
|
||||||
return simExecuteSystemCmd(script, cmd);
|
return simExecuteSystemCmd(script, cmd);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
|
bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
|
||||||
char buf[3000];
|
char buf[3000];
|
||||||
|
@ -981,6 +987,7 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
bool simExecuteLineInsertCmd(SScript *script, char *rest) {
|
bool simExecuteLineInsertCmd(SScript *script, char *rest) {
|
||||||
char buf[TSDB_MAX_BINARY_LEN] = {0};
|
char buf[TSDB_MAX_BINARY_LEN] = {0};
|
||||||
|
|
||||||
|
@ -1037,3 +1044,4 @@ bool simExecuteLineInsertErrorCmd(SScript *script, char *rest) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
|
@ -501,6 +501,7 @@ bool simParseEndwCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
bool simParseSwitchCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
bool simParseSwitchCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
char *token;
|
char *token;
|
||||||
int32_t tokenLen;
|
int32_t tokenLen;
|
||||||
|
@ -647,6 +648,7 @@ bool simParseContinueCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
numOfLines++;
|
numOfLines++;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
bool simParsePrintCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
bool simParsePrintCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
int32_t expLen;
|
int32_t expLen;
|
||||||
|
@ -715,6 +717,7 @@ bool simParseSqlErrorCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
bool simParseSqlSlowCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
bool simParseSqlSlowCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
simParseSqlCmd(rest, pCmd, lineNum);
|
simParseSqlCmd(rest, pCmd, lineNum);
|
||||||
cmdLine[numOfLines - 1].cmdno = SIM_CMD_SQL_SLOW;
|
cmdLine[numOfLines - 1].cmdno = SIM_CMD_SQL_SLOW;
|
||||||
|
@ -726,6 +729,7 @@ bool simParseRestfulCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
cmdLine[numOfLines - 1].cmdno = SIM_CMD_RESTFUL;
|
cmdLine[numOfLines - 1].cmdno = SIM_CMD_RESTFUL;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
bool simParseSystemCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
bool simParseSystemCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
int32_t expLen;
|
int32_t expLen;
|
||||||
|
@ -838,6 +842,7 @@ bool simParseRunBackCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
bool simParseLineInsertCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
bool simParseLineInsertCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
int32_t expLen;
|
int32_t expLen;
|
||||||
|
|
||||||
|
@ -869,6 +874,7 @@ bool simParseLineInsertErrorCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
|
||||||
numOfLines++;
|
numOfLines++;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void simInitsimCmdList() {
|
void simInitsimCmdList() {
|
||||||
int32_t cmdno;
|
int32_t cmdno;
|
||||||
|
@ -930,6 +936,7 @@ void simInitsimCmdList() {
|
||||||
simCmdList[cmdno].executeCmd = NULL;
|
simCmdList[cmdno].executeCmd = NULL;
|
||||||
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
||||||
|
|
||||||
|
#if 0
|
||||||
cmdno = SIM_CMD_SWITCH;
|
cmdno = SIM_CMD_SWITCH;
|
||||||
simCmdList[cmdno].cmdno = cmdno;
|
simCmdList[cmdno].cmdno = cmdno;
|
||||||
strcpy(simCmdList[cmdno].name, "switch");
|
strcpy(simCmdList[cmdno].name, "switch");
|
||||||
|
@ -977,6 +984,7 @@ void simInitsimCmdList() {
|
||||||
simCmdList[cmdno].parseCmd = simParseEndsCmd;
|
simCmdList[cmdno].parseCmd = simParseEndsCmd;
|
||||||
simCmdList[cmdno].executeCmd = NULL;
|
simCmdList[cmdno].executeCmd = NULL;
|
||||||
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
||||||
|
#endif
|
||||||
|
|
||||||
cmdno = SIM_CMD_SLEEP;
|
cmdno = SIM_CMD_SLEEP;
|
||||||
simCmdList[cmdno].cmdno = cmdno;
|
simCmdList[cmdno].cmdno = cmdno;
|
||||||
|
@ -1050,6 +1058,7 @@ void simInitsimCmdList() {
|
||||||
simCmdList[cmdno].executeCmd = simExecuteSqlErrorCmd;
|
simCmdList[cmdno].executeCmd = simExecuteSqlErrorCmd;
|
||||||
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
||||||
|
|
||||||
|
#if 0
|
||||||
cmdno = SIM_CMD_SQL_SLOW;
|
cmdno = SIM_CMD_SQL_SLOW;
|
||||||
simCmdList[cmdno].cmdno = cmdno;
|
simCmdList[cmdno].cmdno = cmdno;
|
||||||
strcpy(simCmdList[cmdno].name, "sql_slow");
|
strcpy(simCmdList[cmdno].name, "sql_slow");
|
||||||
|
@ -1065,6 +1074,7 @@ void simInitsimCmdList() {
|
||||||
simCmdList[cmdno].parseCmd = simParseRestfulCmd;
|
simCmdList[cmdno].parseCmd = simParseRestfulCmd;
|
||||||
simCmdList[cmdno].executeCmd = simExecuteRestfulCmd;
|
simCmdList[cmdno].executeCmd = simExecuteRestfulCmd;
|
||||||
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
||||||
|
#endif
|
||||||
|
|
||||||
/* test is only an internal command */
|
/* test is only an internal command */
|
||||||
cmdno = SIM_CMD_TEST;
|
cmdno = SIM_CMD_TEST;
|
||||||
|
@ -1082,6 +1092,7 @@ void simInitsimCmdList() {
|
||||||
simCmdList[cmdno].executeCmd = simExecuteReturnCmd;
|
simCmdList[cmdno].executeCmd = simExecuteReturnCmd;
|
||||||
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
||||||
|
|
||||||
|
#if 0
|
||||||
cmdno = SIM_CMD_LINE_INSERT;
|
cmdno = SIM_CMD_LINE_INSERT;
|
||||||
simCmdList[cmdno].cmdno = cmdno;
|
simCmdList[cmdno].cmdno = cmdno;
|
||||||
strcpy(simCmdList[cmdno].name, "line_insert");
|
strcpy(simCmdList[cmdno].name, "line_insert");
|
||||||
|
@ -1097,4 +1108,5 @@ void simInitsimCmdList() {
|
||||||
simCmdList[cmdno].parseCmd = simParseLineInsertErrorCmd;
|
simCmdList[cmdno].parseCmd = simParseLineInsertErrorCmd;
|
||||||
simCmdList[cmdno].executeCmd = simExecuteLineInsertErrorCmd;
|
simCmdList[cmdno].executeCmd = simExecuteLineInsertErrorCmd;
|
||||||
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
simAddCmdIntoHash(&(simCmdList[cmdno]));
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue