commit
1e9eb353ab
|
@ -2,7 +2,7 @@
|
|||
IF (DEFINED VERNUMBER)
|
||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||
ELSE ()
|
||||
SET(TD_VER_NUMBER "3.0.1.8")
|
||||
SET(TD_VER_NUMBER "3.0.2.0")
|
||||
ENDIF ()
|
||||
|
||||
IF (DEFINED VERCOMPATIBLE)
|
||||
|
|
|
@ -33,16 +33,16 @@ extern "C" {
|
|||
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }}
|
||||
// clang-format on
|
||||
|
||||
#define WAL_PROTO_VER 0
|
||||
#define WAL_NOSUFFIX_LEN 20
|
||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||
#define WAL_LOG_SUFFIX "log"
|
||||
#define WAL_INDEX_SUFFIX "idx"
|
||||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||
#define WAL_PROTO_VER 0
|
||||
#define WAL_NOSUFFIX_LEN 20
|
||||
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
|
||||
#define WAL_LOG_SUFFIX "log"
|
||||
#define WAL_INDEX_SUFFIX "idx"
|
||||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||
|
||||
typedef enum {
|
||||
TAOS_WAL_WRITE = 1,
|
||||
|
|
|
@ -186,10 +186,10 @@ void taos_free_result(TAOS_RES *res) {
|
|||
destroyRequest(pRequest);
|
||||
} else if (TD_RES_TMQ_METADATA(res)) {
|
||||
SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res;
|
||||
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||
taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
// taosx
|
||||
taosArrayDestroy(pRsp->rsp.createTableLen);
|
||||
taosArrayDestroyP(pRsp->rsp.createTableReq, taosMemoryFree);
|
||||
|
@ -199,10 +199,10 @@ void taos_free_result(TAOS_RES *res) {
|
|||
taosMemoryFree(pRsp);
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj *pRsp = (SMqRspObj *)res;
|
||||
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
|
||||
taosArrayDestroy(pRsp->rsp.blockDataLen);
|
||||
taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
|
||||
taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
pRsp->resInfo.pRspMsg = NULL;
|
||||
doFreeReqResultInfo(&pRsp->resInfo);
|
||||
taosMemoryFree(pRsp);
|
||||
|
|
|
@ -814,24 +814,55 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
|
||||
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
|
||||
// do nothing
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
||||
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
|
||||
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
|
||||
taosArrayDestroy(pRsp->dataRsp.blockDataLen);
|
||||
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
|
||||
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||
taosMemoryFree(pRsp->metaRsp.metaRsp);
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
|
||||
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
|
||||
taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
|
||||
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
|
||||
taosArrayDestroyP(pRsp->taosxRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
// taosx
|
||||
taosArrayDestroy(pRsp->taosxRsp.createTableLen);
|
||||
taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree);
|
||||
}
|
||||
}
|
||||
|
||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||
SMqRspWrapper* msg = NULL;
|
||||
SMqRspWrapper* rspWrapper = NULL;
|
||||
while (1) {
|
||||
taosGetQitem(tmq->qall, (void**)&msg);
|
||||
if (msg)
|
||||
taosFreeQitem(msg);
|
||||
else
|
||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||
if (rspWrapper) {
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(rspWrapper);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
msg = NULL;
|
||||
rspWrapper = NULL;
|
||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
while (1) {
|
||||
taosGetQitem(tmq->qall, (void**)&msg);
|
||||
if (msg)
|
||||
taosFreeQitem(msg);
|
||||
else
|
||||
taosGetQitem(tmq->qall, (void**)&rspWrapper);
|
||||
if (rspWrapper) {
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(rspWrapper);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1644,6 +1675,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
|
|||
tDeleteSMqAskEpRsp(rspMsg);
|
||||
*pReset = true;
|
||||
} else {
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
*pReset = false;
|
||||
}
|
||||
} else {
|
||||
|
@ -1695,6 +1727,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
} else {
|
||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
|
@ -1713,6 +1746,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
} else {
|
||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
|
@ -1743,6 +1777,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
|||
} else {
|
||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(pollRspWrapper);
|
||||
}
|
||||
} else {
|
||||
|
@ -1794,7 +1829,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
while (1) {
|
||||
tmqHandleAllDelayedTask(tmq);
|
||||
if (tmqPollImpl(tmq, timeout) < 0) {
|
||||
tscDebug("return since poll err");
|
||||
tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId);
|
||||
/*return NULL;*/
|
||||
}
|
||||
|
||||
|
|
|
@ -1946,9 +1946,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len,
|
||||
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
|
||||
"|rows:%d|version:%" PRIu64 "\n",
|
||||
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
|
||||
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
|
||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version);
|
||||
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
|
|
|
@ -554,10 +554,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
goto SUBSCRIBE_OVER;
|
||||
}
|
||||
|
||||
// check topic only
|
||||
#if 0
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
goto SUBSCRIBE_OVER;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
|
|
@ -507,7 +507,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
|
|||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, v);
|
||||
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
|
||||
if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
|
||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||
return -1;
|
||||
}
|
||||
|
@ -891,7 +891,7 @@ static int32_t mndAddVnodeToVgroup(SMnode *pMnode, STrans *pTrans, SVgObj *pVgro
|
|||
}
|
||||
if (used) continue;
|
||||
|
||||
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
|
||||
if (pDnode == NULL || pDnode->numOfVnodes >= pDnode->numOfSupportVnodes) {
|
||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -725,9 +725,15 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
}
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
|
||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||
if (code != 0) {
|
||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||
if (pHandle) {
|
||||
if (pHandle->pRef) {
|
||||
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
|
||||
}
|
||||
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||
if (code != 0) {
|
||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||
}
|
||||
}
|
||||
|
||||
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
|
||||
|
@ -736,7 +742,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
}
|
||||
|
||||
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
|
||||
ASSERT(0);
|
||||
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "ttime.h"
|
||||
|
||||
#define IS_FINAL_OP(op) ((op)->isFinal)
|
||||
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
|
||||
|
||||
typedef struct SSessionAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
|
@ -56,6 +57,7 @@ typedef enum SResultTsInterpType {
|
|||
typedef struct SPullWindowInfo {
|
||||
STimeWindow window;
|
||||
uint64_t groupId;
|
||||
STimeWindow calWin;
|
||||
} SPullWindowInfo;
|
||||
|
||||
typedef struct SOpenWindowInfo {
|
||||
|
@ -793,17 +795,18 @@ int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
|
|||
SArray* res = (SArray*)data;
|
||||
SPullWindowInfo* pos = taosArrayGet(res, index);
|
||||
SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
|
||||
if (pData->window.skey == pos->window.skey) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else if (pData->window.skey > pos->window.skey) {
|
||||
if (pData->groupId > pos->groupId) {
|
||||
return 1;
|
||||
} else if (pData->groupId < pos->groupId) {
|
||||
return -1;
|
||||
}
|
||||
return -1;
|
||||
|
||||
if (pData->window.skey > pos->window.ekey) {
|
||||
return 1;
|
||||
} else if (pData->window.ekey < pos->window.skey) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
|
||||
|
@ -812,10 +815,16 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
|
|||
if (index == -1) {
|
||||
index = 0;
|
||||
} else {
|
||||
if (comparePullWinKey(pPullInfo, pPullWins, index) > 0) {
|
||||
index++;
|
||||
} else {
|
||||
int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
|
||||
if (code == 0) {
|
||||
SPullWindowInfo* pos = taosArrayGet(pPullWins ,index);
|
||||
pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
|
||||
pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
|
||||
pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
|
||||
pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (code > 0 ){
|
||||
index++;
|
||||
}
|
||||
}
|
||||
if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
|
||||
|
@ -2255,8 +2264,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
|
||||
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
|
||||
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
|
||||
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
|
||||
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
|
||||
colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
|
||||
colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
|
||||
pBlock->info.rows++;
|
||||
}
|
||||
if ((*pIndex) == size) {
|
||||
|
@ -2266,27 +2275,33 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
}
|
||||
|
||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
|
||||
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsData = (TSKEY*)pStartCol->pData;
|
||||
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* tsEndData = (TSKEY*)pEndCol->pData;
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
|
||||
int32_t chId = getChildIndex(pBlock);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
|
||||
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
|
||||
if (chIds) {
|
||||
SArray* chArray = *(SArray**)chIds;
|
||||
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
if (index != -1) {
|
||||
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||
taosArrayRemove(chArray, index);
|
||||
if (taosArrayGetSize(chArray) == 0) {
|
||||
// pull data is over
|
||||
taosArrayDestroy(chArray);
|
||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||
TSKEY winTs = tsData[i];
|
||||
while (winTs < tsEndData[i]) {
|
||||
SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
|
||||
void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
|
||||
if (chIds) {
|
||||
SArray* chArray = *(SArray**)chIds;
|
||||
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
|
||||
if (index != -1) {
|
||||
qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
|
||||
taosArrayRemove(chArray, index);
|
||||
if (taosArrayGetSize(chArray) == 0) {
|
||||
// pull data is over
|
||||
taosArrayDestroy(chArray);
|
||||
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
}
|
||||
}
|
||||
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2299,12 +2314,13 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
|
|||
if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
|
||||
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
|
||||
if (!chIds) {
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId};
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
||||
// add pull data request
|
||||
savePullWindow(&pull, pInfo->pPullWins);
|
||||
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, winKey, size1);
|
||||
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
|
||||
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
|
||||
int32_t size1 = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, winKey, size1);
|
||||
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2374,12 +2390,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
};
|
||||
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
||||
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId};
|
||||
SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
|
||||
// add pull data request
|
||||
savePullWindow(&pull, pInfo->pPullWins);
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, &winRes, size);
|
||||
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
|
||||
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
addPullWindow(pInfo->pPullDataMap, &winRes, size);
|
||||
qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
|
||||
}
|
||||
} else {
|
||||
int32_t index = -1;
|
||||
SArray* chArray = NULL;
|
||||
|
@ -2560,7 +2577,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
|
||||
processPullOver(pBlock, pInfo->pPullDataMap);
|
||||
processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2638,6 +2655,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
|
||||
if (pIntervalPhyNode->window.deleteMark <= 0) {
|
||||
return DEAULT_DELETE_MARK;
|
||||
}
|
||||
int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark);
|
||||
deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
|
||||
return deleteMark;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
@ -2659,9 +2685,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
// for test 315360000000
|
||||
.deleteMark = 1000LL * 60LL * 60LL * 24LL * 365LL * 10LL,
|
||||
// .deleteMark = INT64_MAX,
|
||||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
||||
.deleteMarkSaved = 0,
|
||||
.calTriggerSaved = 0,
|
||||
};
|
||||
|
@ -4805,7 +4829,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.calTrigger = pIntervalPhyNode->window.triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
.minTs = INT64_MAX,
|
||||
.deleteMark = INT64_MAX,
|
||||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
||||
};
|
||||
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
|
|
@ -31,7 +31,7 @@ extern "C" {
|
|||
|
||||
#define QW_DEFAULT_SCHEDULER_NUMBER 100
|
||||
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SCH_TASK_NUMBER 3000
|
||||
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
|
||||
#define QW_SCH_TIMEOUT_MSEC 180000
|
||||
|
@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt {
|
|||
|
||||
#define QW_ERR_RET(c) \
|
||||
do { \
|
||||
int32_t _code = (c); \
|
||||
int32_t _code = (c); \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
return _code; \
|
||||
|
@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt {
|
|||
} while (0)
|
||||
#define QW_RET(c) \
|
||||
do { \
|
||||
int32_t _code = (c); \
|
||||
int32_t _code = (c); \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
} \
|
||||
|
@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt {
|
|||
} while (0)
|
||||
#define QW_ERR_JRET(c) \
|
||||
do { \
|
||||
code = (c); \
|
||||
code = (c); \
|
||||
if (code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = code; \
|
||||
goto _return; \
|
||||
|
|
|
@ -281,7 +281,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
|||
|
||||
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
|
||||
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
|
||||
|
@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
int8_t nodeType = mgmt->nodeType;
|
||||
int32_t nodeId = mgmt->nodeId;
|
||||
|
||||
int32_t taskCount = 0;
|
||||
int32_t schStatusCount = 0;
|
||||
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||
|
||||
taosTmrStop(mgmt->hbTimer);
|
||||
|
@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
uint64_t qId, tId;
|
||||
int32_t eId;
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
|
||||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
void *key = taosHashGetKey(pIter, NULL);
|
||||
|
@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
qwFreeTaskCtx(ctx);
|
||||
QW_TASK_DLOG_E("task ctx freed");
|
||||
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
||||
taskCount++;
|
||||
}
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
|
||||
|
@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
while (pIter) {
|
||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
||||
qwDestroySchStatus(sch);
|
||||
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
schStatusCount++;
|
||||
}
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
|
||||
|
@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
|
||||
qwCloseRef();
|
||||
|
||||
qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||
qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt,
|
||||
taskCount, schStatusCount);
|
||||
}
|
||||
|
||||
int32_t qwOpenRef(void) {
|
||||
|
|
|
@ -8,9 +8,9 @@
|
|||
#include "qwMsg.h"
|
||||
#include "tcommon.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
SQWorkerMgmt gQwMgmt = {
|
||||
.lock = 0,
|
||||
|
@ -117,7 +117,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
|||
if (queryStop) {
|
||||
*queryStop = true;
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks,
|
||||
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
|
||||
pOutput->numOfRows);
|
||||
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
|
||||
|
@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
}
|
||||
|
||||
if (0 == ctx->level) {
|
||||
QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
|
||||
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows,
|
||||
ctx->level);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
||||
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
|
||||
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
|
||||
pOutput->numOfRows);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -538,7 +540,7 @@ _return:
|
|||
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
|
||||
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
|
||||
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
|
||||
if (!rsped) {
|
||||
if (!rsped) {
|
||||
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
|
||||
}
|
||||
}
|
||||
|
@ -650,8 +652,8 @@ _return:
|
|||
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
|
||||
|
||||
if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
void *rsp = NULL;
|
||||
int32_t dataLen = 0;
|
||||
SOutputData sOutput = {0};
|
||||
if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -671,8 +673,8 @@ _return:
|
|||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
||||
tstrerror(code), dataLen);
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
dataLen);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -689,7 +691,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
do {
|
||||
ctx = NULL;
|
||||
|
||||
|
||||
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
|
||||
|
||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
}
|
||||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
|
||||
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code ||
|
||||
0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
|
||||
// Note: query is not running anymore
|
||||
QW_SET_PHASE(ctx, 0);
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
@ -1176,7 +1179,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
|||
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
||||
|
||||
uint64_t qId, tId;
|
||||
int32_t eId;
|
||||
int32_t eId;
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
|
@ -1186,7 +1189,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
|||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
QW_TASK_DLOG_E("start to force stop task");
|
||||
|
||||
|
||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
QW_TASK_WLOG_E("task already dropping");
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
@ -1194,7 +1197,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
|||
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (QW_QUERY_RUNNING(ctx)) {
|
||||
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
|
||||
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
|
|
|
@ -285,6 +285,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
}
|
||||
destroyCmsg(msg);
|
||||
}
|
||||
memset(&conn->ctx, 0, sizeof(conn->ctx));
|
||||
}
|
||||
bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||
if (!transQueueEmpty(&conn->cliMsgs)) {
|
||||
|
@ -589,6 +590,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
}
|
||||
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
||||
if (update) {
|
||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
conn->refId = -1;
|
||||
}
|
||||
|
@ -697,6 +699,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||
QUEUE_REMOVE(&conn->q);
|
||||
QUEUE_INIT(&conn->q);
|
||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
conn->refId = -1;
|
||||
|
||||
|
@ -731,6 +734,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
conn->timer = NULL;
|
||||
}
|
||||
|
||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
taosMemoryFree(conn->ip);
|
||||
taosMemoryFree(conn->stream);
|
||||
|
|
|
@ -282,6 +282,9 @@ void transCtxCleanup(STransCtx* ctx) {
|
|||
}
|
||||
|
||||
void transCtxMerge(STransCtx* dst, STransCtx* src) {
|
||||
if (src->args == NULL || src->freeFunc == NULL) {
|
||||
return;
|
||||
}
|
||||
if (dst->args == NULL) {
|
||||
dst->args = src->args;
|
||||
dst->brokenVal = src->brokenVal;
|
||||
|
|
|
@ -223,6 +223,7 @@ void walClose(SWal *pWal) {
|
|||
taosMemoryFree(pRef);
|
||||
}
|
||||
taosHashCleanup(pWal->pRefHash);
|
||||
pWal->pRefHash = NULL;
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||
|
|
|
@ -32,15 +32,18 @@ SWalRef *walOpenRef(SWal *pWal) {
|
|||
return pRef;
|
||||
}
|
||||
|
||||
#if 1
|
||||
void walCloseRef(SWal *pWal, int64_t refId) {
|
||||
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||
if (ppRef == NULL) return;
|
||||
SWalRef *pRef = *ppRef;
|
||||
if (pRef) {
|
||||
wDebug("vgId:%d, wal close ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
|
||||
} else {
|
||||
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
|
||||
}
|
||||
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||
taosMemoryFree(pRef);
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
||||
SWal *pWal = pRef->pWal;
|
||||
|
|
|
@ -4,11 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
|
|||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
system sh/deploy.sh -n dnode5 -i 5
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode4 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode5 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode2 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode3 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode4 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode5 -c supportVnodes -v 5
|
||||
|
||||
print ========== step1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
|
|
@ -461,7 +461,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
|
|||
$loop_count = 0
|
||||
|
||||
loop2:
|
||||
sleep 100
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
|
@ -519,7 +519,7 @@ print step 6
|
|||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
# sleep 300
|
||||
sleep 300
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
|
@ -618,6 +618,60 @@ if $data41 != 2 then
|
|||
goto loop4
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791343003,4,4,4,3.1);
|
||||
sql insert into t1 values(1648791213004,4,5,5,4.1);
|
||||
|
||||
loop5:
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt3;
|
||||
|
||||
# row 0
|
||||
if $rows != 7 then
|
||||
print =====rows=$rows
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data21 != 4 then
|
||||
print =====data21=$data21
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data31 != 4 then
|
||||
print =====data31=$data31
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data41 != 2 then
|
||||
print =====data41=$data41
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data51 != 1 then
|
||||
print =====data51=$data51
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data61 != 1 then
|
||||
print =====data61=$data61
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
|
|
|
@ -3,10 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1
|
|||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode4 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode2 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode3 -c supportVnodes -v 5
|
||||
system sh/cfg.sh -n dnode4 -c supportVnodes -v 5
|
||||
|
||||
$dbPrefix = br1_db
|
||||
$tbPrefix = br1_tb
|
||||
|
|
Loading…
Reference in New Issue