diff --git a/examples/c/tmq.c b/examples/c/tmq.c
index 7bb8aa84a4..83b40c4f20 100644
--- a/examples/c/tmq.c
+++ b/examples/c/tmq.c
@@ -61,7 +61,7 @@ static int32_t init_env() {
printf("create database\n");
pRes = taos_query(pConn, "drop topic topicname");
if (taos_errno(pRes) != 0) {
- printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
+ printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
diff --git a/include/common/systable.h b/include/common/systable.h
index 9b5f66f64c..558a1ca297 100644
--- a/include/common/systable.h
+++ b/include/common/systable.h
@@ -12,6 +12,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+
+#ifndef TDENGINE_SYSTABLE_H
+#define TDENGINE_SYSTABLE_H
#ifdef __cplusplus
extern "C" {
@@ -19,9 +22,6 @@ extern "C" {
#include "os.h"
-#ifndef TDENGINE_SYSTABLE_H
-#define TDENGINE_SYSTABLE_H
-
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INS_TABLE_DNODES "ins_dnodes"
#define TSDB_INS_TABLE_MNODES "ins_mnodes"
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index bbf332c4d4..cb547ee6b3 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -26,6 +26,7 @@ extern "C" {
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
+#include "systable.h"
typedef enum {
JOB_TASK_STATUS_NULL = 0,
@@ -302,9 +303,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define REQUEST_TOTAL_EXEC_TIMES 2
-#define IS_SYS_DBNAME(_dbname) \
- (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || \
- ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
+#define IS_INFORMATION_SCHEMA_DB(_name) ((*(_name) == 'i') && (0 == strcmp(_name, TSDB_INFORMATION_SCHEMA_DB)))
+#define IS_PERFORMANCE_SCHEMA_DB(_name) ((*(_name) == 'p') && (0 == strcmp(_name, TSDB_PERFORMANCE_SCHEMA_DB)))
+
+#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
#define qFatal(...) \
do { \
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index 1eed342f8c..014ed518a3 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -66,6 +66,7 @@ typedef struct {
int64_t commitVer;
int64_t appliedVer;
int64_t lastVer;
+ int64_t logRetention;
} SWalVer;
#pragma pack(push, 1)
@@ -126,7 +127,7 @@ typedef struct SWal {
typedef struct {
int64_t refId;
int64_t refVer;
- int64_t refFile;
+// int64_t refFile;
SWal *pWal;
} SWalRef;
@@ -180,7 +181,7 @@ void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
-int32_t walBeginSnapshot(SWal *, int64_t ver);
+int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention);
int32_t walEndSnapshot(SWal *);
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// for tq
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 7c5dd72380..b5838386db 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -49,6 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
return TSDB_CODE_SUCCESS;
}
+static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
+ int32_t code = 0;
+ SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
+ if (NULL == vgInfo) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ return code;
+ }
+
+ vgInfo->vgVersion = rsp->vgVersion;
+ vgInfo->stateTs = rsp->stateTs;
+ vgInfo->hashMethod = rsp->hashMethod;
+ vgInfo->hashPrefix = rsp->hashPrefix;
+ vgInfo->hashSuffix = rsp->hashSuffix;
+ vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
+ if (NULL == vgInfo->vgHash) {
+ taosMemoryFree(vgInfo);
+ tscError("hash init[%d] failed", rsp->vgNum);
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _return;
+ }
+
+ for (int32_t j = 0; j < rsp->vgNum; ++j) {
+ SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
+ if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
+ tscError("hash push failed, errno:%d", errno);
+ taosHashCleanup(vgInfo->vgHash);
+ taosMemoryFree(vgInfo);
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _return;
+ }
+ }
+
+_return:
+ if (code) {
+ taosHashCleanup(vgInfo->vgHash);
+ taosMemoryFreeClear(vgInfo);
+ }
+
+ *pInfo = vgInfo;
+ return code;
+}
+
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
@@ -67,37 +109,22 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
- SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
- if (NULL == vgInfo) {
- code = TSDB_CODE_OUT_OF_MEMORY;
+ SDBVgInfo *vgInfo = NULL;
+ code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
+ if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
- vgInfo->vgVersion = rsp->vgVersion;
- vgInfo->stateTs = rsp->stateTs;
- vgInfo->hashMethod = rsp->hashMethod;
- vgInfo->hashPrefix = rsp->hashPrefix;
- vgInfo->hashSuffix = rsp->hashSuffix;
- vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
- if (NULL == vgInfo->vgHash) {
- taosMemoryFree(vgInfo);
- tscError("hash init[%d] failed", rsp->vgNum);
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _return;
- }
-
- for (int32_t j = 0; j < rsp->vgNum; ++j) {
- SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
- if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
- tscError("hash push failed, errno:%d", errno);
- taosHashCleanup(vgInfo->vgHash);
- taosMemoryFree(vgInfo);
- code = TSDB_CODE_OUT_OF_MEMORY;
- goto _return;
- }
- }
-
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
+
+ if (IS_SYS_DBNAME(rsp->db)) {
+ code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
+ if (TSDB_CODE_SUCCESS != code) {
+ goto _return;
+ }
+
+ catalogUpdateDBVgInfo(pCatalog, (rsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->uid, vgInfo);
+ }
}
if (code) {
@@ -491,6 +518,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
for (int32_t i = 0; i < dbNum; ++i) {
SDbVgVersion *db = &dbs[i];
+ tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64,
+ i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs);
+
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
db->numOfTable = htonl(db->numOfTable);
diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c
index 409c19f65e..a5f963a956 100644
--- a/source/client/src/clientMsgHandler.c
+++ b/source/client/src/clientMsgHandler.c
@@ -162,6 +162,22 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pEpSet);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
+ } else {
+ struct SCatalog* pCatalog = NULL;
+ int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
+ if (TSDB_CODE_SUCCESS == code) {
+ STscObj* pTscObj = pRequest->pTscObj;
+
+ SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
+ .requestId = pRequest->requestId,
+ .requestObjRefId = pRequest->self,
+ .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
+ char dbFName[TSDB_DB_FNAME_LEN];
+ snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
+ catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
+ snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
+ catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
+ }
}
if (pRequest->body.queryFp) {
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index bb87e88214..d913b32642 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -823,7 +823,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
- tscDebug("consumer:0x%" PRIx64 " next retrieve ep from mnode in 1s", pTmq->consumerId);
+ tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
@@ -831,7 +831,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
- tscDebug("consumer:0x%" PRIx64 " next commit to mnode in %.2fs", pTmq->consumerId,
+ tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
@@ -1575,25 +1575,20 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
}
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
- /*strcpy(pReq->topic, pTopic->topicName);*/
- /*strcpy(pReq->cgroup, tmq->groupId);*/
-
int32_t groupLen = strlen(tmq->groupId);
memcpy(pReq->subKey, tmq->groupId, groupLen);
pReq->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
- pReq->timeout = timeout;
pReq->consumerId = tmq->consumerId;
+ pReq->timeout = timeout;
pReq->epoch = tmq->epoch;
/*pReq->currentOffset = reqOffset;*/
pReq->reqOffset = pVg->currentOffset;
- pReq->reqId = generateRequestId();
-
- pReq->useSnapshot = tmq->useSnapshot;
-
pReq->head.vgId = pVg->vgId;
+ pReq->useSnapshot = tmq->useSnapshot;
+ pReq->reqId = generateRequestId();
}
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
@@ -1643,6 +1638,76 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
+static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
+ atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
+ tsem_post(&pTmq->rspSem);
+ return -1;
+}
+
+static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
+ SMqPollReq req = {0};
+ tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
+
+ int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
+ if (msgSize < 0) {
+ return handleErrorBeforePoll(pVg, pTmq);
+ }
+
+ char* msg = taosMemoryCalloc(1, msgSize);
+ if (NULL == msg) {
+ return handleErrorBeforePoll(pVg, pTmq);
+ }
+
+ if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
+ taosMemoryFree(msg);
+ return handleErrorBeforePoll(pVg, pTmq);
+ }
+
+ SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
+ if (pParam == NULL) {
+ taosMemoryFree(msg);
+ return handleErrorBeforePoll(pVg, pTmq);
+ }
+
+ pParam->refId = pTmq->refId;
+ pParam->epoch = pTmq->epoch;
+ pParam->pVg = pVg; // pVg may be released,fix it
+ pParam->pTopic = pTopic;
+ pParam->vgId = pVg->vgId;
+
+ SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
+ if (sendInfo == NULL) {
+ taosMemoryFree(pParam);
+ taosMemoryFree(msg);
+ return handleErrorBeforePoll(pVg, pTmq);
+ }
+
+ sendInfo->msgInfo = (SDataBuf){
+ .pData = msg,
+ .len = msgSize,
+ .handle = NULL,
+ };
+
+ sendInfo->requestId = req.reqId;
+ sendInfo->requestObjRefId = 0;
+ sendInfo->param = pParam;
+ sendInfo->fp = tmqPollCb;
+ sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
+
+ int64_t transporterId = 0;
+ char offsetFormatBuf[80];
+ tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);
+
+ tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
+ pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
+ asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
+
+ pVg->pollCnt++;
+ pTmq->pollCnt++;
+
+ return TSDB_CODE_SUCCESS;
+}
+
// broadcast the poll request to all related vnodes
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
@@ -1650,7 +1715,9 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
- for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
+ int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
+
+ for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
@@ -1669,77 +1736,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
atomic_store_32(&pVg->vgSkipCnt, 0);
-
- SMqPollReq req = {0};
- tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
- int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
- if (msgSize < 0) {
- atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
- tsem_post(&tmq->rspSem);
- return -1;
+ int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
}
-
- char* msg = taosMemoryCalloc(1, msgSize);
- if (NULL == msg) {
- atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
- tsem_post(&tmq->rspSem);
- return -1;
- }
-
- if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
- taosMemoryFree(msg);
- atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
- tsem_post(&tmq->rspSem);
- return -1;
- }
-
- SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
- if (pParam == NULL) {
- taosMemoryFree(msg);
- atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
- tsem_post(&tmq->rspSem);
- return -1;
- }
-
- pParam->refId = tmq->refId;
- pParam->epoch = tmq->epoch;
-
- pParam->pVg = pVg;
- pParam->pTopic = pTopic;
- pParam->vgId = pVg->vgId;
-
- SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
- if (sendInfo == NULL) {
- taosMemoryFree(msg);
- taosMemoryFree(pParam);
- atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
- tsem_post(&tmq->rspSem);
- return -1;
- }
-
- sendInfo->msgInfo = (SDataBuf){
- .pData = msg,
- .len = msgSize,
- .handle = NULL,
- };
-
- sendInfo->requestId = req.reqId;
- sendInfo->requestObjRefId = 0;
- sendInfo->param = pParam;
- sendInfo->fp = tmqPollCb;
- sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
-
- int64_t transporterId = 0;
-
- char offsetFormatBuf[80];
- tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
-
- tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
- tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
- asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
-
- pVg->pollCnt++;
- tmq->pollCnt++;
}
}
diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h
index 8cd669c769..8ed7fc6a11 100644
--- a/source/dnode/mnode/impl/inc/mndTopic.h
+++ b/source/dnode/mnode/impl/inc/mndTopic.h
@@ -27,14 +27,9 @@ void mndCleanupTopic(SMnode *pMnode);
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName);
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
-
-SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
-SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
-
-int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
-int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb);
-
-const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
+int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
+bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb);
+const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index a8e53b725b..8fd52d52c6 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -259,8 +259,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
- mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64,
- pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime);
+ mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d",
+ pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
+ hbStatus);
if (status == MQ_CONSUMER_STATUS__READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
@@ -272,6 +273,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
.pCont = pLostMsg,
.contLen = sizeof(SMqConsumerLostMsg),
};
+
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
@@ -285,6 +287,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
.pCont = pClearMsg,
.contLen = sizeof(SMqConsumerClearMsg),
};
+
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
} else if (status == MQ_CONSUMER_STATUS__LOST) {
@@ -664,7 +667,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} else {
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
char *newTopic = taosArrayGetP(newSub, j);
- int comp = compareLenPrefixedStr(oldTopic, newTopic);
+ int comp = strcmp(oldTopic, newTopic);
if (comp == 0) {
i++;
j++;
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index d2b16967f9..3efd8fb249 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -1039,13 +1039,23 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
int32_t code = -1;
- STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db");
- if (pTrans == NULL) goto _OVER;
- mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
+ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db");
+ if (pTrans == NULL) {
+ goto _OVER;
+ }
+
+ mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name);
+
mndTransSetDbName(pTrans, pDb->name, NULL);
- if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
- if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER;
+ if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
+ goto _OVER;
+ }
+
+ if (mndTopicExistsForDb(pMnode, pDb)) {
+ terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
+ goto _OVER;
+ }
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
@@ -1097,10 +1107,12 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
}
code = mndDropDb(pMnode, pReq, pDb);
- if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
+ if (code == TSDB_CODE_SUCCESS) {
+ code = TSDB_CODE_ACTION_IN_PROGRESS;
+ }
_OVER:
- if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
+ if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop since %s", dropReq.db, terrstr());
}
diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c
index 4bf8f42506..8d8fd1b9b5 100644
--- a/source/dnode/mnode/impl/src/mndSubscribe.c
+++ b/source/dnode/mnode/impl/src/mndSubscribe.c
@@ -122,6 +122,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
+
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c
index 3a10bf5295..991f1099a6 100644
--- a/source/dnode/mnode/impl/src/mndTopic.c
+++ b/source/dnode/mnode/impl/src/mndTopic.c
@@ -31,6 +31,9 @@
#define MND_TOPIC_VER_NUMBER 2
#define MND_TOPIC_RESERVE_SIZE 64
+SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
+SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
+
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
@@ -79,6 +82,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
if (pTopic->physicalPlan) {
physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
}
+
int32_t schemaLen = 0;
if (pTopic->schema.nCols) {
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
@@ -88,7 +92,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
- if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
+ if (pRaw == NULL) {
+ goto TOPIC_ENCODE_OVER;
+ }
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
@@ -106,6 +112,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
+
if (pTopic->astLen) {
SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
}
@@ -123,6 +130,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
}
+
SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
if (pTopic->ntbUid != 0) {
int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
@@ -132,6 +140,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
}
}
+
SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
@@ -247,10 +256,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
taosArrayPush(pTopic->ntbColIds, &colId);
}
+
SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
-
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
-
terrno = TSDB_CODE_SUCCESS;
TOPIC_DECODE_OVER:
@@ -266,12 +274,12 @@ TOPIC_DECODE_OVER:
}
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
- mTrace("topic:%s, perform insert action", pTopic->name);
+ mTrace("topic:%s perform insert action", pTopic->name);
return 0;
}
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
- mTrace("topic:%s, perform delete action", pTopic->name);
+ mTrace("topic:%s perform delete action", pTopic->name);
taosMemoryFreeClear(pTopic->sql);
taosMemoryFreeClear(pTopic->ast);
taosMemoryFreeClear(pTopic->physicalPlan);
@@ -281,7 +289,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
}
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
- mTrace("topic:%s, perform update action", pOldTopic->name);
+ mTrace("topic:%s perform update action", pOldTopic->name);
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
@@ -364,7 +372,8 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
const char *userName) {
- mInfo("topic:%s to create", pCreate->name);
+ mInfo("start to create topic:%s", pCreate->name);
+
SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
@@ -383,19 +392,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.subType = pCreate->subType;
topicObj.withMeta = pCreate->withMeta;
- if (topicObj.withMeta) {
- if (topicObj.subType == TOPIC_SUB_TYPE__COLUMN) {
+
+ if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
+ if (pCreate->withMeta) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
- }
- if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
topicObj.ast = taosStrdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
- qDebugL("ast %s", topicObj.ast);
+ qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
@@ -409,18 +417,20 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
- mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
+ mError("failed to create topic:%s since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
- int64_t ntbUid;
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
if (topicObj.ntbColIds == NULL) {
+ taosMemoryFree(topicObj.ast);
+ taosMemoryFree(topicObj.sql);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
+
extractTopicTbInfo(pAst, &topicObj);
if (topicObj.ntbUid == 0) {
@@ -449,6 +459,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
return -1;
}
+
topicObj.stbUid = pStb->uid;
mndReleaseStb(pMnode, pStb);
}
@@ -467,7 +478,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFreeClear(topicObj.physicalPlan);
return -1;
}
- mInfo("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
+
+ mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
@@ -476,6 +488,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
mndTransDrop(pTrans);
return -1;
}
+
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (topicObj.ntbUid != 0) {
@@ -544,7 +557,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.ast);
taosArrayDestroy(topicObj.ntbColIds);
- if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema);
+
+ if (topicObj.schema.nCols) {
+ taosMemoryFreeClear(topicObj.schema.pSchema);
+ }
+
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
@@ -589,11 +606,13 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
}
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
- if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
+ if (code == 0) {
+ code = TSDB_CODE_ACTION_IN_PROGRESS;
+ }
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
- mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
+ mError("failed to create topic:%s since %s", createTopicReq.name, terrstr());
}
mndReleaseTopic(pMnode, pTopic);
@@ -605,13 +624,18 @@ _OVER:
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
int32_t code = -1;
- if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER;
+ if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) {
+ goto _OVER;
+ }
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
- if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
+ if (mndTransPrepare(pMnode, pTrans) != 0) {
+ goto _OVER;
+ }
+
code = 0;
_OVER:
@@ -650,7 +674,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMqConsumerObj *pConsumer;
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
@@ -661,8 +687,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
- mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name,
- pConsumer->consumerId, pConsumer->cgroup);
+ mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
+ dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
}
@@ -768,6 +794,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
+ *pNumOfTopics = 0;
+
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
@@ -780,7 +808,9 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
if (pTopic->dbUid == pDb->uid) {
numOfTopics++;
@@ -808,11 +838,10 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
SName n;
int32_t cols = 0;
- char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
- tstrncpy(varDataVal(topicName), mndGetDbStr(pTopic->name), sizeof(topicName) - 2);
- /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
- /*tNameGetDbName(&n, varDataVal(topicName));*/
- varDataSetLen(topicName, strlen(varDataVal(topicName)));
+ char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
+ const char *pName = mndGetDbStr(pTopic->name);
+ STR_TO_VARSTR(topicName, pName);
+
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)topicName, false);
@@ -820,6 +849,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&n, varDataVal(dbName));
varDataSetLen(dbName, strlen(varDataVal(dbName)));
+
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)dbName, false);
@@ -827,8 +857,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
- tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
- varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
+ STR_TO_VARSTR(sql, pTopic->sql);
+
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
@@ -863,24 +893,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
-int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
- SSdb *pSdb = pMnode->pSdb;
-
+bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
+ SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqTopicObj *pTopic = NULL;
+
while (1) {
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
if (pTopic->dbUid == pDb->uid) {
sdbRelease(pSdb, pTopic);
- terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
- return -1;
+ return true;
}
sdbRelease(pSdb, pTopic);
}
- return 0;
+
+ return false;
}
#if 0
diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c
index a03d640ad5..23822c8f20 100644
--- a/source/dnode/mnode/impl/src/mndUser.c
+++ b/source/dnode/mnode/impl/src/mndUser.c
@@ -1062,10 +1062,14 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
while (1) {
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
code = -1;
- if (mndUserDupObj(pUser, &newUser) != 0) break;
+ if (mndUserDupObj(pUser, &newUser) != 0) {
+ break;
+ }
bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL);
if (inTopic) {
diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c
index c2df5205d6..c2d27ad713 100644
--- a/source/dnode/mnode/sdb/src/sdbFile.c
+++ b/source/dnode/mnode/sdb/src/sdbFile.c
@@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) {
- // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
+ // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0);
if (pSdb->sync == 0) {
code = 0;
} else {
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index d1e4f8e609..284d2129cf 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -114,16 +114,18 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
}
void tqClose(STQ* pTq) {
- if (pTq) {
- tqOffsetClose(pTq->pOffsetStore);
- taosHashCleanup(pTq->pHandle);
- taosHashCleanup(pTq->pPushMgr);
- taosHashCleanup(pTq->pCheckInfo);
- taosMemoryFree(pTq->path);
- tqMetaClose(pTq);
- streamMetaClose(pTq->pStreamMeta);
- taosMemoryFree(pTq);
+ if (pTq == NULL) {
+ return;
}
+
+ tqOffsetClose(pTq->pOffsetStore);
+ taosHashCleanup(pTq->pHandle);
+ taosHashCleanup(pTq->pPushMgr);
+ taosHashCleanup(pTq->pCheckInfo);
+ taosMemoryFree(pTq->path);
+ tqMetaClose(pTq);
+ streamMetaClose(pTq->pStreamMeta);
+ taosMemoryFree(pTq);
}
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
@@ -158,7 +160,7 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
};
tmsgSendRsp(&resp);
- tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
+ tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
return 0;
@@ -213,9 +215,9 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
char buf1[80] = {0};
char buf2[80] = {0};
- tFormatOffset(buf1, 80, &pRsp->reqOffset);
- tFormatOffset(buf2, 80, &pRsp->rspOffset);
- tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
+ tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
+ tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
+ tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0;
@@ -273,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
- tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
+ tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0;
@@ -334,8 +336,8 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
- tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
- ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
+ tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64
+ " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0;
@@ -346,13 +348,15 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
pLeft->val.version <= pRight->val.version;
}
-int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0};
- SDecoder decoder;
- tDecoderInit(&decoder, msg, msgLen);
+
+ SDecoder decoder;
+ tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
return -1;
}
+
tDecoderClear(&decoder);
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
@@ -361,44 +365,45 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
} else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version);
- if (offset.val.version + 1 == version) {
+ if (offset.val.version + 1 == sversion) {
offset.val.version += 1;
}
- /*} else {*/
- /*A(0);*/
- }
- STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
- if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
- return 0;
+ } else {
+ tqError("invalid commit offset type:%d", offset.val.type);
+ return -1;
}
+ STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
+ if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
+ return 0; // no need to update the offset value
+ }
+
+ // save the new offset value
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
return -1;
}
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
- if (pHandle) {
- if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
- return -1;
- }
+ if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
+ return -1;
}
}
- // rsp
-
- /*}*/
- /*}*/
-
return 0;
}
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL;
+
while (1) {
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
+
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
+
if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) {
@@ -410,6 +415,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
}
}
}
+
return 0;
}
@@ -454,6 +460,7 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
return -1;
}
+
return 0;
}
@@ -472,18 +479,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;
- // 1.find handle
+ // 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
- tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
- TD_VID(pTq->pVnode), req.subKey);
+ tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode),
+ req.subKey);
return -1;
}
- // check rebalance
+ // 2. check rebalance
if (pHandle->consumerId != consumerId) {
- tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
- ", in vgId:%d, subkey %s, handle consumer id %" PRId64,
+ tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1;
@@ -497,7 +503,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
- tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
+ tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed
@@ -509,7 +515,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffsetNew = pOffset->val;
char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
- tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
+ tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
@@ -533,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
- tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
+ tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
@@ -551,7 +557,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
- tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
+ tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
@@ -582,8 +588,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pPushEntry->dataRsp.head.epoch = reqEpoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
- tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
- TD_VID(pTq->pVnode));
+ tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId,
+ pHandle->subKey, TD_VID(pTq->pVnode));
// unlock
taosWUnLockLatch(&pTq->pushLock);
return 0;
@@ -596,7 +602,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = -1;
}
- tqDebug("tmq poll: consumer %" PRId64
+ tqDebug("tmq poll: consumer:0x%" PRIx64
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
@@ -622,8 +628,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
code = -1;
}
- tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
- ",version:%" PRId64 "",
+ tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
+ ",version:%" PRId64,
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
taosMemoryFree(metaRsp.metaRsp);
@@ -641,8 +647,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffsetNew = taosxRsp.rspOffset;
}
- tqDebug("taosx poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
- ",version:%" PRId64 "",
+ tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
+ ",version:%" PRId64,
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
}
@@ -660,7 +666,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
- tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
+ tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break;
@@ -678,7 +684,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCont* pHead = &pCkHead->head;
- tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
+ tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
@@ -725,12 +731,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
}
+
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return 0;
}
-int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
@@ -765,10 +772,10 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
return 0;
}
-int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqCheckInfo info = {0};
SDecoder decoder;
- tDecoderInit(&decoder, msg, msgLen);
+ tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@@ -785,7 +792,7 @@ int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
return 0;
}
-int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
@@ -797,7 +804,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
return 0;
}
-int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
// todo lock
@@ -807,7 +814,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
- tqError("vgId:%d, build new consumer handle %s for consumer %" PRId64 ", but old consumerId is %" PRId64 "",
+ tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
}
if (req.newConsumerId == -1) {
@@ -882,7 +889,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
- tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
+ tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
@@ -996,7 +1003,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
- tDecoderInit(&decoder, msgBody, msgLen);
+ tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeSStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
@@ -1018,7 +1025,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
+ tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SEncoder encoder;
@@ -1049,7 +1056,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
-int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code;
SStreamTaskCheckRsp rsp;
@@ -1062,7 +1069,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
}
tDecoderClear(&decoder);
- tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
+ tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
@@ -1070,23 +1077,27 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
return -1;
}
- code = streamProcessTaskCheckRsp(pTask, &rsp, version);
+ code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code;
}
-int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code;
#if 0
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
if (code < 0) return code;
#endif
+ if (tsDisableStream) {
+ return 0;
+ }
// 1.deserialize msg and build task
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
+
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeSStreamTask(&decoder, pTask);
@@ -1098,14 +1109,14 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
tDecoderClear(&decoder);
// 2.save task
- code = streamMetaAddTask(pTq->pStreamMeta, version, pTask);
+ code = streamMetaAddTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) {
return -1;
}
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
- streamTaskCheckDownstream(pTask, version);
+ streamTaskCheckDownstream(pTask, sversion);
}
return 0;
@@ -1174,7 +1185,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
-int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
@@ -1183,7 +1194,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
}
// do recovery step 2
- code = streamSourceRecoverScanStep2(pTask, version);
+ code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
@@ -1231,7 +1242,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamRecoverFinishReq req;
SDecoder decoder;
- tDecoderInit(&decoder, msg, msgLen);
+ tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeSStreamRecoverFinishReq(&decoder, &req);
tDecoderClear(&decoder);
@@ -1385,7 +1396,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
+
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
@@ -1469,7 +1483,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
}
}
-int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
+int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
return 0;
@@ -1481,7 +1495,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
- tDecoderInit(&decoder, msgBody, msgLen);
+ tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId;
@@ -1514,7 +1528,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
SStreamDispatchReq req;
SDecoder decoder;
- tDecoderInit(&decoder, msgBody, msgLen);
+ tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
@@ -1577,4 +1591,4 @@ FAIL:
return -1;
}
-int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; }
+int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c
index a2fafdf7c3..2c8f20d8cd 100644
--- a/source/dnode/vnode/src/tq/tqRead.c
+++ b/source/dnode/vnode/src/tq/tqRead.c
@@ -355,11 +355,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg;
- if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
- while (true) {
- if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
- if (pReader->pBlock == NULL) break;
- }
+// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
+// while (true) {
+// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
+// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
+// pReader->msgIter.len, pReader->msgIter.uid);
+// if (pReader->pBlock == NULL) break;
+// }
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
pReader->ver = ver;
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 151860d593..347ac369d8 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -110,7 +110,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info;
-
+ qDebug("stream set total blocks:%d, task id:%s" PRIx64, (int32_t)numOfBlocks, id);
ASSERT(pInfo->validBlockIndex == 0);
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
@@ -1064,18 +1064,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
-#if 0
- if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
- pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
- qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
- pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
- ASSERT(0);
- }
-#endif
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1;
}
- ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index fe02a3c5fd..89e0dd363c 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -1197,13 +1197,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
- releaseBufPage(pBuf, page);
-
- if (pBlock->info.rows <= 0 || pRow->numOfRows > pBlock->info.capacity) {
- qError("error in copy data to ssdatablock, existed rows in block:%d, rows in pRow:%d, capacity:%d, %s",
- pBlock->info.rows, pRow->numOfRows, pBlock->info.capacity, GET_TASKID(pTaskInfo));
- T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
+ // expand the result datablock capacity
+ if (pRow->numOfRows > pBlock->info.capacity) {
+ blockDataEnsureCapacity(pBlock, pRow->numOfRows);
+ qDebug("datablock capacity not sufficient, expand to requried:%d, current capacity:%d, %s", pRow->numOfRows,
+ pBlock->info.capacity, GET_TASKID(pTaskInfo));
+ // todo set the pOperator->resultInfo size
} else {
+ releaseBufPage(pBuf, page);
break;
}
}
diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c
index 9b5664492d..adb045a69f 100644
--- a/source/libs/executor/src/filloperator.c
+++ b/source/libs/executor/src/filloperator.c
@@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
SWinKey key = {.groupId = groupId, .ts = pRow->key};
int32_t code = streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
+ qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
ASSERT(code == TSDB_CODE_SUCCESS);
}
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index c5405664c6..8f5c3786e0 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -1117,11 +1117,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
- pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
+ projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
- if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
- longjmp(pTaskInfo->env, pTaskInfo->code);
- }
}
taosHashClear(pInfo->pPartitions);
doStreamHashPartitionImpl(pInfo, pBlock);
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 795f04dfd4..a238b84993 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1623,7 +1623,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL;
}
- ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
} else {
return NULL;
}
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 09b7679f45..20d4f46eaf 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -1502,16 +1502,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS) {
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
- qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
+ qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey,
tw.ekey, tmpKey.groupId, mark);
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
- qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
+ qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
}
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
- qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
+ qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
}
streamStateFreeCur(pCur);
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 589cab1a2f..8c5b41a7ee 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -5092,14 +5092,24 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
}
- if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType &&
- pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
- return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
+ if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) {
+ if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
+ }
+
+ if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
+ }
}
- if (TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType &&
- tagsLen + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_TAGS_LEN) {
- return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_LENGTH, TSDB_MAX_TAGS_LEN);
+ if (TSDB_ALTER_TABLE_UPDATE_TAG_BYTES == pStmt->alterType) {
+ if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
+ }
+
+ if (tagsLen + calcTypeBytes(pStmt->dataType) - pSchema->bytes > TSDB_MAX_TAGS_LEN) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAGS_LENGTH, TSDB_MAX_TAGS_LEN);
+ }
}
}
diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c
index bb71fd7652..144a9536b9 100644
--- a/source/libs/scalar/src/filter.c
+++ b/source/libs/scalar/src/filter.c
@@ -15,7 +15,7 @@
#include
#include "os.h"
#include "thash.h"
-//#include "queryLog.h"
+// #include "queryLog.h"
#include "filter.h"
#include "filterInt.h"
#include "functionMgt.h"
@@ -123,36 +123,16 @@ int8_t filterGetRangeCompFuncFromOptrs(uint8_t optr, uint8_t optr2) {
return -1;
}
-__compar_fn_t gDataCompare[] = {compareInt32Val,
- compareInt8Val,
- compareInt16Val,
- compareInt64Val,
- compareFloatVal,
- compareDoubleVal,
- compareLenPrefixedStr,
- comparestrPatternMatch,
- compareChkInString,
- comparewcsPatternMatch,
- compareLenPrefixedWStr,
- compareUint8Val,
- compareUint16Val,
- compareUint32Val,
- compareUint64Val,
- setChkInBytes1,
- setChkInBytes2,
- setChkInBytes4,
- setChkInBytes8,
- comparestrRegexMatch,
- comparestrRegexNMatch,
- setChkNotInBytes1,
- setChkNotInBytes2,
- setChkNotInBytes4,
- setChkNotInBytes8,
- compareChkNotInString,
- comparestrPatternNMatch,
- comparewcsPatternNMatch,
- comparewcsRegexMatch,
- comparewcsRegexNMatch,};
+__compar_fn_t gDataCompare[] = {
+ compareInt32Val, compareInt8Val, compareInt16Val, compareInt64Val,
+ compareFloatVal, compareDoubleVal, compareLenPrefixedStr, comparestrPatternMatch,
+ compareChkInString, comparewcsPatternMatch, compareLenPrefixedWStr, compareUint8Val,
+ compareUint16Val, compareUint32Val, compareUint64Val, setChkInBytes1,
+ setChkInBytes2, setChkInBytes4, setChkInBytes8, comparestrRegexMatch,
+ comparestrRegexNMatch, setChkNotInBytes1, setChkNotInBytes2, setChkNotInBytes4,
+ setChkNotInBytes8, compareChkNotInString, comparestrPatternNMatch, comparewcsPatternNMatch,
+ comparewcsRegexMatch, comparewcsRegexNMatch,
+};
__compar_fn_t gInt8SignCompare[] = {compareInt8Val, compareInt8Int16, compareInt8Int32,
compareInt8Int64, compareInt8Float, compareInt8Double};
@@ -341,7 +321,7 @@ __compar_fn_t filterGetCompFuncEx(int32_t lType, int32_t rType, int32_t optr) {
if (TSDB_DATA_TYPE_NULL == rType || TSDB_DATA_TYPE_JSON == rType) {
return NULL;
}
-
+
switch (lType) {
case TSDB_DATA_TYPE_TINYINT: {
if (IS_SIGNED_NUMERIC_TYPE(rType) || IS_FLOAT_TYPE(rType)) {
@@ -519,7 +499,7 @@ int32_t filterReuseRangeCtx(SFilterRangeCtx *ctx, int32_t type, int32_t options)
int32_t filterConvertRange(SFilterRangeCtx *cur, SFilterRange *ra, bool *notNull) {
int64_t tmp = 0;
-
+
if (!FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) {
int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type, &tmp));
if (sr == 0) {
@@ -704,7 +684,7 @@ int32_t filterAddRangeImpl(void *h, SFilterRange *ra, int32_t optr) {
int32_t filterAddRange(void *h, SFilterRange *ra, int32_t optr) {
SFilterRangeCtx *ctx = (SFilterRangeCtx *)h;
- int64_t tmp = 0;
+ int64_t tmp = 0;
if (FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) {
SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type, &tmp));
@@ -991,7 +971,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type,
bool freeIfExists, int16_t *srcFlag) {
int32_t idx = -1;
uint32_t *num;
- bool sameBuf = false;
+ bool sameBuf = false;
num = &info->fields[type].num;
@@ -1251,13 +1231,13 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit *u
SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u);
if (u->right.type == FLD_TYPE_VALUE) {
- void *data = FILTER_UNIT_VAL_DATA(src, u);
+ void *data = FILTER_UNIT_VAL_DATA(src, u);
SFilterField *rField = FILTER_UNIT_RIGHT_FIELD(src, u);
if (IS_VAR_DATA_TYPE(type)) {
if (FILTER_UNIT_OPTR(u) == OP_TYPE_IN) {
- filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, POINTER_BYTES,
- false, &rField->flag); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
+ filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, POINTER_BYTES, false,
+ &rField->flag); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
t = FILTER_GET_FIELD(dst, right);
FILTER_SET_FLAG(t->flag, FLD_DATA_IS_HASH);
@@ -3769,31 +3749,31 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
return DEAL_RES_CONTINUE;
}
-/*
- if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) {
- return DEAL_RES_CONTINUE;
- }
+ /*
+ if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) {
+ return DEAL_RES_CONTINUE;
+ }
- if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR != valueNode->node.resType.type) {
- return DEAL_RES_CONTINUE;
- }
+ if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR !=
+ valueNode->node.resType.type) { return DEAL_RES_CONTINUE;
+ }
- if (stat->precision < 0) {
- int32_t code = fltAddValueNodeToConverList(stat, valueNode);
- if (code) {
- stat->code = code;
- return DEAL_RES_ERROR;
- }
+ if (stat->precision < 0) {
+ int32_t code = fltAddValueNodeToConverList(stat, valueNode);
+ if (code) {
+ stat->code = code;
+ return DEAL_RES_ERROR;
+ }
- return DEAL_RES_CONTINUE;
- }
+ return DEAL_RES_CONTINUE;
+ }
- int32_t code = sclConvertToTsValueNode(stat->precision, valueNode);
- if (code) {
- stat->code = code;
- return DEAL_RES_ERROR;
- }
-*/
+ int32_t code = sclConvertToTsValueNode(stat->precision, valueNode);
+ if (code) {
+ stat->code = code;
+ return DEAL_RES_ERROR;
+ }
+ */
return DEAL_RES_CONTINUE;
}
@@ -3939,7 +3919,7 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
}
- int32_t type = vectorGetConvertType(refNode->node.resType.type, listNode->node.resType.type);
+ int32_t type = vectorGetConvertType(refNode->node.resType.type, listNode->node.resType.type);
if (0 != type && type != refNode->node.resType.type) {
stat->scalarMode = true;
return DEAL_RES_CONTINUE;
@@ -3963,14 +3943,14 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode **pNode, SFltTreeStat *pStat) {
FLT_ERR_JRET(pStat->code);
-/*
- int32_t nodeNum = taosArrayGetSize(pStat->nodeList);
- for (int32_t i = 0; i < nodeNum; ++i) {
- SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
+ /*
+ int32_t nodeNum = taosArrayGetSize(pStat->nodeList);
+ for (int32_t i = 0; i < nodeNum; ++i) {
+ SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
- FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode));
- }
-*/
+ FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode));
+ }
+ */
_return:
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index a439471143..b07c05dcfe 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -270,88 +270,40 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
return -1;
}
+ SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
+ SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
+ bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
+
+ if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
+ sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
+ syncNodeRelease(pSyncNode);
+ return 0;
+ }
+
int32_t code = 0;
+ int64_t logRetention = 0;
if (syncNodeIsMnode(pSyncNode)) {
// mnode
- int64_t logRetention = SYNC_MNODE_LOG_RETENTION;
-
- SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
- SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
- int64_t logNum = endIndex - beginIndex;
- bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
-
- if (isEmpty || (!isEmpty && logNum < logRetention)) {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", log-num:%" PRId64 ", empty:%d, do not delete wal",
- lastApplyIndex, logNum, isEmpty);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
- goto _DEL_WAL;
-
+ logRetention = SYNC_MNODE_LOG_RETENTION;
} else {
- SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
- SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
- bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
-
- if (isEmpty || !(lastApplyIndex >= beginIndex && lastApplyIndex <= endIndex)) {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 ", empty:%d, do not delete wal", lastApplyIndex, isEmpty);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
// vnode
if (pSyncNode->replicaNum > 1) {
// multi replicas
-
- lastApplyIndex = TMAX(lastApplyIndex - SYNC_VNODE_LOG_RETENTION, beginIndex - 1);
-
- if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
- pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
-
- for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
- int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
- if (lastApplyIndex > matchIndex) {
- sNTrace(pSyncNode,
- "new-snapshot-index:%" PRId64 " is greater than match-index:%" PRId64
- " of dnode:%d, do not delete wal",
- lastApplyIndex, matchIndex, DID(&pSyncNode->peersId[i]));
-
- syncNodeRelease(pSyncNode);
- return 0;
- }
- }
-
- } else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
- if (lastApplyIndex > pSyncNode->minMatchIndex) {
- sNTrace(pSyncNode,
- "new-snapshot-index:%" PRId64 " is greater than min-match-index:%" PRId64 ", do not delete wal",
- lastApplyIndex, pSyncNode->minMatchIndex);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
- } else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
- syncNodeRelease(pSyncNode);
- return 0;
-
- } else {
- sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " unknown state, do not delete wal", lastApplyIndex);
- syncNodeRelease(pSyncNode);
- return 0;
- }
-
- goto _DEL_WAL;
-
- } else {
- // one replica
-
- goto _DEL_WAL;
+ logRetention = SYNC_VNODE_LOG_RETENTION;
}
}
+ if (pSyncNode->replicaNum > 1) {
+ if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
+ sNTrace(pSyncNode, "new-snapshot-index:%" PRId64 " candidate or unknown state, do not delete wal",
+ lastApplyIndex);
+ syncNodeRelease(pSyncNode);
+ return 0;
+ }
+ logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex);
+ }
+
_DEL_WAL:
do {
@@ -366,7 +318,7 @@ _DEL_WAL:
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
pSyncNode->snapshottingTime = taosGetTimestampMs();
- code = walBeginSnapshot(pData->pWal, lastApplyIndex);
+ code = walBeginSnapshot(pData->pWal, lastApplyIndex, logRetention);
if (code == 0) {
sNTrace(pSyncNode, "wal snapshot begin, index:%" PRId64 ", last apply index:%" PRId64,
pSyncNode->snapshottingIndex, lastApplyIndex);
@@ -2141,24 +2093,19 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
if (timerLogicClock == msgLogicClock) {
if (tsNow > pData->execTime) {
-#if 0
- sTrace(
- "vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
- "---------",
- pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
-#endif
-
pData->execTime += pSyncTimer->timerMS;
SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
+ pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
+
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
pSyncMsg->srcId = pSyncNode->myRaftId;
pSyncMsg->destId = pData->destId;
pSyncMsg->term = raftStoreGetTerm(pSyncNode);
pSyncMsg->commitIndex = pSyncNode->commitIndex;
- pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
+ pSyncMsg->minMatchIndex = pSyncNode->minMatchIndex;
pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = tsNow;
@@ -2170,11 +2117,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
} else {
-#if 0
- sTrace(
- "vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
- pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
-#endif
}
if (syncIsInit()) {
diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c
index 4f6e3cf8ed..6df2b40000 100644
--- a/source/libs/tdb/src/db/tdbBtree.c
+++ b/source/libs/tdb/src/db/tdbBtree.c
@@ -260,7 +260,7 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
}
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
- SBTC btc;
+ SBTC btc = {0};
int c;
int ret;
@@ -272,10 +272,17 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
if (ret < 0) {
tdbError("tdb/btree-upsert: btc move to failed with ret: %d.", ret);
+ if (TDB_CELLDECODER_FREE_KEY(&btc.coder)) {
+ tdbFree(btc.coder.pKey);
+ }
tdbBtcClose(&btc);
return -1;
}
+ if (TDB_CELLDECODER_FREE_KEY(&btc.coder)) {
+ tdbFree(btc.coder.pKey);
+ }
+
if (btc.idx == -1) {
btc.idx = 0;
c = 1;
@@ -1442,15 +1449,19 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
// Clear the state of decoder
if (TDB_CELLDECODER_FREE_VAL(pDecoder)) {
tdbFree(pDecoder->pVal);
+ TDB_CELLDECODER_CLZ_FREE_VAL(pDecoder);
+ // tdbTrace("tdb btc decoder val set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
+ }
+ if (TDB_CELLDECODER_FREE_KEY(pDecoder)) {
+ tdbFree(pDecoder->pKey);
+ TDB_CELLDECODER_CLZ_FREE_KEY(pDecoder);
+ // tdbTrace("tdb btc decoder key set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
}
pDecoder->kLen = -1;
pDecoder->pKey = NULL;
pDecoder->vLen = -1;
pDecoder->pVal = NULL;
pDecoder->pgno = 0;
- TDB_CELLDECODER_SET_FREE_NIL(pDecoder);
-
- // tdbTrace("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
// 1. Decode header part
if (!leaf) {
@@ -2270,10 +2281,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} else {
lidx = lidx + 1;
}
- if (TDB_CELLDECODER_FREE_KEY(&pBtc->coder)) {
- tdbFree((void*)pTKey);
- }
-
// compare last cell
if (lidx <= ridx) {
pBtc->idx = ridx;
@@ -2284,9 +2291,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} else {
ridx = ridx - 1;
}
- if (TDB_CELLDECODER_FREE_KEY(&pBtc->coder)) {
- tdbFree((void*)pTKey);
- }
}
// binary search
@@ -2297,9 +2301,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
pBtc->idx = (lidx + ridx) >> 1;
tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
- if (TDB_CELLDECODER_FREE_KEY(&pBtc->coder)) {
- tdbFree((void*)pTKey);
- }
if (c < 0) {
// pKey < cd.pKey
ridx = pBtc->idx - 1;
diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h
index 62466e9c47..7a0bcc00a4 100644
--- a/source/libs/tdb/src/inc/tdbInt.h
+++ b/source/libs/tdb/src/inc/tdbInt.h
@@ -122,6 +122,8 @@ typedef struct SBtInfo {
#define TDB_CELLD_F_VAL 0x2
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
+#define TDB_CELLDECODER_CLZ_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_KEY)
+#define TDB_CELLDECODER_CLZ_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_VAL)
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index a41cc0068c..5ff67c87ca 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -22,6 +22,7 @@ extern "C" {
#include "os.h"
#include "taoserror.h"
#include "theap.h"
+#include "tmisce.h"
#include "transLog.h"
#include "transportInt.h"
#include "trpc.h"
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index e88308c432..e008320222 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -11,7 +11,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-
#include "transComm.h"
typedef struct SConnList {
@@ -224,9 +223,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} while (0);
// snprintf may cause performance problem
-#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
- do { \
- snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
+#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
+ do { \
+ char* p = key; \
+ int32_t len = strlen(ip); \
+ if (p != NULL) memcpy(p, ip, len); \
+ p[len] = ':'; \
+ titoa(port, 10, &p[len + 1]); \
} while (0)
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
@@ -664,7 +667,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
- tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
+ tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
transAllocBuffer(pBuf, buf);
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
@@ -677,7 +680,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) {
pBuf->len += nread;
while (transReadComplete(pBuf)) {
- tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
+ tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
cliHandleExcept(conn);
break;
@@ -1949,11 +1952,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
- STraceId* trace = &pMsg->msg.info.traceId;
- char tbuf[256] = {0};
- EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
- tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
- pCtx->retryStep, pCtx->retryNextInterval);
+ if (rpcDebugFlag & DEBUG_DEBUG) {
+ STraceId* trace = &pMsg->msg.info.traceId;
+ char tbuf[256] = {0};
+ EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
+ tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
+ pCtx->retryStep, pCtx->retryNextInterval);
+ }
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
@@ -1990,7 +1995,7 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
pResp->pCont = buf;
pResp->contLen = len;
- *dst = epset;
+ epsetAssign(dst, &epset);
return true;
}
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
@@ -2015,7 +2020,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
- pCtx->epSet = epSet;
+ epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
@@ -2040,7 +2045,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
- pCtx->epSet = epSet;
+ epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
@@ -2130,10 +2135,6 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
pCtx->retryNextInterval = pCtx->retryMaxInterval;
}
-
- // if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
- // return false;
- // }
} else {
pCtx->retryNextInterval = 0;
pCtx->epsetRetryCnt++;
@@ -2181,9 +2182,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {
- char tbuf[256] = {0};
- EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
- tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
+ if (rpcDebugFlag & DEBUG_TRACE) {
+ char tbuf[256] = {0};
+ EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
+ tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
+ }
}
if (pCtx->pSem != NULL) {
@@ -2310,8 +2313,9 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
- pCtx->epSet = *pEpSet;
- pCtx->origEpSet = *pEpSet;
+ epsetAssign(&pCtx->epSet, pEpSet);
+ epsetAssign(&pCtx->origEpSet, pEpSet);
+
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
@@ -2356,8 +2360,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
- pCtx->epSet = *pEpSet;
- pCtx->origEpSet = *pEpSet;
+ epsetAssign(&pCtx->epSet, pEpSet);
+ epsetAssign(&pCtx->origEpSet, pEpSet);
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->pSem = sem;
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index e31616f53e..7bfd3df9e0 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -96,7 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (walSkipFetchBodyNew(pReader) < 0) {
return -1;
}
- fetchVer++;
+ fetchVer = pReader->curVersion;
}
}
pReader->curStopped = 1;
@@ -142,7 +142,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
}
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
- char fnameStr[WAL_FILE_LEN];
+ char fnameStr[WAL_FILE_LEN] = {0};
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
@@ -299,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
return -1;
}
- if (pReadHead->version != ver) {
- wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
- pReader->pHead->head.version, ver);
- pReader->curInvalid = 1;
- terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
- return -1;
- }
-
if (walValidBodyCksum(pReader->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
pReader->curInvalid = 1;
diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c
index d3c03c335b..4d451db0c0 100644
--- a/source/libs/wal/src/walRef.c
+++ b/source/libs/wal/src/walRef.c
@@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) {
}
pRef->refId = tGenIdPI64();
pRef->refVer = -1;
- pRef->refFile = -1;
+ // pRef->refFile = -1;
pRef->pWal = pWal;
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
return pRef;
@@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
pRef->refVer = ver;
// bsearch in fileSet
- SWalFileInfo tmpInfo;
- tmpInfo.firstVer = ver;
- SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
- /*A(pRet != NULL);*/
- pRef->refFile = pRet->firstVer;
+ // SWalFileInfo tmpInfo;
+ // tmpInfo.firstVer = ver;
+ // SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
+ // ASSERT(pRet != NULL);
+ // pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
}
@@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
#if 1
void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1;
- pRef->refFile = -1;
+ // pRef->refFile = -1;
}
#endif
@@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
}
}
taosThreadMutexLock(&pWal->mutex);
-
int64_t ver = walGetFirstVer(pWal);
-
- wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
-
pRef->refVer = ver;
// bsearch in fileSet
- SWalFileInfo tmpInfo;
- tmpInfo.firstVer = ver;
- SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
- ASSERT(pRet != NULL);
- pRef->refFile = pRet->firstVer;
+ // SWalFileInfo tmpInfo;
+ // tmpInfo.firstVer = ver;
+ // SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
+ // ASSERT(pRet != NULL);
+ // pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
+ wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
+
return pRef;
}
@@ -119,8 +117,8 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
- /*A(pRet != NULL);*/
- pRef->refFile = pRet->firstVer;
+ ASSERT(pRet != NULL);
+ // pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
return pRef;
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 701d8da8c0..a992c951fa 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
return 0;
}
-int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
+int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) {
taosThreadMutexLock(&pWal->mutex);
-
+ ASSERT(logRetention >= 0);
pWal->vers.verInSnapshotting = ver;
- wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
- pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
+ pWal->vers.logRetention = logRetention;
+
+ wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
+ ", last ver %" PRId64,
+ pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling
- if (pWal->cfg.retentionPeriod == 0) {
- if (walGetLastFileSize(pWal) != 0) {
- if (walRollImpl(pWal) < 0) {
- wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
- goto _err;
- }
+ if (walGetLastFileSize(pWal) != 0) {
+ if (walRollImpl(pWal) < 0) {
+ wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
+ goto _err;
}
}
+
taosThreadMutexUnlock(&pWal->mutex);
return 0;
@@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting;
- wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
- ver, pWal->vers.firstVer, pWal->vers.lastVer);
+ wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", log retention %" PRId64 " first ver %" PRId64
+ ", last ver %" PRId64,
+ pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer);
if (ver == -1) {
code = -1;
@@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
+ ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index 891e7dcdae..0784db917a 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
if (i == 5) {
- walBeginSnapshot(pWal, i);
+ walBeginSnapshot(pWal, i, 0);
walEndSnapshot(pWal);
}
}
@@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- walBeginSnapshot(pWal, i - 1);
+ walBeginSnapshot(pWal, i - 1, 0);
ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1);
walEndSnapshot(pWal);
ASSERT_EQ(pWal->vers.snapshotVer, i - 1);
@@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.commitVer, i);
}
- code = walBeginSnapshot(pWal, i - 1);
+ code = walBeginSnapshot(pWal, i - 1, 0);
ASSERT_EQ(code, 0);
code = walEndSnapshot(pWal);
ASSERT_EQ(code, 0);
diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c
index cc018bf842..c1ef57e9c5 100644
--- a/source/os/src/osSemaphore.c
+++ b/source/os/src/osSemaphore.c
@@ -132,7 +132,8 @@ int tsem_wait(tsem_t *psem) {
int tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
- dispatch_semaphore_wait(*psem, milis * 1000 * 1000);
+ dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
+ dispatch_semaphore_wait(*psem, time);
return 0;
}
diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c
index 264bf53800..7467fa2948 100644
--- a/source/util/src/tarray.c
+++ b/source/util/src/tarray.c
@@ -139,7 +139,8 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
taosArraySet(pArray, pos + 1, p2);
- pos += 1;
+ memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
+ pos += 1;
} else {
pos += 1;
}
@@ -171,13 +172,14 @@ void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp
// do nothing
} else {
if (pos + 1 != i) {
- void* p = taosArrayGet(pArray, pos + 1);
+ void* p = taosArrayGetP(pArray, pos + 1);
if (fp != NULL) {
fp(p);
}
taosArraySet(pArray, pos + 1, p2);
- pos += 1;
+ memset(TARRAY_GET_ELEM(pArray, i), 0, pArray->elemSize);
+ pos += 1;
} else {
pos += 1;
}
diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c
index 75d6a43b24..f8f78ae6a5 100644
--- a/source/util/src/tcompare.c
+++ b/source/util/src/tcompare.c
@@ -1235,7 +1235,7 @@ int32_t taosArrayCompareString(const void *a, const void *b) {
const char *x = *(const char **)a;
const char *y = *(const char **)b;
- return compareLenPrefixedStr(x, y);
+ return strcmp(x, y);
}
int32_t comparestrPatternMatch(const void *pLeft, const void *pRight) {
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index 1624aec5af..4bb082800d 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -523,7 +523,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ROW_LENGTH, "Row length exceeds
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FIRST_COLUMN, "First column must be timestamp")
-TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column length")
+TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column/tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TAGS_NUM, "Invalid number of tag columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
diff --git a/tests/script/tsim/catalog/alterInCurrent.sim b/tests/script/tsim/catalog/alterInCurrent.sim
index 3cb337bbe1..521858c368 100644
--- a/tests/script/tsim/catalog/alterInCurrent.sim
+++ b/tests/script/tsim/catalog/alterInCurrent.sim
@@ -67,4 +67,19 @@ sql insert into t1 values (1591060628000, 1);
sql alter table st1 drop tag t2;
sql create table t2 using st1 tags(2);
+print ======== drop tag in super table
+sql create database if not exists aaa;
+sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'aaa';
+if $rows != 0 then
+ return -1
+endi
+sql drop database if exists foo;
+sql create database if not exists foo;
+sql create table foo.t(ts timestamp,name varchar(20));
+sql create table foo.xt(ts timestamp,name varchar(20));
+sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'foo';
+if $rows != 2 then
+ return -1
+endi
+
system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py
index 593c91a470..c3ec4875ce 100644
--- a/tests/system-test/7-tmq/tmq_taosx.py
+++ b/tests/system-test/7-tmq/tmq_taosx.py
@@ -200,6 +200,7 @@ class TDTestCase:
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
+ time.sleep(10)
tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'")
uid1 = tdSql.getData(0, 5)
uid2 = tdSql.getData(1, 5)