Merge branch '3.0' into fix/liao_cov
This commit is contained in:
commit
ffa34888df
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG 0fb640b
|
GIT_TAG a921bd4
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -130,7 +130,7 @@ void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn)
|
||||||
void tColDataClear(SColData *pColData);
|
void tColDataClear(SColData *pColData);
|
||||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
||||||
uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal);
|
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
|
||||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
||||||
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
|
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
|
||||||
|
|
||||||
|
|
|
@ -1805,7 +1805,7 @@ int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicR
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
} SMqConsumerLostMsg, SMqConsumerRecoverMsg;
|
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
|
|
|
@ -149,7 +149,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
|
||||||
|
@ -171,6 +171,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
|
|
|
@ -60,19 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateClear(SStreamState* pState);
|
int32_t streamStateClear(SStreamState* pState);
|
||||||
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
||||||
|
|
||||||
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
|
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
|
||||||
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
|
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
|
||||||
int32_t streamStateSessionClear(SStreamState* pState);
|
int32_t streamStateSessionClear(SStreamState* pState);
|
||||||
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen);
|
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
|
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
|
||||||
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
|
||||||
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
|
||||||
|
|
||||||
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
||||||
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||||
|
@ -99,7 +99,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
|
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
|
|
||||||
// char* streamStateSessionDump(SStreamState* pState);
|
#if 0
|
||||||
|
char* streamStateSessionDump(SStreamState* pState);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -990,7 +990,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
// all data has returned to App already, no need to try again
|
// all data has returned to App already, no need to try again
|
||||||
if (pResultInfo->completed) {
|
if (pResultInfo->completed) {
|
||||||
// it is a local executed query, no need to do async fetch
|
// it is a local executed query, no need to do async fetch
|
||||||
if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) {
|
if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
|
||||||
if (pResultInfo->localResultFetched) {
|
if (pResultInfo->localResultFetched) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
pResultInfo->current = 0;
|
pResultInfo->current = 0;
|
||||||
|
|
|
@ -1557,7 +1557,7 @@ void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
|
||||||
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
|
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal) {
|
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) {
|
||||||
uint8_t v;
|
uint8_t v;
|
||||||
switch (pColData->flag) {
|
switch (pColData->flag) {
|
||||||
case HAS_NONE:
|
case HAS_NONE:
|
||||||
|
|
|
@ -44,6 +44,7 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||||
|
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||||
|
|
||||||
bool mndRebTryStart();
|
bool mndRebTryStart();
|
||||||
void mndRebEnd();
|
void mndRebEnd();
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||||
|
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 3
|
#define MND_CONSUMER_LOST_HB_CNT 3
|
||||||
|
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
||||||
|
|
||||||
static int8_t mqRebInExecCnt = 0;
|
static int8_t mqRebInExecCnt = 0;
|
||||||
|
|
||||||
|
@ -50,6 +51,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg);
|
||||||
|
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
||||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
|
@ -69,6 +71,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
|
||||||
|
@ -162,6 +165,43 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||||
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
|
||||||
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
|
||||||
|
if (pConsumer == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId,
|
||||||
|
mndConsumerStatusName(pConsumer->status));
|
||||||
|
|
||||||
|
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
pConsumerNew->updateType = CONSUMER_UPDATE__LOST;
|
||||||
|
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
||||||
|
if (pTrans == NULL) goto FAIL;
|
||||||
|
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||||
|
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
FAIL:
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||||
if (pRebInfo == NULL) {
|
if (pRebInfo == NULL) {
|
||||||
|
@ -206,15 +246,28 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
|
||||||
|
|
||||||
pLostMsg->consumerId = pConsumer->consumerId;
|
pLostMsg->consumerId = pConsumer->consumerId;
|
||||||
SRpcMsg pRpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
|
.msgType = TDMT_MND_TMQ_CONSUMER_LOST,
|
||||||
.pCont = pLostMsg,
|
.pCont = pLostMsg,
|
||||||
.contLen = sizeof(SMqConsumerLostMsg),
|
.contLen = sizeof(SMqConsumerLostMsg),
|
||||||
};
|
};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
|
|
||||||
|
if (status == MQ_CONSUMER_STATUS__READY) {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||||
|
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
|
||||||
|
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
||||||
|
|
||||||
|
pClearMsg->consumerId = pConsumer->consumerId;
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
|
||||||
|
.pCont = pClearMsg,
|
||||||
|
.contLen = sizeof(SMqConsumerClearMsg),
|
||||||
|
};
|
||||||
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
}
|
||||||
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
|
@ -444,6 +497,14 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||||
|
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||||
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||||
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
|
|
@ -893,7 +893,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
|
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
|
||||||
// NOTE: reverse the order to find the end position in data block
|
// NOTE: reverse the order to find the end position in data block
|
||||||
int32_t endPos = -1;
|
int32_t endPos = -1;
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||||
|
@ -910,6 +910,117 @@ int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SData
|
||||||
return endPos;
|
return endPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
|
||||||
|
int32_t dumpedRows, bool asc) {
|
||||||
|
if (asc) {
|
||||||
|
memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t));
|
||||||
|
} else {
|
||||||
|
int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1;
|
||||||
|
memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t));
|
||||||
|
|
||||||
|
// todo: opt perf by extract the loop
|
||||||
|
// reverse the array list
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int64_t* pts = (int64_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// a faster version of copy procedure.
|
||||||
|
static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData,
|
||||||
|
int32_t dumpedRows, bool asc) {
|
||||||
|
uint8_t* p = NULL;
|
||||||
|
if (asc) {
|
||||||
|
p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
|
||||||
|
} else {
|
||||||
|
int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1;
|
||||||
|
p = pData->pData + tDataTypes[pData->type].bytes * startIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t step = asc? 1:-1;
|
||||||
|
|
||||||
|
// make sure it is aligned to 8bit
|
||||||
|
ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
|
||||||
|
|
||||||
|
// 1. copy data in a batch model
|
||||||
|
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
|
||||||
|
|
||||||
|
// 2. reverse the array list in case of descending order scan data block
|
||||||
|
if (!asc) {
|
||||||
|
switch(pColData->info.type) {
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
{
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int64_t* pts = (int64_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int8_t* pts = (int8_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int16_t* pts = (int16_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
|
int32_t mid = dumpedRows >> 1u;
|
||||||
|
int32_t* pts = (int32_t*)pColData->pData;
|
||||||
|
for (int32_t j = 0; j < mid; ++j) {
|
||||||
|
int64_t t = pts[j];
|
||||||
|
pts[j] = pts[dumpedRows - j - 1];
|
||||||
|
pts[dumpedRows - j - 1] = t;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. if the null value exists, check items one-by-one
|
||||||
|
if (pData->flag != HAS_VALUE) {
|
||||||
|
int32_t rowIndex = 0;
|
||||||
|
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) {
|
||||||
|
uint8_t v = tColDataGetBitValue(pData, j);
|
||||||
|
if (v == 0 || v == 1) {
|
||||||
|
colDataSetNull_f(pColData->nullbitmap, rowIndex);
|
||||||
|
pColData->hasNull = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
@ -949,24 +1060,17 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
}
|
}
|
||||||
|
|
||||||
endIndex += step;
|
endIndex += step;
|
||||||
int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
||||||
if (remain > pReader->capacity) { // output buffer check
|
if (dumpedRows > pReader->capacity) { // output buffer check
|
||||||
remain = pReader->capacity;
|
dumpedRows = pReader->capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = 0;
|
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
int32_t rowIndex = 0;
|
||||||
|
|
||||||
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
if (asc) {
|
copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc);
|
||||||
memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t));
|
|
||||||
} else {
|
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
|
|
||||||
colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -981,23 +1085,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
} else if (pData->cid == pColData->info.colId) {
|
} else if (pData->cid == pColData->info.colId) {
|
||||||
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
|
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) {
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, dumpedRows);
|
||||||
} else {
|
} else {
|
||||||
if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
|
if (IS_MATHABLE_TYPE(pColData->info.type)) {
|
||||||
uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex;
|
copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc);
|
||||||
memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes);
|
} else { // varchar/nchar type
|
||||||
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
|
||||||
// null value exists, check one-by-one
|
|
||||||
if (pData->flag != HAS_VALUE) {
|
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
|
|
||||||
uint8_t v = tColDataGetBitValue(pData, j);
|
|
||||||
if (v == 0 || v == 1) {
|
|
||||||
colDataSetNull_f(pColData->nullbitmap, rowIndex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) {
|
|
||||||
tColDataGetValue(pData, j, &cv);
|
tColDataGetValue(pData, j, &cv);
|
||||||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||||
}
|
}
|
||||||
|
@ -1007,7 +1100,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
i += 1;
|
i += 1;
|
||||||
} else { // the specified column does not exist in file block, fill with null data
|
} else { // the specified column does not exist in file block, fill with null data
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, dumpedRows);
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1015,12 +1108,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
// fill the mis-matched columns with null value
|
// fill the mis-matched columns with null value
|
||||||
while (i < numOfOutputCols) {
|
while (i < numOfOutputCols) {
|
||||||
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
pColData = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, dumpedRows);
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pResBlock->info.rows = remain;
|
pResBlock->info.rows = dumpedRows;
|
||||||
pDumpInfo->rowIndex += step * remain;
|
pDumpInfo->rowIndex += step * dumpedRows;
|
||||||
|
|
||||||
// check if current block are all handled
|
// check if current block are all handled
|
||||||
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
|
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
|
||||||
|
@ -1039,7 +1132,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||||
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
|
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
|
||||||
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -264,6 +264,7 @@ typedef struct SExchangeInfo {
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
uint64_t self;
|
uint64_t self;
|
||||||
SLimitInfo limitInfo;
|
SLimitInfo limitInfo;
|
||||||
|
int64_t openedTs; // start exec time stamp
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
typedef struct SScanInfo {
|
typedef struct SScanInfo {
|
||||||
|
|
|
@ -1846,40 +1846,41 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
|
|
||||||
SExecTaskInfo* pTaskInfo) {
|
static int32_t getCompletedSources(const SArray* pArray) {
|
||||||
int32_t code = 0;
|
size_t total = taosArrayGetSize(pArray);
|
||||||
int64_t startTs = taosGetTimestampUs();
|
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
|
||||||
|
|
||||||
int32_t completed = 0;
|
int32_t completed = 0;
|
||||||
for (int32_t k = 0; k < totalSources; ++k) {
|
for (int32_t k = 0; k < total; ++k) {
|
||||||
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
|
SSourceDataInfo* p = taosArrayGet(pArray, k);
|
||||||
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||||
completed += 1;
|
completed += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
|
||||||
|
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||||
if (completed == totalSources) {
|
if (completed == totalSources) {
|
||||||
setAllSourcesCompleted(pOperator, startTs);
|
setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// printf("1\n");
|
|
||||||
tsem_wait(&pExchangeInfo->ready);
|
tsem_wait(&pExchangeInfo->ready);
|
||||||
// printf("2\n");
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < totalSources; ++i) {
|
for (int32_t i = 0; i < totalSources; ++i) {
|
||||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
||||||
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||||
// printf("========:%d is completed\n", i);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// printf("index:%d - status:%d\n", i, pDataInfo->status);
|
|
||||||
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
|
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
|
||||||
// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1895,19 +1896,11 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
if (pRsp->numOfRows == 0) {
|
if (pRsp->numOfRows == 0) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
// printf("%d completed, try next\n", i);
|
|
||||||
|
|
||||||
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
|
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
|
||||||
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
|
||||||
pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources);
|
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
|
||||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||||
|
|
||||||
// if (completed == totalSources) {
|
|
||||||
// return;
|
|
||||||
// } else {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1915,7 +1908,6 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
char* pStart = pRetrieveRsp->data;
|
char* pStart = pRetrieveRsp->data;
|
||||||
while (index++ < pRetrieveRsp->numOfBlocks) {
|
while (index++ < pRetrieveRsp->numOfBlocks) {
|
||||||
printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks);
|
|
||||||
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
||||||
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
|
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1926,25 +1918,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
|
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator);
|
||||||
|
|
||||||
// int32_t completed = 0;
|
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
|
|
||||||
// for (int32_t k = 0; k < totalSources; ++k) {
|
|
||||||
// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
|
|
||||||
// if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
|
||||||
// completed += 1;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
|
||||||
", total:%.2f Kb, completed:%d try next %d/%" PRIzu,
|
", total:%.2f Kb, try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
|
||||||
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
||||||
completed, i + 1, totalSources);
|
i + 1, totalSources);
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
|
||||||
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
|
||||||
|
@ -1962,23 +1945,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (completed == totalSources) {
|
|
||||||
// setAllSourcesCompleted(pOperator, startTs);
|
|
||||||
// }
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
} // end loop
|
||||||
|
|
||||||
int32_t completed = 0;
|
int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||||
for (int32_t k = 0; k < totalSources; ++k) {
|
if (complete1 == totalSources) {
|
||||||
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
|
qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
|
||||||
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
|
||||||
completed += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (completed == totalSources) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2097,6 +2069,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
pExchangeInfo->openedTs = taosGetTimestampUs();
|
||||||
}
|
}
|
||||||
|
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
|
|
@ -918,6 +918,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||||
blockDataDestroy(pResBlock);
|
blockDataDestroy(pResBlock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosArrayDestroy(pParInfo->rowIds);
|
||||||
|
pParInfo->rowIds = NULL;
|
||||||
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
|
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
|
||||||
pDest->info.groupId = pParInfo->groupId;
|
pDest->info.groupId = pParInfo->groupId;
|
||||||
pOperator->resultInfo.totalRows += pDest->info.rows;
|
pOperator->resultInfo.totalRows += pDest->info.rows;
|
||||||
|
@ -1016,6 +1018,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
||||||
cleanupExprSupp(&pInfo->tbnameCalSup);
|
cleanupExprSupp(&pInfo->tbnameCalSup);
|
||||||
cleanupExprSupp(&pInfo->tagCalSup);
|
cleanupExprSupp(&pInfo->tagCalSup);
|
||||||
blockDataDestroy(pInfo->pDelRes);
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
|
taosHashCleanup(pInfo->pPartitions);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1438,7 +1438,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
||||||
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
||||||
// gap must be 0.
|
// gap must be 0.
|
||||||
SSessionKey startWin = {0};
|
SSessionKey startWin = {0};
|
||||||
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin);
|
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
|
||||||
if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
|
if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
|
||||||
// window has been closed.
|
// window has been closed.
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -3550,7 +3550,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
|
||||||
pKey->win.skey = startTs;
|
pKey->win.skey = startTs;
|
||||||
pKey->win.ekey = endTs;
|
pKey->win.ekey = endTs;
|
||||||
pKey->groupId = groupId;
|
pKey->groupId = groupId;
|
||||||
int32_t code = streamStateSessionGetKey(pAggSup->pState, pKey, pKey);
|
int32_t code = streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SET_SESSION_WIN_KEY_INVALID(pKey);
|
SET_SESSION_WIN_KEY_INVALID(pKey);
|
||||||
}
|
}
|
||||||
|
@ -3561,10 +3561,11 @@ bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->session
|
||||||
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
||||||
SResultWindowInfo* pCurWin) {
|
SResultWindowInfo* pCurWin) {
|
||||||
pCurWin->sessionWin.groupId = groupId;
|
pCurWin->sessionWin.groupId = groupId;
|
||||||
pCurWin->sessionWin.win.skey = startTs - pAggSup->gap;
|
pCurWin->sessionWin.win.skey = startTs;
|
||||||
pCurWin->sessionWin.win.ekey = endTs + pAggSup->gap;
|
pCurWin->sessionWin.win.ekey = endTs;
|
||||||
int32_t size = pAggSup->resultRowSize;
|
int32_t size = pAggSup->resultRowSize;
|
||||||
int32_t code = streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, &pCurWin->pOutputBuf, &size);
|
int32_t code =
|
||||||
|
streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pCurWin->isOutput = true;
|
pCurWin->isOutput = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3575,7 +3576,7 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
|
||||||
|
|
||||||
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
|
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, (const void**)&pWinInfo->pOutputBuf, &size);
|
int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3680,7 +3681,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
|
||||||
setSessionWinOutputInfo(pStUpdated, pNextWin);
|
setSessionWinOutputInfo(pStUpdated, pNextWin);
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
pNextWin->sessionWin = pCurWin->sessionWin;
|
pNextWin->sessionWin = pCurWin->sessionWin;
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, (const void**)&pNextWin->pOutputBuf, &size);
|
int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SET_SESSION_WIN_INVALID(*pNextWin);
|
SET_SESSION_WIN_INVALID(*pNextWin);
|
||||||
}
|
}
|
||||||
|
@ -3894,7 +3895,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
|
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
|
||||||
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
||||||
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
|
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
|
||||||
SStreamStateCur* pCur = streamStateSessionGetCur(pChAggSup->pState, pWinKey);
|
SSessionKey chWinKey = *pWinKey;
|
||||||
|
chWinKey.win.ekey = chWinKey.win.skey;
|
||||||
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
SResultRow* pChResult = NULL;
|
SResultRow* pChResult = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -4112,6 +4115,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
char* pBuf = streamStateSessionDump(pAggSup->pState);
|
||||||
|
qDebug("===stream===final session%s", pBuf);
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
#endif
|
||||||
|
|
||||||
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||||
if (pInfo->pDelRes->info.rows > 0) {
|
if (pInfo->pDelRes->info.rows > 0) {
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
|
@ -4306,6 +4315,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
char* pBuf = streamStateSessionDump(pAggSup->pState);
|
||||||
|
qDebug("===stream===semi session%s", pBuf);
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
#endif
|
||||||
|
|
||||||
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "semi session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
|
|
|
@ -911,6 +911,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_FLOAT: {
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
float* plist = (float*)pCol->pData;
|
float* plist = (float*)pCol->pData;
|
||||||
|
// float val = 0;
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -920,6 +921,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
pAvgRes->count += 1;
|
pAvgRes->count += 1;
|
||||||
pAvgRes->sum.dsum += plist[i];
|
pAvgRes->sum.dsum += plist[i];
|
||||||
}
|
}
|
||||||
|
// pAvgRes->sum.dsum = val;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1278,16 +1280,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1309,16 +1317,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1340,16 +1354,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1371,16 +1391,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1404,16 +1430,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1435,16 +1467,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1466,16 +1504,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1497,16 +1541,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1529,16 +1579,22 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
// ignore the equivalent data value
|
||||||
if ((*val) == pData[i]) {
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
continue;
|
if (isMinFunc) { // min
|
||||||
}
|
if (*val > pData[i]) {
|
||||||
|
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -1559,7 +1615,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
// ignore the equivalent data value
|
#if 0
|
||||||
if ((*val) == pData[i]) {
|
if ((*val) == pData[i]) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1570,6 +1626,23 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
// NOTE: An faster version to avoid one additional comparison with FPU.
|
||||||
|
if (isMinFunc) { // min
|
||||||
|
if (*val > pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else { // max
|
||||||
|
if (*val < pData[i]) {
|
||||||
|
*val = pData[i];
|
||||||
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
|
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElems += 1;
|
numOfElems += 1;
|
||||||
|
@ -2934,6 +3007,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
int64_t* pts = (int64_t*) pInput->pPTS->pData;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -2942,13 +3016,14 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = pts[i];
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
||||||
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (numOfElems == 0) {
|
if (numOfElems == 0) {
|
||||||
// save selectivity value for column consisted of all null values
|
// save selectivity value for column consisted of all null values
|
||||||
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
|
@ -3020,6 +3095,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -3028,15 +3104,16 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = pts[i];
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
doSaveCurrentVal(pCtx, i, cts, type, data);
|
doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (numOfElems == 0) {
|
|
||||||
// save selectivity value for column consisted of all null values
|
// save selectivity value for column consisted of all null values
|
||||||
|
if (numOfElems == 0) {
|
||||||
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
|
||||||
}
|
}
|
||||||
SET_VAL(pResInfo, numOfElems, 1);
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
@ -3216,11 +3293,13 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
|
||||||
|
int64_t* pts = (int64_t*)pInput->pPTS->pData;
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = pts[i];
|
||||||
numOfElems++;
|
|
||||||
|
|
||||||
|
numOfElems++;
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
|
|
|
@ -29,7 +29,7 @@ typedef struct SStateSessionKey {
|
||||||
int64_t opNum;
|
int64_t opNum;
|
||||||
} SStateSessionKey;
|
} SStateSessionKey;
|
||||||
|
|
||||||
static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
|
static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
|
||||||
if (pWin1->groupId > pWin2->groupId) {
|
if (pWin1->groupId > pWin2->groupId) {
|
||||||
return 1;
|
return 1;
|
||||||
} else if (pWin1->groupId < pWin2->groupId) {
|
} else if (pWin1->groupId < pWin2->groupId) {
|
||||||
|
@ -45,6 +45,28 @@ static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pW
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
|
||||||
|
if (pWin1->groupId > pWin2->groupId) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWin1->groupId < pWin2->groupId) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWin1->win.skey > pWin2->win.skey) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWin1->win.skey < pWin2->win.skey) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWin1->win.ekey > pWin2->win.ekey) {
|
||||||
|
return 1;
|
||||||
|
} else if (pWin1->win.ekey < pWin2->win.ekey) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
|
SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
|
||||||
SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
|
SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
|
||||||
|
@ -55,7 +77,7 @@ static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void*
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return sessionKeyCmpr(&pWin1->key, &pWin2->key);
|
return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
||||||
|
@ -400,7 +422,6 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -426,7 +447,6 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
|
||||||
|
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -452,7 +472,6 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
|
||||||
|
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -496,33 +515,18 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons
|
||||||
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
|
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSessionKey* key) {
|
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
|
||||||
if (pCur == NULL) return NULL;
|
|
||||||
tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL);
|
|
||||||
|
|
||||||
int32_t c = -2;
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
|
||||||
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c);
|
|
||||||
if (c != 0) {
|
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pCur->number = pState->number;
|
|
||||||
return pCur;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
||||||
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
|
||||||
|
SSessionKey resKey = *key;
|
||||||
void* tmp = NULL;
|
void* tmp = NULL;
|
||||||
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) {
|
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
|
||||||
|
if (code == 0) {
|
||||||
|
*key = resKey;
|
||||||
*pVal = tdbRealloc(NULL, *pVLen);
|
*pVal = tdbRealloc(NULL, *pVLen);
|
||||||
memcpy(*pVal, tmp, *pVLen);
|
memcpy(*pVal, tmp, *pVLen);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
|
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
@ -544,7 +548,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -558,6 +561,34 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pCur->number = pState->number;
|
||||||
|
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
int32_t c = 0;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c <= 0) return pCur;
|
||||||
|
|
||||||
|
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCur;
|
||||||
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
|
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
if (pCur == NULL) {
|
if (pCur == NULL) {
|
||||||
|
@ -572,7 +603,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
tdbTbcClose(pCur->pCur);
|
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -586,13 +616,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen) {
|
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||||
if (!pCur) {
|
if (!pCur) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
const SStateSessionKey* pKTmp = NULL;
|
SStateSessionKey* pKTmp = NULL;
|
||||||
int32_t kLen;
|
int32_t kLen;
|
||||||
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
|
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pKTmp->opNum != pCur->number) {
|
if (pKTmp->opNum != pCur->number) {
|
||||||
|
@ -607,14 +637,14 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, c
|
||||||
|
|
||||||
int32_t streamStateSessionClear(SStreamState* pState) {
|
int32_t streamStateSessionClear(SStreamState* pState) {
|
||||||
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
|
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
|
||||||
streamStateSessionPut(pState, &key, NULL, 0);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pState, &key);
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSessionKey delKey = {0};
|
SSessionKey delKey = {0};
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, buf, &size);
|
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
ASSERT(size > 0);
|
||||||
memset(buf, 0, size);
|
memset(buf, 0, size);
|
||||||
streamStateSessionPut(pState, &delKey, buf, size);
|
streamStateSessionPut(pState, &delKey, buf, size);
|
||||||
} else {
|
} else {
|
||||||
|
@ -623,61 +653,104 @@ int32_t streamStateSessionClear(SStreamState* pState) {
|
||||||
streamStateCurNext(pState, pCur);
|
streamStateCurNext(pState, pCur);
|
||||||
}
|
}
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
streamStateSessionDel(pState, &key);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key) {
|
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
|
||||||
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
SSessionKey resKey = *key;
|
if (pCur == NULL) {
|
||||||
while (1) {
|
return -1;
|
||||||
streamStateCurPrev(pState, pCur);
|
|
||||||
SSessionKey tmpKey = *key;
|
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
|
||||||
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
|
|
||||||
resKey = tmpKey;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
pCur->number = pState->number;
|
||||||
|
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return streamStateSessionGetRanomCur(pState, &resKey);
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
int32_t c = 0;
|
||||||
|
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
|
|
||||||
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
|
||||||
SSessionKey resKey = *key;
|
SSessionKey resKey = *key;
|
||||||
int32_t res = -1;
|
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
|
||||||
while (1) {
|
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||||
SSessionKey tmpKey = *key;
|
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
|
||||||
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
|
|
||||||
res = 0;
|
|
||||||
resKey = tmpKey;
|
|
||||||
streamStateCurPrev(pState, pCur);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*curKey = resKey;
|
*curKey = resKey;
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return res;
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c > 0) {
|
||||||
|
streamStateCurNext(pState, pCur);
|
||||||
|
code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
|
||||||
|
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||||
|
*curKey = resKey;
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else if (c < 0) {
|
||||||
|
streamStateCurPrev(pState, pCur);
|
||||||
|
code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
|
||||||
|
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
|
||||||
|
*curKey = resKey;
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
|
||||||
|
int32_t* pVLen) {
|
||||||
// todo refactor
|
// todo refactor
|
||||||
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
|
int32_t res = 0;
|
||||||
int32_t size = *pVLen;
|
SSessionKey originKey = *key;
|
||||||
void* tmp = NULL;
|
SSessionKey searchKey = *key;
|
||||||
*pVal = tdbRealloc(NULL, size);
|
searchKey.win.skey = key->win.skey - gap;
|
||||||
memset(*pVal, 0, size);
|
searchKey.win.ekey = key->win.ekey + gap;
|
||||||
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) {
|
int32_t valSize = *pVLen;
|
||||||
memcpy(*pVal, tmp, *pVLen);
|
void* tmp = tdbRealloc(NULL, valSize);
|
||||||
streamStateFreeCur(pCur);
|
if (!tmp) {
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
||||||
|
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
||||||
|
if (code == 0) {
|
||||||
|
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||||
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
streamStateSessionDel(pState, key);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
streamStateCurNext(pState, pCur);
|
||||||
|
} else {
|
||||||
|
*key = originKey;
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return 1;
|
pCur = streamStateSessionSeekKeyNext(pState, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
||||||
|
if (code == 0) {
|
||||||
|
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
|
||||||
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
streamStateSessionDel(pState, key);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*key = originKey;
|
||||||
|
res = 1;
|
||||||
|
memset(tmp, 0, valSize);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
|
||||||
|
*pVal = tmp;
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
|
@ -692,16 +765,18 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
streamStateSessionDel(pState, key);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
if (fn(pKeyData, stateKey) == true) {
|
if (fn(pKeyData, stateKey) == true) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
streamStateSessionDel(pState, key);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -712,11 +787,12 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
|
||||||
pCur = streamStateSessionSeekKeyNext(pState, key);
|
pCur = streamStateSessionSeekKeyNext(pState, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
if (fn(pKeyData, stateKey) == true) {
|
if (fn(pKeyData, stateKey) == true) {
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, valSize);
|
||||||
|
streamStateSessionDel(pState, key);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -746,8 +822,11 @@ char* streamStateSessionDump(SStreamState* pState) {
|
||||||
tdbTbcMoveToFirst(pCur->pCur);
|
tdbTbcMoveToFirst(pCur->pCur);
|
||||||
|
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
int32_t code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
|
void* buf = NULL;
|
||||||
|
int32_t bufSize = 0;
|
||||||
|
int32_t code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -762,12 +841,14 @@ char* streamStateSessionDump(SStreamState* pState) {
|
||||||
key = (SSessionKey){0};
|
key = (SSessionKey){0};
|
||||||
code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
|
code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
return dumpBuf;
|
return dumpBuf;
|
||||||
}
|
}
|
||||||
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
|
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
|
||||||
len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
|
len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
|
||||||
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
|
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
|
||||||
}
|
}
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
return dumpBuf;
|
return dumpBuf;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -41,13 +41,12 @@ typedef struct SSyncRespMgr {
|
||||||
|
|
||||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl);
|
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl);
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj);
|
void syncRespMgrDestroy(SSyncRespMgr *pObj);
|
||||||
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub);
|
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub);
|
||||||
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index);
|
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq);
|
||||||
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
|
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub);
|
||||||
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
|
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo);
|
||||||
void syncRespClean(SSyncRespMgr *pObj);
|
void syncRespClean(SSyncRespMgr *pObj);
|
||||||
void syncRespCleanRsp(SSyncRespMgr *pObj);
|
void syncRespCleanRsp(SSyncRespMgr *pObj);
|
||||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,9 +26,6 @@ typedef struct SRaftId {
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
} SRaftId;
|
} SRaftId;
|
||||||
|
|
||||||
// for compatibility, the same as syncPropose
|
|
||||||
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
|
||||||
|
|
||||||
// ------------------ for debug -------------------
|
// ------------------ for debug -------------------
|
||||||
void syncRpcMsgPrint(SRpcMsg* pMsg);
|
void syncRpcMsgPrint(SRpcMsg* pMsg);
|
||||||
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
|
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -452,11 +452,6 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|
||||||
int32_t ret = syncPropose(rid, pMsg, isWeak);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSyncState syncGetState(int64_t rid) {
|
SSyncState syncGetState(int64_t rid) {
|
||||||
SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
|
SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
|
||||||
|
|
||||||
|
@ -558,109 +553,27 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
|
||||||
return lastIndex;
|
return lastIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
SyncTerm syncGetMyTerm(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return TAOS_SYNC_STATE_ERROR;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncTerm term = pSyncNode->pRaftStore->currentTerm;
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return term;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncIndex syncGetLastIndex(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return SYNC_INDEX_INVALID;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return lastIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncIndex syncGetCommitIndex(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return SYNC_INDEX_INVALID;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncIndex cmtIndex = pSyncNode->commitIndex;
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return cmtIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncGroupId syncGetVgId(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return TAOS_SYNC_STATE_ERROR;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncGroupId vgId = pSyncNode->vgId;
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return vgId;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
memset(pEpSet, 0, sizeof(*pEpSet));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
pEpSet->numOfEps = 0;
|
|
||||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
|
||||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
|
||||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
|
||||||
(pEpSet->numOfEps)++;
|
|
||||||
sInfo("vgId:%d, sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
|
||||||
}
|
|
||||||
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
|
||||||
sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
memset(pEpSet, 0, sizeof(*pEpSet));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pEpSet->numOfEps = 0;
|
pEpSet->numOfEps = 0;
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
|
if (pSyncNode == NULL) return;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
SEp* pEp = &pEpSet->eps[i];
|
||||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
|
||||||
(pEpSet->numOfEps)++;
|
pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||||
sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
|
pEpSet->numOfEps++;
|
||||||
pEpSet->eps[i].port);
|
sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
|
||||||
}
|
}
|
||||||
if (pEpSet->numOfEps > 0) {
|
if (pEpSet->numOfEps > 0) {
|
||||||
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||||
}
|
}
|
||||||
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
|
||||||
|
|
||||||
|
sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
|
|
||||||
SRespStub stub;
|
|
||||||
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
|
|
||||||
if (ret == 1) {
|
|
||||||
*pInfo = stub.rpcMsg.info;
|
|
||||||
}
|
|
||||||
|
|
||||||
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -719,8 +632,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType));
|
sNTrace(pSyncNode, "propose message, type:%s", TMSG_INFO(pMsg->msgType));
|
||||||
ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
|
ret = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
|
||||||
sError("vgId:%d, failed to enqueue msg since %s", pSyncNode->vgId, terrstr());
|
sError("vgId:%d, failed to enqueue msg since %s", pSyncNode->vgId, terrstr());
|
||||||
|
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2549,11 +2462,36 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// del resp mgr, call FpCommitCb
|
// del resp mgr, call FpCommitCb
|
||||||
ASSERT(0);
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
|
||||||
|
SFsmCbMeta cbMeta = {
|
||||||
|
.index = pEntry->index,
|
||||||
|
.lastConfigIndex = SYNC_INDEX_INVALID,
|
||||||
|
.isWeak = pEntry->isWeak,
|
||||||
|
.code = -1,
|
||||||
|
.state = ths->state,
|
||||||
|
.seqNum = pEntry->seqNum,
|
||||||
|
.term = pEntry->term,
|
||||||
|
.currentTerm = ths->pRaftStore->currentTerm,
|
||||||
|
.flag = 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||||
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
|
||||||
|
|
||||||
|
if (h) {
|
||||||
|
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2759,7 +2697,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
||||||
.flag = flag,
|
.flag = flag,
|
||||||
};
|
};
|
||||||
|
|
||||||
syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
|
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,21 +13,22 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRespMgr.h"
|
#include "syncRespMgr.h"
|
||||||
#include "syncRaftEntry.h"
|
#include "syncRaftEntry.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
|
|
||||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||||
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
|
SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
|
||||||
if (pObj == NULL) {
|
if (pObj == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(pObj, 0, sizeof(SSyncRespMgr));
|
|
||||||
|
|
||||||
pObj->pRespHash =
|
pObj->pRespHash =
|
||||||
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
ASSERT(pObj->pRespHash != NULL);
|
if (pObj->pRespHash == NULL) return NULL;
|
||||||
|
|
||||||
pObj->ttl = ttl;
|
pObj->ttl = ttl;
|
||||||
pObj->data = data;
|
pObj->data = data;
|
||||||
pObj->seqNum = 0;
|
pObj->seqNum = 0;
|
||||||
|
@ -38,93 +39,84 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||||
|
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
||||||
if (pObj != NULL) {
|
if (pObj != NULL) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
taosHashCleanup(pObj->pRespHash);
|
taosHashCleanup(pObj->pRespHash);
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
taosThreadMutexDestroy(&(pObj->mutex));
|
taosThreadMutexDestroy(&(pObj->mutex));
|
||||||
taosMemoryFree(pObj);
|
taosMemoryFree(pObj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
|
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
uint64_t keyCode = ++(pObj->seqNum);
|
uint64_t seq = ++(pObj->seqNum);
|
||||||
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
|
int32_t code = taosHashPut(pObj->pRespHash, &seq, sizeof(uint64_t), pStub, sizeof(SRespStub));
|
||||||
|
sNTrace(pObj->data, "save message handle:%p, type:%s seq:%" PRIu64 " code:0x%x", pStub->rpcMsg.info.handle,
|
||||||
|
TMSG_INFO(pStub->rpcMsg.msgType), seq, code);
|
||||||
|
|
||||||
sNTrace(pObj->data, "save message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType),
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
keyCode, pStub->rpcMsg.info.handle);
|
return seq;
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
return keyCode;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index) {
|
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
int32_t code = taosHashRemove(pObj->pRespHash, &seq, sizeof(seq));
|
||||||
|
sNTrace(pObj->data, "remove message handle, seq:%" PRIu64 " code:%d", seq, code);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
|
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
|
SRespStub *pTmp = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
if (pTmp != NULL) {
|
if (pTmp != NULL) {
|
||||||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
memcpy(pStub, pTmp, sizeof(SRespStub));
|
||||||
|
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq,
|
||||||
|
pStub->rpcMsg.info.handle);
|
||||||
|
|
||||||
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType),
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
index, pStub->rpcMsg.info.handle);
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
return 1; // get one object
|
return 1; // get one object
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 0; // get none object
|
return 0; // get none object
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
|
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
|
SRespStub *pStub = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
if (pTmp != NULL) {
|
if (pStub != NULL) {
|
||||||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
*pInfo = pStub->rpcMsg.info;
|
||||||
|
sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle,
|
||||||
|
TMSG_INFO(pStub->rpcMsg.msgType), seq);
|
||||||
|
taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
|
|
||||||
sNTrace(pObj->data, "get-and-del message handle, type:%s seq:%" PRIu64 " handle:%p",
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
|
|
||||||
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
return 1; // get one object
|
return 1; // get one object
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 0; // get none object
|
return 0; // get none object
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
|
||||||
syncRespCleanByTTL(pObj, -1, true);
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRespClean(SSyncRespMgr *pObj) {
|
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
|
||||||
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
|
||||||
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
||||||
int cnt = 0;
|
int cnt = 0;
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
SSyncNode *pSyncNode = pObj->data;
|
SSyncNode *pSyncNode = pObj->data;
|
||||||
|
|
||||||
SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
|
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
||||||
ASSERT(delIndexArray != NULL);
|
if (delIndexArray == NULL) return;
|
||||||
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
|
|
||||||
|
|
||||||
|
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
|
||||||
while (pStub) {
|
while (pStub) {
|
||||||
size_t len;
|
size_t len;
|
||||||
void * key = taosHashGetKey(pStub, &len);
|
void *key = taosHashGetKey(pStub, &len);
|
||||||
uint64_t *pSeqNum = (uint64_t *)key;
|
uint64_t *pSeqNum = (uint64_t *)key;
|
||||||
sum++;
|
sum++;
|
||||||
|
|
||||||
|
@ -149,15 +141,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
pStub->rpcMsg.contLen = 0;
|
pStub->rpcMsg.contLen = 0;
|
||||||
|
|
||||||
// TODO: and make rpcMsg body, call commit cb
|
// TODO: and make rpcMsg body, call commit cb
|
||||||
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
|
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
|
||||||
|
|
||||||
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
|
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
if (pStub->rpcMsg.info.handle != NULL) {
|
if (pStub->rpcMsg.info.handle != NULL) {
|
||||||
tmsgSendRsp(&(pStub->rpcMsg));
|
tmsgSendRsp(&pStub->rpcMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
|
pStub = taosHashIterate(pObj->pRespHash, pStub);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
||||||
|
@ -170,3 +162,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(delIndexArray);
|
taosArrayDestroy(delIndexArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||||
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
syncRespCleanByTTL(pObj, -1, true);
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRespClean(SSyncRespMgr *pObj) {
|
||||||
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
|
}
|
||||||
|
|
|
@ -201,7 +201,7 @@ if $loop_count == 10 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
print ======$rows
|
print =====rows=$rows
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue