Merge pull request #14739 from taosdata/feature/stream
fix(wal): read after rolling
This commit is contained in:
commit
f9ba552496
|
@ -55,7 +55,7 @@ enum {
|
||||||
enum {
|
enum {
|
||||||
STREAM_INPUT__DATA_SUBMIT = 1,
|
STREAM_INPUT__DATA_SUBMIT = 1,
|
||||||
STREAM_INPUT__DATA_BLOCK,
|
STREAM_INPUT__DATA_BLOCK,
|
||||||
STREAM_INPUT__TABLE_SCAN,
|
// STREAM_INPUT__TABLE_SCAN,
|
||||||
STREAM_INPUT__TQ_SCAN,
|
STREAM_INPUT__TQ_SCAN,
|
||||||
STREAM_INPUT__DATA_RETRIEVE,
|
STREAM_INPUT__DATA_RETRIEVE,
|
||||||
STREAM_INPUT__TRIGGER,
|
STREAM_INPUT__TRIGGER,
|
||||||
|
|
|
@ -124,6 +124,7 @@ typedef struct SWal {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t scanUncommited;
|
int8_t scanUncommited;
|
||||||
int8_t scanMeta;
|
int8_t scanMeta;
|
||||||
|
int8_t enableRef;
|
||||||
} SWalFilterCond;
|
} SWalFilterCond;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -901,6 +901,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
|
||||||
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
||||||
if (pTmq == NULL) {
|
if (pTmq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -917,6 +919,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->delayedTask = taosOpenQueue();
|
pTmq->delayedTask = taosOpenQueue();
|
||||||
|
|
||||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -943,12 +947,14 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
|
||||||
// init semaphore
|
// init semaphore
|
||||||
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
|
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
|
||||||
|
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init connection
|
// init connection
|
||||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
||||||
if (pTmq->pTscObj == NULL) {
|
if (pTmq->pTscObj == NULL) {
|
||||||
|
tscError("consumer %ld setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||||
tsem_destroy(&pTmq->rspSem);
|
tsem_destroy(&pTmq->rspSem);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ int32_t tsNumOfMnodeQueryThreads = 2;
|
||||||
int32_t tsNumOfMnodeFetchThreads = 1;
|
int32_t tsNumOfMnodeFetchThreads = 1;
|
||||||
int32_t tsNumOfMnodeReadThreads = 1;
|
int32_t tsNumOfMnodeReadThreads = 1;
|
||||||
int32_t tsNumOfVnodeQueryThreads = 2;
|
int32_t tsNumOfVnodeQueryThreads = 2;
|
||||||
int32_t tsNumOfVnodeFetchThreads = 1;
|
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||||
int32_t tsNumOfVnodeWriteThreads = 2;
|
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||||
int32_t tsNumOfVnodeSyncThreads = 2;
|
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||||
int32_t tsNumOfVnodeMergeThreads = 2;
|
int32_t tsNumOfVnodeMergeThreads = 2;
|
||||||
|
@ -190,7 +190,6 @@ int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
int32_t tsTtlPushInterval = 60;
|
int32_t tsTtlPushInterval = 60;
|
||||||
|
|
||||||
|
|
||||||
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
|
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
|
||||||
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
|
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
|
||||||
tsDiskCfg[index].level = level;
|
tsDiskCfg[index].level = level;
|
||||||
|
@ -468,7 +467,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400*365, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
|
||||||
|
@ -632,7 +631,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
||||||
int32_t len = strlen(name);
|
int32_t len = strlen(name);
|
||||||
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||||
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||||
|
@ -662,7 +661,7 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
|
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
|
||||||
} else if (strcasecmp("countAlwaysReturnValue", name) == 0) {
|
} else if (strcasecmp("countAlwaysReturnValue", name) == 0) {
|
||||||
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
|
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
|
||||||
} else if (strcasecmp("cDebugFlag", name) == 0) {
|
} else if (strcasecmp("cDebugFlag", name) == 0) {
|
||||||
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
|
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -687,10 +686,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
|
@ -700,10 +699,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
|
@ -774,7 +773,7 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
} else if (strcasecmp("minSlidingTime", name) == 0) {
|
} else if (strcasecmp("minSlidingTime", name) == 0) {
|
||||||
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
|
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
|
||||||
} else if (strcasecmp("minIntervalTime", name) == 0) {
|
} else if (strcasecmp("minIntervalTime", name) == 0) {
|
||||||
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
||||||
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
|
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
|
||||||
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
|
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
|
||||||
}
|
}
|
||||||
|
@ -920,10 +919,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
|
@ -995,14 +994,13 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
terrno = TSDB_CODE_CFG_NOT_FOUND;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
||||||
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
||||||
if (tsCfg == NULL) osDefaultInit();
|
if (tsCfg == NULL) osDefaultInit();
|
||||||
|
|
|
@ -15,11 +15,11 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndOffset.h"
|
#include "mndOffset.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
#include "mndSubscribe.h"
|
#include "mndSubscribe.h"
|
||||||
|
@ -435,17 +435,6 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
goto SUBSCRIBE_OVER;
|
goto SUBSCRIBE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
// ref topic to prevent drop
|
|
||||||
// TODO make topic complete
|
|
||||||
SMqTopicObj topicObj = {0};
|
|
||||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
|
||||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
|
||||||
mInfo("subscribe topic %s by consumer:%" PRId64 ",cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
|
|
||||||
topicObj.refConsumerCnt);
|
|
||||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,8 +461,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t status = atomic_load_32(&pConsumerOld->status);
|
int32_t status = atomic_load_32(&pConsumerOld->status);
|
||||||
|
|
||||||
mInfo("receive subscribe request from old consumer:%" PRId64 ", current status: %s", consumerId,
|
mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
|
||||||
mndConsumerStatusName(status));
|
consumerId, mndConsumerStatusName(status), newTopicNum);
|
||||||
|
|
||||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
|
@ -849,12 +838,15 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
|
||||||
|
mDebug("showing consumer %ld no assigned topic, skip", pConsumer->consumerId);
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
|
||||||
|
mDebug("showing consumer %ld", pConsumer->consumerId);
|
||||||
|
|
||||||
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
bool hasTopic = true;
|
bool hasTopic = true;
|
||||||
if (topicSz == 0) {
|
if (topicSz == 0) {
|
||||||
|
|
|
@ -512,9 +512,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
|
||||||
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
|
||||||
}
|
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
|
||||||
|
@ -524,6 +521,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
|
@ -174,28 +174,9 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
|
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
|
||||||
qTaskInfo_t task = pExec->execCol.task[workerId];
|
|
||||||
ASSERT(task);
|
|
||||||
qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false);
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* pDataBlock = NULL;
|
|
||||||
uint64_t ts = 0;
|
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
if (pDataBlock == NULL) break;
|
|
||||||
|
|
||||||
ASSERT(pDataBlock->info.rows != 0);
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
|
||||||
tqAddBlockDataToRsp(pDataBlock, pRsp);
|
|
||||||
if (pRsp->withTbName) {
|
|
||||||
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
|
|
||||||
tqAddTbNameToRsp(pTq, uid, pRsp);
|
|
||||||
}
|
|
||||||
pRsp->blockNum++;
|
|
||||||
}
|
|
||||||
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
|
||||||
pRsp->withSchema = 1;
|
pRsp->withSchema = 1;
|
||||||
STqReader* pReader = pExec->pExecReader[workerId];
|
STqReader* pReader = pExec->pExecReader[workerId];
|
||||||
tqReaderSetDataMsg(pReader, pReq, 0);
|
tqReaderSetDataMsg(pReader, pReq, 0);
|
||||||
|
@ -232,9 +213,11 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->blockNum == 0) {
|
if (pRsp->blockNum == 0) {
|
||||||
pRsp->skipLogNum++;
|
pRsp->skipLogNum++;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,9 +60,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||||
taosArrayPush(pInfo->pBlockLists, &p);
|
taosArrayPush(pInfo->pBlockLists, &p);
|
||||||
}
|
}
|
||||||
} else if (type == STREAM_INPUT__TABLE_SCAN) {
|
/*} else if (type == STREAM_INPUT__TABLE_SCAN) {*/
|
||||||
// do nothing
|
/*ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);*/
|
||||||
ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -71,6 +70,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
||||||
if (tinfo == NULL) {
|
if (tinfo == NULL) {
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
@ -78,6 +78,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
|
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
||||||
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
||||||
|
|
|
@ -299,7 +299,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
}
|
}
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
pInfo->blockType = STREAM_INPUT__TABLE_SCAN;
|
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
|
||||||
int64_t uid = pOffset->uid;
|
int64_t uid = pOffset->uid;
|
||||||
int64_t ts = pOffset->ts;
|
int64_t ts = pOffset->ts;
|
||||||
|
|
||||||
|
|
|
@ -1123,6 +1123,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
|
||||||
uidCol[i] = getGroupId(pOperator, uidCol[i]);
|
uidCol[i] = getGroupId(pOperator, uidCol[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
|
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||||
|
@ -1216,13 +1217,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
|
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
|
// TODO clean data block
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
} else {
|
|
||||||
// data is filtered out, do clean
|
|
||||||
|
|
||||||
/*tDeleteSSDataBlock(&ret.data);*/
|
|
||||||
}
|
}
|
||||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -1240,6 +1237,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||||
return pResult && pResult->info.rows > 0 ? pResult : NULL;
|
return pResult && pResult->info.rows > 0 ? pResult : NULL;
|
||||||
|
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
// TODO scan meta
|
||||||
|
ASSERT(0);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
|
@ -1397,6 +1398,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doRawScan(SOperatorInfo* pInfo) {
|
||||||
|
//
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
||||||
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
|
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
|
||||||
|
@ -1409,6 +1415,19 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
||||||
return tableIdList;
|
return tableIdList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for subscribing db or stb (not including column),
|
||||||
|
// if this scan is used, meta data can be return
|
||||||
|
// and schemas are decided when scanning
|
||||||
|
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||||
|
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) {
|
||||||
|
// create operator
|
||||||
|
// create tb reader
|
||||||
|
// create meta reader
|
||||||
|
// create tq reader
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
|
@ -1452,16 +1471,16 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
|
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
|
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
|
||||||
if (pHandle->version > 0) {
|
if (pHandle->version > 0) {
|
||||||
pSTInfo->cond.endVersion = pHandle->version;
|
pTSInfo->cond.endVersion = pHandle->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||||
if (pHandle->initTableReader) {
|
if (pHandle->initTableReader) {
|
||||||
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||||
pSTInfo->dataReader = NULL;
|
pTSInfo->dataReader = NULL;
|
||||||
if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) {
|
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1475,14 +1494,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->tqReader = pHandle->tqReader;
|
pInfo->tqReader = pHandle->tqReader;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSTInfo->interval.interval > 0) {
|
if (pTSInfo->interval.interval > 0) {
|
||||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark);
|
||||||
} else {
|
} else {
|
||||||
pInfo->pUpdateInfo = NULL;
|
pInfo->pUpdateInfo = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pTableScanOp = pTableScanOp;
|
pInfo->pTableScanOp = pTableScanOp;
|
||||||
pInfo->interval = pSTInfo->interval;
|
pInfo->interval = pTSInfo->interval;
|
||||||
|
|
||||||
pInfo->readHandle = *pHandle;
|
pInfo->readHandle = *pHandle;
|
||||||
pInfo->tableUid = pScanPhyNode->uid;
|
pInfo->tableUid = pScanPhyNode->uid;
|
||||||
|
|
|
@ -30,19 +30,24 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
pRead->pWal = pWal;
|
pRead->pWal = pWal;
|
||||||
pRead->pIdxFile = NULL;
|
pRead->pIdxFile = NULL;
|
||||||
pRead->pLogFile = NULL;
|
pRead->pLogFile = NULL;
|
||||||
pRead->curVersion = -5;
|
pRead->curVersion = -1;
|
||||||
pRead->curFileFirstVer = -1;
|
pRead->curFileFirstVer = -1;
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
pRead->capacity = 0;
|
pRead->capacity = 0;
|
||||||
if (cond)
|
if (cond) {
|
||||||
pRead->cond = *cond;
|
pRead->cond = *cond;
|
||||||
else {
|
} else {
|
||||||
pRead->cond.scanMeta = 0;
|
pRead->cond.scanMeta = 0;
|
||||||
pRead->cond.scanUncommited = 0;
|
pRead->cond.scanUncommited = 0;
|
||||||
|
pRead->cond.enableRef = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexInit(&pRead->mutex, NULL);
|
taosThreadMutexInit(&pRead->mutex, NULL);
|
||||||
|
|
||||||
|
/*if (pRead->cond.enableRef) {*/
|
||||||
|
/*walOpenRef(pWal);*/
|
||||||
|
/*}*/
|
||||||
|
|
||||||
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
|
pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
|
||||||
if (pRead->pHead == NULL) {
|
if (pRead->pHead == NULL) {
|
||||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
|
@ -151,24 +156,8 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
|
||||||
SWal *pWal = pRead->pWal;
|
SWal *pWal = pRead->pWal;
|
||||||
if (!pRead->curInvalid && ver == pRead->curVersion) {
|
|
||||||
wDebug("wal version %ld match, no need to reset", ver);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRead->curInvalid = 1;
|
|
||||||
pRead->curVersion = ver;
|
|
||||||
|
|
||||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
|
||||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
|
||||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (ver < pWal->vers.snapshotVer) {
|
|
||||||
}
|
|
||||||
|
|
||||||
SWalFileInfo tmpInfo;
|
SWalFileInfo tmpInfo;
|
||||||
tmpInfo.firstVer = ver;
|
tmpInfo.firstVer = ver;
|
||||||
|
@ -190,6 +179,31 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||||
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
|
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
|
||||||
|
|
||||||
pRead->curVersion = ver;
|
pRead->curVersion = ver;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||||
|
SWal *pWal = pRead->pWal;
|
||||||
|
if (!pRead->curInvalid && ver == pRead->curVersion) {
|
||||||
|
wDebug("wal version %ld match, no need to reset", ver);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRead->curInvalid = 1;
|
||||||
|
pRead->curVersion = ver;
|
||||||
|
|
||||||
|
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||||
|
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
|
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||||
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (ver < pWal->vers.snapshotVer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
if (walReadSeekVerImpl(pRead, ver) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -198,6 +212,8 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
|
||||||
|
|
||||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
|
bool seeked = false;
|
||||||
|
|
||||||
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
||||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -205,17 +221,26 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
seeked = true;
|
||||||
}
|
}
|
||||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
while (1) {
|
||||||
if (contLen != sizeof(SWalCkHead)) {
|
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||||
if (contLen < 0) {
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
break;
|
||||||
|
} else if (contLen == 0 && !seeked) {
|
||||||
|
walReadSeekVerImpl(pRead, fetchVer);
|
||||||
|
seeked = true;
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
if (contLen < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
}
|
||||||
|
ASSERT(0);
|
||||||
|
pRead->curInvalid = 1;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(0);
|
|
||||||
pRead->curInvalid = 1;
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -379,20 +404,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
int64_t code;
|
int64_t contLen;
|
||||||
|
bool seeked = false;
|
||||||
|
|
||||||
if (pRead->pWal->vers.firstVer == -1) {
|
if (pRead->pWal->vers.firstVer == -1) {
|
||||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRead->curInvalid || pRead->curVersion != ver) {
|
|
||||||
if (walReadSeekVer(pRead, ver) < 0) {
|
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
|
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
|
||||||
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
|
||||||
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
|
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
|
||||||
|
@ -400,21 +419,35 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(taosValidFile(pRead->pLogFile) == true);
|
if (pRead->curInvalid || pRead->curVersion != ver) {
|
||||||
|
if (walReadSeekVer(pRead, ver) < 0) {
|
||||||
code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
|
||||||
if (code != sizeof(SWalCkHead)) {
|
return -1;
|
||||||
if (code < 0)
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
else {
|
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
return -1;
|
seeked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walValidHeadCksum(pRead->pHead);
|
while (1) {
|
||||||
if (code != 0) {
|
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||||
|
if (contLen == sizeof(SWalCkHead)) {
|
||||||
|
break;
|
||||||
|
} else if (contLen == 0 && !seeked) {
|
||||||
|
walReadSeekVerImpl(pRead, ver);
|
||||||
|
seeked = true;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
if (contLen < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
}
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
contLen = walValidHeadCksum(pRead->pHead);
|
||||||
|
if (contLen != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -430,9 +463,9 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
pRead->capacity = pRead->pHead->head.bodyLen;
|
pRead->capacity = pRead->pHead->head.bodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
||||||
pRead->pHead->head.bodyLen) {
|
pRead->pHead->head.bodyLen) {
|
||||||
if (code < 0)
|
if (contLen < 0)
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
else {
|
else {
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
@ -449,8 +482,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = walValidBodyCksum(pRead->pHead);
|
contLen = walValidBodyCksum(pRead->pHead);
|
||||||
if (code != 0) {
|
if (contLen != 0) {
|
||||||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||||
pRead->curInvalid = 1;
|
pRead->curInvalid = 1;
|
||||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||||
|
|
Loading…
Reference in New Issue